Skip to content

Commit

Permalink
feat(zeebe): support StreamActivatedJobs RPC
Browse files Browse the repository at this point in the history
fixes #17
  • Loading branch information
jwulf committed May 14, 2024
1 parent 02558d0 commit d8d47ed
Show file tree
Hide file tree
Showing 7 changed files with 529 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/__tests__/config/jest.cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export const cleanUp = async () => {
try {
await zeebe.cancelProcessInstance(key)
} catch (e) {
console.log('Error cancelling process instance', key)
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
}
}
Expand Down
48 changes: 48 additions & 0 deletions src/__tests__/testdata/StreamJobs.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1hcuhqo" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.22.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="stream-jobs" name="Stream Jobs" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start">
<bpmn:outgoing>Flow_1jco6ri</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1jco6ri" sourceRef="StartEvent_1" targetRef="Activity_0xyl2dv" />
<bpmn:endEvent id="Event_1kkcbv7" name="End">
<bpmn:incoming>Flow_1l2fpc9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1l2fpc9" sourceRef="Activity_0xyl2dv" targetRef="Event_1kkcbv7" />
<bpmn:serviceTask id="Activity_0xyl2dv" name="stream-job">
<bpmn:extensionElements>
<zeebe:taskDefinition type="stream-job" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_1jco6ri</bpmn:incoming>
<bpmn:outgoing>Flow_1l2fpc9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="stream-jobs">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="185" y="142" width="24" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0zd24as_di" bpmnElement="Activity_0xyl2dv">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1kkcbv7_di" bpmnElement="Event_1kkcbv7">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="440" y="142" width="20" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1jco6ri_di" bpmnElement="Flow_1jco6ri">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1l2fpc9_di" bpmnElement="Flow_1l2fpc9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
46 changes: 46 additions & 0 deletions src/__tests__/zeebe/integration/StreamJobs.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses'

process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE'
jest.setTimeout(25000)

let bpmnProcessId: string
let processDefinitionKey: string

beforeAll(async () => {
suppressZeebeLogging()
})

afterAll(async () => {
restoreZeebeLogging()
await cancelProcesses(processDefinitionKey)
})

test('Can activate jobs using StreamActivatedJobs RPC', async () => {
// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
const zbc = new ZeebeGrpcClient()
;({ bpmnProcessId, processDefinitionKey } = (
await zbc.deployResource({
processFilename: './src/__tests__/testdata/StreamJobs.bpmn',
})
).deployments[0].process)
await cancelProcesses(processDefinitionKey)
zbc.streamJobs({
type: 'stream-job',
worker: 'test-worker',
taskHandler: async (job) => {
expect(job.variables.foo).toBe('bar')
const res = job.complete()
zbc.close().then(() => resolve(res))
return res
},
timeout: 3000,
})
zbc.createProcessInstance({
bpmnProcessId,
variables: { foo: 'bar' },
})
})
})
49 changes: 49 additions & 0 deletions src/subscription/Subscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import EventEmitter from 'events'

export class Subscription<T> extends EventEmitter {
private _cancelled = false
dataGenerator: (interval: number) => AsyncGenerator<T, void, unknown>

constructor(restCall: () => Promise<T>) {
super()
// Define a function generator that fetches data at regular intervals
this.dataGenerator = async function* (
interval: number
// eslint-disable-next-line @typescript-eslint/no-explicit-any
): AsyncGenerator<T, void, unknown> {
while (true) {
try {
const data = await restCall()
if (data) {
// or 404
// If data is available, yield it
yield data
// Break the loop if data is available
break
}
} catch (error) {
console.error('Error fetching data:', error)
}
// Wait for the specified interval before trying again
await new Promise((resolve) => setTimeout(resolve, interval))
}
}
this.start()
}

public cancel() {
this._cancelled = true
this.emit('cancelled')
}

public get cancelled() {
return this._cancelled
}

private async start() {
const generator = this.dataGenerator(5000) // Fetch data every 5 seconds
for await (const data of generator) {
this.emit('data', data)
}
}
}
Loading

0 comments on commit d8d47ed

Please sign in to comment.