In [10]:
%%python module Camunda

from js import XMLHttpRequest

import json


class Http:
    @staticmethod
    def get(url):
        request = XMLHttpRequest.new()
        request.open("GET", url, False)
        request.send(None)
        assert request.status in [200], request.responseText
        return json.loads(request.responseText or 'null')
        
    @staticmethod
    def post(url, data):
        request = XMLHttpRequest.new()
        request.open("POST", url, False)
        request.setRequestHeader("Content-Type", "application/json")
        request.setRequestHeader("Content-Length", len(json.dumps(data)))
        request.send(json.dumps(data))
        assert request.status in [200, 204], request.responseText
        return json.loads(request.responseText or 'null')

    
class Camunda:

    def __init__(self, worker_id):
        self.base_url = "http://localhost:8080/engine-rest"
        self.worker_id = worker_id

    def get_tasks(self):
        return Http.get(f"{self.base_url}/external-task")
    
    def fetch_and_lock(self, topic):
        payload = {
            "workerId": self.worker_id,
            "maxTasks": 10,
            "topics": [{
                "topicName": topic,
                "lockDuration": 1000 * 60,
            }]
        }
        return Http.post(f"{self.base_url}/external-task/fetchAndLock", payload)
    
    def complete_task(self, task):
        payload = {
            "workerId": self.worker_id,
        }
        return Http.post(f"{self.base_url}/external-task/{task['id']}/complete", payload)   

In [14]:
*** Settings ***

Library  IPython.display
Library  Camunda  worker_id=Robot

In [15]:
*** Tasks ***

Get external tasks
    Get tasks

In [17]:
*** Tasks ***
    
Complete external tasks
    ${tasks}=  Fetch and lock  ExternalTaskTopic
    FOR  ${task}  IN  @{tasks}
        Display  ${task}
        Complete task  ${task}
    END