In [None]:
%%python module Camunda

from js import XMLHttpRequest
from urllib.parse import urlencode
from IPython.display import display, update_display
from robot.api.deco import keyword
from robot.libraries.BuiltIn import BuiltIn

import json
import os

def http(method, path, params=None, data=None):
    """Send a synchronous JSON request to the Camunda engine REST API.

    The base URL is read from the ``CAMUNDA_ENGINE_API`` environment variable
    and the CSRF token from ``CAMUNDA_CSRF_TOKEN``.

    Args:
        method: HTTP verb, e.g. ``"GET"``, ``"POST"`` or ``"DELETE"``.
        path: Path appended verbatim to the engine base URL.
        params: Optional mapping of query parameters; it is URL-encoded and
            appended only when it is non-empty.
        data: Optional JSON-serialisable request body. When ``None`` no body
            is sent at all (instead of the JSON literal ``null``).

    Returns:
        The decoded JSON response, or ``None`` when the response body is empty.

    Raises:
        AssertionError: If the response status is neither 200 nor 204; the
            message is the raw response text.
        KeyError: If one of the required environment variables is not set.
    """
    url = f"{os.environ['CAMUNDA_ENGINE_API']}{path}"
    query = urlencode(params) if params else ""
    if query:
        url = f"{url}?{query}"

    request = XMLHttpRequest.new()
    # Third argument False: the call blocks until the response has arrived.
    request.open(method, url, False)
    request.setRequestHeader("X-XSRF-TOKEN", os.environ["CAMUNDA_CSRF_TOKEN"])
    request.setRequestHeader("Content-Type", "application/json")
    if data is None:
        request.send()
    else:
        request.send(json.dumps(data))

    # Raised explicitly (rather than via an ``assert`` statement) so the check
    # is not stripped when Python runs with optimisations enabled.
    if request.status not in (200, 204):
        raise AssertionError(request.responseText)
    return json.loads(request.responseText or "null")


class Camunda:
    """Robot Framework keyword library and listener for Camunda process instances.

    The library keeps a stack of the process instance ids it has started
    (``process_ids``); every keyword operates on the most recently started
    one. The instance also registers itself as a listener: after each keyword
    tagged ``bpmn`` finishes, the process diagram is (re)rendered in the
    notebook output.
    """
    ROBOT_LIBRARY_SCOPE = 'SUITE'
    ROBOT_LISTENER_API_VERSION = 2

    def __init__(self):
        # Registering the instance as its own listener is what makes
        # ``end_keyword`` below get called.
        self.ROBOT_LIBRARY_LISTENER = self
        # Ids of the instances started through this library, newest last.
        self.process_ids = []
        # Display ids that have already been rendered once.
        self.display_ids = []

    def display_process(self, display_id):
        """Render, or refresh, the diagram of the most recently started instance.

        Fetches the instance history, its activity instances and the BPMN XML
        of its definition, and publishes them as a raw MIME bundle. The first
        call for a given ``display_id`` creates the output; later calls update
        it in place.

        Raises:
            AssertionError: If no instance is currently managed by the library.
        """
        assert self.process_ids, "No process managed by Camunda library instance is running"
        process_id = self.process_ids[-1]
        process = http("GET", f"/history/process-instance/{process_id}")
        activities = http(
            "GET", "/history/activity-instance", dict(processInstanceId=process_id)
        )
        xml = http("GET", f"/process-definition/{process['processDefinitionId']}/xml")
        config = dict(
            style=dict(height="400px"),
            activities=activities,
        )
        bundle = {
            "application/bpmn+xml": xml["bpmn20Xml"],
            "application/bpmn+json": json.dumps(config),
        }
        if display_id in self.display_ids:
            update_display(bundle, raw=True, display_id=display_id)
        else:
            display(bundle, raw=True, display_id=display_id)
            # Recorded once only; refreshes must not grow the list.
            self.display_ids.append(display_id)

    def get_definitions(self):
        """Return the keys of the latest version of every process definition."""
        params = dict(latest="true")
        results = http("GET", "/process-definition", params)
        return [d["key"] for d in results]

    def get_instances(self):
        """Return the response of the engine's ``/process-instance`` listing."""
        return http("GET", "/process-instance")

    def get_external_tasks(self):
        """Return the external tasks of the most recently started instance.

        Raises:
            AssertionError: If no instance is currently managed by the library.
        """
        assert self.process_ids, "No process managed by Camunda library instance is running"
        params = dict(processInstanceId=self.process_ids[-1])
        return http("GET", "/external-task", params)

    @keyword(tags=["bpmn"])
    def complete_external_task(self, topic, variables):
        """Fetch, lock and complete an external task of the current instance.

        Args:
            topic: Topic name of the external task to fetch.
            variables: Variables passed along with the completion request.

        Raises:
            AssertionError: If no instance is managed by the library, or if
                the fetch returned no task belonging to the current instance.
        """
        assert self.process_ids, "No process managed by Camunda library instance is running"
        process_id = self.process_ids[-1]
        worker_id = f"{self.__class__.__name__}:{id(self)}"
        data = dict(
            workerId=worker_id,
            maxTasks=1,
            topics=[dict(
                topicName=topic,
                lockDuration=1000
            )]
        )
        external_tasks = http("POST", "/external-task/fetchAndLock", data=data)
        # NOTE(review): the fetch request is not restricted to this instance,
        # so the returned task may belong to another instance and is then
        # filtered out here -- confirm this is acceptable for shared engines.
        external_tasks = [x for x in external_tasks if x["processInstanceId"] == process_id]
        assert external_tasks, "No external tasks available"
        for external_task in external_tasks:
            data = dict(workerId=worker_id, variables=variables)
            http("POST", f"/external-task/{external_task['id']}/complete", data=data)

    @keyword(tags=["bpmn"])
    def start_instance(self, key):
        """Start an instance of the process definition ``key`` and track its id."""
        data = dict(variables={})
        instance = http("POST", f"/process-definition/key/{key}/start", data=data)
        self.process_ids.append(instance["id"])

    @keyword(tags=["bpmn"])
    def stop_instance(self):
        """Stop tracking the most recent instance, deleting it if still active.

        Does nothing when the library is not tracking any instance.
        """
        if not self.process_ids:
            return
        process_id = self.process_ids.pop()
        process = http("GET", f"/history/process-instance/{process_id}")
        if process["state"] == "ACTIVE":
            params = dict(
                skipCustomListeners="true",
                skipIoMappings="true",
                failIfNotExist="false",
            )
            http("DELETE", f"/process-instance/{process_id}", params)

    def end_keyword(self, name, attrs):
        """Listener hook: refresh the diagram after every ``bpmn``-tagged keyword."""
        if "bpmn" in attrs['tags']:
            # After the last instance has been stopped there is nothing left
            # to draw; skipping avoids an error inside the listener.
            if self.process_ids:
                self.display_process("bpmn")
            # NOTE(review): presumably gives the engine time to settle before
            # the next keyword runs -- confirm.
            BuiltIn().sleep("1 s")

