Skip to content

Commit

Permalink
Alpha (#178)
Browse files Browse the repository at this point in the history
* feat(zeebe): add updateJobTimeout method (#172)

* feat(zeebe): add updateJobTimeout method

fixes #171

* feat(zeebe): support StreamActivatedJobs RPC (#160)

* feat(zeebe): support StreamActivatedJobs RPC

fixes #17

* test(zeebe): add inputVariableDto to test

* refactor(zeebe): incorporate DTO decoding

* test(zeebe): fix StreamJobs test

* docs(zeebe): document StreamJobs

* feat(zeebe): enable compression

* test(zeebe): fix StreamJobs test

* test(zeebe): add delay in StreamJobs test

* test(zeebe): close Zeebe cllent in StreamJob test

* fix(zeebe): cleanup Job Streams on close

* feat(zeebe): return close method from StreamJobs

* chore(release): 8.5.5-alpha.1 [skip ci]

## [8.5.5-alpha.1](v8.5.4...v8.5.5-alpha.1) (2024-06-05)

### Features

* **zeebe:** add updateJobTimeout method ([#172](#172)) ([5eff624](5eff624)), closes [#171](#171)
* **zeebe:** support StreamActivatedJobs RPC ([#160](#160)) ([258296a](258296a)), closes [#17](#17)

* chore(release): 8.5.5-alpha.1 [skip ci]

## [8.5.5-alpha.1](v8.5.4...v8.5.5-alpha.1) (2024-06-05)

### Features

* **zeebe:** add updateJobTimeout method ([#172](#172)) ([5eff624](5eff624)), closes [#171](#171)
* **zeebe:** support StreamActivatedJobs RPC ([#160](#160)) ([258296a](258296a)), closes [#17](#17)

* feat(zeebe): add multi-tenant support to workers (#175)

adds tenantIds: string[] to stream and polling worker config

fixes #171

* chore(release): 8.6.1-alpha.1 [skip ci]

## [8.6.1-alpha.1](v8.6.0...v8.6.1-alpha.1) (2024-06-07)

### Features

* **zeebe:** add multi-tenant support to workers ([#175](#175)) ([28450a5](28450a5)), closes [#171](#171)
* **zeebe:** add updateJobTimeout method ([#172](#172)) ([5eff624](5eff624)), closes [#171](#171)
* **zeebe:** support StreamActivatedJobs RPC ([#160](#160)) ([258296a](258296a)), closes [#17](#17)

* chore(release): 8.6.1-alpha.1 [skip ci]

## [8.6.1-alpha.1](v8.6.0...v8.6.1-alpha.1) (2024-06-07)

### Features

* **zeebe:** add multi-tenant support to workers ([#175](#175)) ([28450a5](28450a5)), closes [#171](#171)
* **zeebe:** add updateJobTimeout method ([#172](#172)) ([5eff624](5eff624)), closes [#171](#171)
* **zeebe:** support StreamActivatedJobs RPC ([#160](#160)) ([258296a](258296a)), closes [#17](#17)

* ci(repo): add dispatch workflow to actions

---------

Co-authored-by: semantic-release-bot <semantic-release-bot@martynus.net>
  • Loading branch information
jwulf and semantic-release-bot authored Jun 10, 2024
1 parent a52af19 commit 05b3ecb
Show file tree
Hide file tree
Showing 13 changed files with 276 additions and 32 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/pr-test.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
name: Pull Request tests

on: [pull_request]
on:
pull_request:
workflow_dispatch:

jobs:
unit-tests:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Publish a new version

on:
workflow_dispatch:
push:
branches:
- main
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## [8.6.1-alpha.1](https://github.com/camunda/camunda-8-js-sdk/compare/v8.6.0...v8.6.1-alpha.1) (2024-06-07)


### Features

* **zeebe:** add multi-tenant support to workers ([#175](https://github.com/camunda/camunda-8-js-sdk/issues/175)) ([28450a5](https://github.com/camunda/camunda-8-js-sdk/commit/28450a50a2cbb70b5f8958e1d94c144f817a8758)), closes [#171](https://github.com/camunda/camunda-8-js-sdk/issues/171)
* **zeebe:** add updateJobTimeout method ([#172](https://github.com/camunda/camunda-8-js-sdk/issues/172)) ([5eff624](https://github.com/camunda/camunda-8-js-sdk/commit/5eff6243dbce5fd296daeedcf6191ef4c4d4b609)), closes [#171](https://github.com/camunda/camunda-8-js-sdk/issues/171)
* **zeebe:** support StreamActivatedJobs RPC ([#160](https://github.com/camunda/camunda-8-js-sdk/issues/160)) ([258296a](https://github.com/camunda/camunda-8-js-sdk/commit/258296aef6558f976dd299ea977514d58d822141)), closes [#17](https://github.com/camunda/camunda-8-js-sdk/issues/17)

# [8.6.0](https://github.com/camunda/camunda-8-js-sdk/compare/v8.5.4...v8.6.0) (2024-06-05)

## [8.5.5-alpha.1](https://github.com/camunda/camunda-8-js-sdk/compare/v8.5.4...v8.5.5-alpha.1) (2024-06-05)
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@camunda8/sdk",
"version": "8.6.0",
"version": "8.6.1-alpha.1",
"description": "",
"main": "dist/index.js",
"scripts": {
Expand Down
44 changes: 28 additions & 16 deletions src/__tests__/config/jest.cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,44 @@ export const cleanUp = async () => {
const processIds = (bpmn as any[]).map(
(b) => b?.['bpmn:definitions']?.['bpmn:process']?.['@_id']
)
const operate = new OperateApiClient()
const zeebe = new ZeebeGrpcClient({
config: {
zeebeGrpcSettings: { ZEEBE_CLIENT_LOG_LEVEL: 'NONE' },
},
})
for (const id of processIds) {
if (id) {
const res = await operate.searchProcessInstances({
filter: { bpmnProcessId: id, state: 'ACTIVE' },
})
const instancesKeys = res.items.map((instance) => instance.key)
if (instancesKeys.length > 0) {
console.log(`Cancelling ${instancesKeys.length} instances for ${id}`)
}
for (const key of instancesKeys) {
try {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
// Are we running in a multi-tenant environment?
const multiTenant = !!process.env.CAMUNDA_TENANT_ID
const tenantIds = multiTenant
? ['<default>', 'red', 'green']
: [undefined]
for (const tenantId of tenantIds) {
const operate = new OperateApiClient({
config: {
CAMUNDA_TENANT_ID: tenantId,
},
})
const res = await operate.searchProcessInstances({
filter: { bpmnProcessId: id, state: 'ACTIVE' },
})
const instancesKeys = res.items.map((instance) => instance.key)
if (instancesKeys.length > 0) {
console.log(
`Don't worry about it - Operate is eventually consistent.`
`Cancelling ${instancesKeys.length} instances for ${id} in tenant '${tenantId}'...`
)
}
for (const key of instancesKeys) {
try {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
if (!(e as Error).message.startsWith('5 NOT_FOUND')) {
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
}
}
}
}
}
}
Expand Down
48 changes: 48 additions & 0 deletions src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1o5c8zw" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="multi-tenant-stream-worker-test" name="Multi-tenant Stream Worker Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start multi-tenancy worker test">
<bpmn:outgoing>Flow_0r8p543</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0r8p543" sourceRef="StartEvent_1" targetRef="Activity_1an5aay" />
<bpmn:endEvent id="Event_1hylnf3" name="Multi-tenancy worker test complete">
<bpmn:incoming>Flow_08wm3o9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_08wm3o9" sourceRef="Activity_1an5aay" targetRef="Event_1hylnf3" />
<bpmn:serviceTask id="Activity_1an5aay" name="multi-tenant-stream-work">
<bpmn:extensionElements>
<zeebe:taskDefinition type="multi-tenant-stream-work" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0r8p543</bpmn:incoming>
<bpmn:outgoing>Flow_08wm3o9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="multi-tenant-stream-worker-test">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="160" y="142" width="75" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hylnf3_di" bpmnElement="Event_1hylnf3">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="417" y="142" width="66" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0cx6d07_di" bpmnElement="Activity_1an5aay">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0r8p543_di" bpmnElement="Flow_0r8p543">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_08wm3o9_di" bpmnElement="Flow_08wm3o9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
48 changes: 48 additions & 0 deletions src/__tests__/testdata/multi-tenant-worker-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1o5c8zw" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="multi-tenant-worker-test" name="Multi-tenant Worker Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start multi-tenancy worker test">
<bpmn:outgoing>Flow_0r8p543</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0r8p543" sourceRef="StartEvent_1" targetRef="Activity_1an5aay" />
<bpmn:endEvent id="Event_1hylnf3" name="Multi-tenancy worker test complete">
<bpmn:incoming>Flow_08wm3o9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_08wm3o9" sourceRef="Activity_1an5aay" targetRef="Event_1hylnf3" />
<bpmn:serviceTask id="Activity_1an5aay" name="multi-tenant-work">
<bpmn:extensionElements>
<zeebe:taskDefinition type="multi-tenant-work" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0r8p543</bpmn:incoming>
<bpmn:outgoing>Flow_08wm3o9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="multi-tenant-worker-test">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="160" y="142" width="75" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hylnf3_di" bpmnElement="Event_1hylnf3">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="417" y="142" width="66" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0cx6d07_di" bpmnElement="Activity_1an5aay">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0r8p543_di" bpmnElement="Flow_0r8p543">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_08wm3o9_di" bpmnElement="Flow_08wm3o9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
110 changes: 110 additions & 0 deletions src/__tests__/zeebe/multitenancy/multitenant-worker-mt.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe/index'

jest.setTimeout(10000)

beforeAll(() => {
suppressZeebeLogging()
})

afterAll(() => {
restoreZeebeLogging()
})

test('A worker can be multi-tenant', async () => {
const client = new ZeebeGrpcClient()

await client.deployResource({
processFilename: './src/__tests__/testdata/multi-tenant-worker-test.bpmn',
tenantId: '<default>',
})

await client.deployResource({
processFilename: './src/__tests__/testdata/multi-tenant-worker-test.bpmn',
tenantId: 'green',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-worker-test',
variables: { foo: 'bar' },
tenantId: '<default>',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-worker-test',
variables: { foo: 'bar' },
tenantId: 'green',
})

let greenTenant = false,
defaultTenant = false
await new Promise((resolve) =>
client.createWorker({
taskHandler: (job) => {
greenTenant = greenTenant || job.tenantId === 'green'
defaultTenant = defaultTenant || job.tenantId === '<default>'
if (greenTenant && defaultTenant) {
resolve(null)
}
return job.complete()
},
taskType: 'multi-tenant-work',
tenantIds: ['<default>', 'green'],
})
)

await client.close()
})

test('A stream worker can be multi-tenant', async () => {
const client = new ZeebeGrpcClient()

await client.deployResource({
processFilename:
'./src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn',
tenantId: '<default>',
})

await client.deployResource({
processFilename:
'./src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn',
tenantId: 'green',
})

let greenTenant = false,
defaultTenant = false
// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
client.streamJobs({
taskHandler: async (job) => {
greenTenant = greenTenant || job.tenantId === 'green'
defaultTenant = defaultTenant || job.tenantId === '<default>'
const res = await job.complete()
if (greenTenant && defaultTenant) {
resolve(null)
}
return res
},
type: 'multi-tenant-stream-work',
tenantIds: ['<default>', 'green'],
worker: 'stream-worker',
timeout: 2000,
})

await new Promise((resolve) => setTimeout(resolve, 2000))

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-stream-worker-test',
variables: { foo: 'bar' },
tenantId: '<default>',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-stream-worker-test',
variables: { foo: 'bar' },
tenantId: 'green',
})
})

await client.close()
})
7 changes: 0 additions & 7 deletions src/zeebe/lib/ZBStreamWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ export class ZBStreamWorker implements IZBJobWorker {
stream.on('error', (e) => {
console.error(e)
})
// stream.on('pause', () => console.log('paused'))
// stream.on('metadata', (m) => console.log(m))
// stream.on('readable', () => console.log('readable'))
// stream.on('status', () => console.log('status'))
// stream.on('close', () => console.log('close'))
// stream.on('end', () => console.log('end'))
// stream.on('resume', (n) => console.log('resume', n))
stream.on('data', (res: ActivatedJob) => {
// Make handlers
const job: Job<WorkerInputVariables, CustomHeaderShape> =
Expand Down
8 changes: 5 additions & 3 deletions src/zeebe/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface ZBWorkerConstructorConfig<
inputVariableDto?: { new (...args: any[]): Readonly<WorkerInputVariables> }
// eslint-disable-next-line @typescript-eslint/no-explicit-any
customHeadersDto?: { new (...args: any[]): Readonly<CustomHeaderShape> }
tenantIds: string[] | [string] | undefined
}

export class ZBWorkerBase<
Expand Down Expand Up @@ -101,7 +102,6 @@ export class ZBWorkerBase<
private pollMutex: boolean = false
private backPressureRetryCount: number = 0
private fetchVariable: (keyof WorkerInputVariables)[] | undefined
private tenantId?: string
private inputVariableDto: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (obj: any): WorkerInputVariables
Expand All @@ -110,6 +110,7 @@ export class ZBWorkerBase<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (...args: any[]): CustomHeaderShape
}
private tenantIds: string[] | [string] | undefined

constructor({
grpcClient,
Expand All @@ -121,13 +122,15 @@ export class ZBWorkerBase<
zbClient,
inputVariableDto,
customHeadersDto,
tenantIds,
}: ZBWorkerConstructorConfig<
WorkerInputVariables,
CustomHeaderShape,
WorkerOutputVariables
>) {
super()
options = options || {}
this.tenantIds = tenantIds
if (!taskType) {
throw new Error('Missing taskType')
}
Expand All @@ -146,7 +149,6 @@ export class ZBWorkerBase<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (obj: any): CustomHeaderShape
})
this.tenantId = options.tenantId
this.taskHandler = taskHandler
this.taskType = taskType
this.maxJobsToActivate =
Expand Down Expand Up @@ -560,7 +562,7 @@ You should call only one job action method in the worker handler. This is a bug
type: this.taskType,
worker: this.id,
fetchVariable: this.fetchVariable as string[],
tenantIds: this.tenantId ? [this.tenantId] : undefined,
tenantIds: this.tenantIds,
}

this.logger.logDebug(
Expand Down
Loading

0 comments on commit 05b3ecb

Please sign in to comment.