Skip to content

Commit

Permalink
feat(zeebe): support StreamActivatedJobs RPC (#160)
Browse files Browse the repository at this point in the history
* feat(zeebe): support StreamActivatedJobs RPC

fixes #17

* test(zeebe): add inputVariableDto to test

* refactor(zeebe): incorporate DTO decoding

* test(zeebe): fix StreamJobs test

* docs(zeebe): document StreamJobs

* feat(zeebe): enable compression

* test(zeebe): fix StreamJobs test

* test(zeebe): add delay in StreamJobs test

* test(zeebe): close Zeebe cllent in StreamJob test

* fix(zeebe): cleanup Job Streams on close

* feat(zeebe): return close method from StreamJobs
  • Loading branch information
jwulf committed Jun 5, 2024
1 parent 5eff624 commit 258296a
Show file tree
Hide file tree
Showing 11 changed files with 637 additions and 11 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ The SDK uses the [`debug`](https://github.com/debug-js/debug) library. To enable
| `camunda:worker` | Zeebe Worker |
| `camunda:zeebeclient` | Zeebe Client |

Here is an example of turning on debugging for the OAuth and Operate components:

```bash
DEBUG=camunda:oauth,camunda:operate node app.js
```

## Typing of Zeebe worker variables

The variable payload in a Zeebe worker task handler is available as an object `job.variables`. By default, this is of type `any`.
Expand Down Expand Up @@ -263,3 +269,15 @@ This follows the same strategy as the job variables, as previously described.
From 8.5, you can use Zeebe user tasks. See the documentation on [how to migrate to Zeebe user tasks](https://docs.camunda.io/docs/apis-tools/tasklist-api-rest/migrate-to-zeebe-user-tasks/).

The SDK supports the Zeebe REST API. Be sure to set the `ZEEBE_REST_ADDRESS` either via environment variable or configuration field.

## Job Streaming

The Zeebe gRPC API supports streaming available jobs, rather than polling for them.

The ZeebeGrpcClient method `StreamJobs` allows you to use this API.

Please note that only jobs that become available _after_ the stream is opened are pushed to the client. For jobs that were already activatable _before_ the method is called, you need to use a polling worker.

In this release, this is not handled for you. You must both poll and stream jobs to make sure that you get jobs that were available before your application started as well as jobs that become available after your application starts.

In a subsequent release, the ZeebeWorker will transparently handle this for you.
6 changes: 5 additions & 1 deletion src/__tests__/config/jest.cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@ export const cleanUp = async () => {
for (const key of instancesKeys) {
try {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
console.log('Error cancelling process instance', key)
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
console.log(
`Don't worry about it - Operate is eventually consistent.`
)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/__tests__/config/jest.globalSetup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import { cleanUp } from './jest.cleanup'

export default async () => {
console.log('Running global setup...')
cleanUp()
await cleanUp()
}
48 changes: 48 additions & 0 deletions src/__tests__/testdata/StreamJobs.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1hcuhqo" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.22.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="stream-jobs" name="Stream Jobs" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start">
<bpmn:outgoing>Flow_1jco6ri</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1jco6ri" sourceRef="StartEvent_1" targetRef="Activity_0xyl2dv" />
<bpmn:endEvent id="Event_1kkcbv7" name="End">
<bpmn:incoming>Flow_1l2fpc9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1l2fpc9" sourceRef="Activity_0xyl2dv" targetRef="Event_1kkcbv7" />
<bpmn:serviceTask id="Activity_0xyl2dv" name="stream-job">
<bpmn:extensionElements>
<zeebe:taskDefinition type="stream-job" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_1jco6ri</bpmn:incoming>
<bpmn:outgoing>Flow_1l2fpc9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="stream-jobs">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="185" y="142" width="24" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0zd24as_di" bpmnElement="Activity_0xyl2dv">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1kkcbv7_di" bpmnElement="Event_1kkcbv7">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="440" y="142" width="20" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1jco6ri_di" bpmnElement="Flow_1jco6ri">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1l2fpc9_di" bpmnElement="Flow_1l2fpc9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
67 changes: 67 additions & 0 deletions src/__tests__/zeebe/integration/StreamJobs.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses'

process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE'
jest.setTimeout(25000)

let bpmnProcessId: string
let processDefinitionKey: string

beforeAll(async () => {
suppressZeebeLogging()
})

afterAll(async () => {
restoreZeebeLogging()
await cancelProcesses(processDefinitionKey)
})

test('Can activate jobs using StreamActivatedJobs RPC', async () => {
const zbc = new ZeebeGrpcClient()
;({ bpmnProcessId, processDefinitionKey } = (
await zbc.deployResource({
processFilename: './src/__tests__/testdata/StreamJobs.bpmn',
})
).deployments[0].process)
await cancelProcesses(processDefinitionKey)

await new Promise((resolve) => {
let counter = 0
zbc.streamJobs({
type: 'stream-job',
worker: 'test-worker',
tenantIds: ['<default>'],
taskHandler: (job) => {
counter++
expect(job.variables.foo).toBe('bar')
const res = job.complete({})
if (counter === 3) {
zbc.close()
resolve(null)
}
return res
},
inputVariableDto: class {
foo!: string
},
fetchVariables: [],
timeout: 30000,
})
// Wait two seconds to ensure the stream is active
new Promise((resolve) => setTimeout(resolve, 2000)).then(() => {
zbc.createProcessInstance({
bpmnProcessId,
variables: { foo: 'bar' },
})
zbc.createProcessInstance({
bpmnProcessId,
variables: { foo: 'bar' },
})
zbc.createProcessInstance({
bpmnProcessId,
variables: { foo: 'bar' },
})
})
})
})
49 changes: 49 additions & 0 deletions src/subscription/Subscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import EventEmitter from 'events'

export class Subscription<T> extends EventEmitter {
private _cancelled = false
dataGenerator: (interval: number) => AsyncGenerator<T, void, unknown>

constructor(restCall: () => Promise<T>) {
super()
// Define a function generator that fetches data at regular intervals
this.dataGenerator = async function* (
interval: number
// eslint-disable-next-line @typescript-eslint/no-explicit-any
): AsyncGenerator<T, void, unknown> {
while (true) {
try {
const data = await restCall()
if (data) {
// or 404
// If data is available, yield it
yield data
// Break the loop if data is available
break
}
} catch (error) {
console.error('Error fetching data:', error)
}
// Wait for the specified interval before trying again
await new Promise((resolve) => setTimeout(resolve, interval))
}
}
this.start()
}

public cancel() {
this._cancelled = true
this.emit('cancelled')
}

public get cancelled() {
return this._cancelled
}

private async start() {
const generator = this.dataGenerator(5000) // Fetch data every 5 seconds
for await (const data of generator) {
this.emit('data', data)
}
}
}
21 changes: 21 additions & 0 deletions src/zeebe/lib/GrpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,26 @@ export class GrpcClient extends EventEmitter {
*/
'grpc.http2.max_pings_without_data':
this.config.zeebeGrpcSettings.GRPC_HTTP2_MAX_PINGS_WITHOUT_DATA,
/**
* Default compression algorithm for the channel, applies to sending messages.
*
* Possible values for this option are:
* - `0` - No compression
* - `1` - Compress with DEFLATE algorithm
* - `2` - Compress with GZIP algorithm
* - `3` - Stream compression with GZIP algorithm
*/
'grpc.default_compression_algorithm': 2,
/**
* Default compression level for the channel, applies to receiving messages.
*
* Possible values for this option are:
* - `0` - None
* - `1` - Low level
* - `2` - Medium level
* - `3` - High level
*/
'grpc.default_compression_level': 2,
interceptors: [this.interceptor],
})
this.listNameMethods = []
Expand Down Expand Up @@ -299,6 +319,7 @@ export class GrpcClient extends EventEmitter {
let stream: ClientReadableStream<unknown>
const timeNormalisedRequest =
replaceTimeValuesWithMillisecondNumber(data)
debug('TimeNormalisedRequest', timeNormalisedRequest)
try {
const metadata = await this.getAuthToken()

Expand Down
Loading

0 comments on commit 258296a

Please sign in to comment.