In [None]:
*** Settings ***

# Library implemented by the `%%python module Camunda` cell of this notebook.
Library           Camunda

# One process instance is started before the suite and stopped after it.
Suite Setup       Start Instance    ${PROCESS_KEY}
Suite Teardown    Stop Instance

*** Variables ***

# Key of the process definition that the suite starts.
${PROCESS_KEY}    traffic-data-loader

In [None]:
*** Keywords ***

Number of instances should be
    [Documentation]    Fails with ``error`` unless ``Get Instances`` returns exactly ``number`` entries.
    [Arguments]    ${number}    ${error}
    ${listed}=    Get Instances
    ${count}=    Get Length    ${listed}
    Should Be Equal As Integers    ${count}    ${number}    msg=${error}
    
Number of external tasks should be
    [Documentation]    Fails with ``error`` unless ``Get External Tasks`` returns exactly ``number`` entries.
    [Arguments]    ${number}    ${error}
    ${tasks}=    Get External Tasks
    ${count}=    Get Length    ${tasks}
    Should Be Equal As Integers    ${count}    ${number}    msg=${error}

Topic for all externals tasks should be
    [Documentation]    Fails with ``error`` if any task returned by ``Get External Tasks`` has a ``topicName`` other than ``topic``.
    [Arguments]    ${topic}    ${error}
    ${tasks}=    Get External Tasks
    FOR    ${task}    IN    @{tasks}
        Should Be Equal    ${task}[topicName]    ${topic}    msg=${error}
    END

In [None]:
*** Test Cases ***

Test process is properly setup and teardown
    [Documentation]    Expects exactly one listed process instance, the one started by the suite setup.
    Number of instances should be    1    Process should really be running
    
Test start and stop instance
    [Documentation]    Starts a second instance, expects two listed instances, and stops the new one in the teardown.
    [Teardown]    Stop instance
    Start instance    ${PROCESS_KEY}
    Number of instances should be    2    A new process should really be running

In [None]:
*** Test Cases ***

Producer task is the first task
    [Documentation]    Expects a single external task whose topic is the producer topic.
    Number of external tasks should be    1    Process should start with Producer task
    Topic for all externals tasks should be    Produce traffic data work items    Wrong topic

In [None]:
*** Test Cases ***

After producer task comes consumer task
    [Documentation]    Completes the producer task with six empty work items, then
    ...    completes one consumer task per work item and expects that no
    ...    external task remains afterwards.
    # NOTE(review): fixed delay kept from the original; presumably it gives the
    # engine time to reach the producer task -- confirm whether it is needed.
    Sleep    5 s

    Number of external tasks should be    1    Process should start with Producer task
    Topic for all externals tasks should be    Produce traffic data work items    Wrong topic
    ${item}=    Create dictionary
    ${data}=    Create List    ${item}    ${item}    ${item}    ${item}    ${item}    ${item}
    # `$data` hands the list object itself to json.dumps; `${data}` would paste
    # its string representation into the evaluated Python source instead.
    ${variable}=    Create dictionary    value=${{json.dumps($data)}}    type=Json
    ${variables}=    Create dictionary    traffic_data=${variable}
    Complete external task
    ...    Produce traffic data work items
    ...    ${variables}

    # One consumer task is expected per work item; the item itself is not used.
    FOR    ${ignored}    IN    @{data}
        Number of external tasks should be    1    Process should continue with Consumer task
        Topic for all externals tasks should be    Consume traffic data work item    Wrong topic

        ${variable}=    Create dictionary    value=OK    type=String
        ${variables}=    Create dictionary    status=${variable}
        Complete external task
        ...    Consume traffic data work item
        ...    ${variables}
    END

    Number of external tasks should be    0    Process should have been completed