Skip to content

Commit

Permalink
feat(zeebe): add multi-tenant support to workers (#176)
Browse files Browse the repository at this point in the history
adds tenantIds: string[] to stream and polling worker config

fixes #171
  • Loading branch information
jwulf committed Jun 7, 2024
1 parent 9eff183 commit 434f697
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 28 deletions.
44 changes: 28 additions & 16 deletions src/__tests__/config/jest.cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,44 @@ export const cleanUp = async () => {
const processIds = (bpmn as any[]).map(
(b) => b?.['bpmn:definitions']?.['bpmn:process']?.['@_id']
)
const operate = new OperateApiClient()
const zeebe = new ZeebeGrpcClient({
config: {
zeebeGrpcSettings: { ZEEBE_CLIENT_LOG_LEVEL: 'NONE' },
},
})
for (const id of processIds) {
if (id) {
const res = await operate.searchProcessInstances({
filter: { bpmnProcessId: id, state: 'ACTIVE' },
})
const instancesKeys = res.items.map((instance) => instance.key)
if (instancesKeys.length > 0) {
console.log(`Cancelling ${instancesKeys.length} instances for ${id}`)
}
for (const key of instancesKeys) {
try {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
// Are we running in a multi-tenant environment?
const multiTenant = !!process.env.CAMUNDA_TENANT_ID
const tenantIds = multiTenant
? ['<default>', 'red', 'green']
: [undefined]
for (const tenantId of tenantIds) {
const operate = new OperateApiClient({
config: {
CAMUNDA_TENANT_ID: tenantId,
},
})
const res = await operate.searchProcessInstances({
filter: { bpmnProcessId: id, state: 'ACTIVE' },
})
const instancesKeys = res.items.map((instance) => instance.key)
if (instancesKeys.length > 0) {
console.log(
`Don't worry about it - Operate is eventually consistent.`
`Cancelling ${instancesKeys.length} instances for ${id} in tenant '${tenantId}'...`
)
}
for (const key of instancesKeys) {
try {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
if (!(e as Error).message.startsWith('5 NOT_FOUND')) {
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
}
}
}
}
}
}
Expand Down
48 changes: 48 additions & 0 deletions src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1o5c8zw" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="multi-tenant-stream-worker-test" name="Multi-tenant Stream Worker Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start multi-tenancy worker test">
<bpmn:outgoing>Flow_0r8p543</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0r8p543" sourceRef="StartEvent_1" targetRef="Activity_1an5aay" />
<bpmn:endEvent id="Event_1hylnf3" name="Multi-tenancy worker test complete">
<bpmn:incoming>Flow_08wm3o9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_08wm3o9" sourceRef="Activity_1an5aay" targetRef="Event_1hylnf3" />
<bpmn:serviceTask id="Activity_1an5aay" name="multi-tenant-stream-work">
<bpmn:extensionElements>
<zeebe:taskDefinition type="multi-tenant-stream-work" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0r8p543</bpmn:incoming>
<bpmn:outgoing>Flow_08wm3o9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="multi-tenant-stream-worker-test">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="160" y="142" width="75" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hylnf3_di" bpmnElement="Event_1hylnf3">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="417" y="142" width="66" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0cx6d07_di" bpmnElement="Activity_1an5aay">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0r8p543_di" bpmnElement="Flow_0r8p543">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_08wm3o9_di" bpmnElement="Flow_08wm3o9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
48 changes: 48 additions & 0 deletions src/__tests__/testdata/multi-tenant-worker-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1o5c8zw" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="multi-tenant-worker-test" name="Multi-tenant Worker Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start multi-tenancy worker test">
<bpmn:outgoing>Flow_0r8p543</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0r8p543" sourceRef="StartEvent_1" targetRef="Activity_1an5aay" />
<bpmn:endEvent id="Event_1hylnf3" name="Multi-tenancy worker test complete">
<bpmn:incoming>Flow_08wm3o9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_08wm3o9" sourceRef="Activity_1an5aay" targetRef="Event_1hylnf3" />
<bpmn:serviceTask id="Activity_1an5aay" name="multi-tenant-work">
<bpmn:extensionElements>
<zeebe:taskDefinition type="multi-tenant-work" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0r8p543</bpmn:incoming>
<bpmn:outgoing>Flow_08wm3o9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="multi-tenant-worker-test">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="160" y="142" width="75" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hylnf3_di" bpmnElement="Event_1hylnf3">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="417" y="142" width="66" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0cx6d07_di" bpmnElement="Activity_1an5aay">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0r8p543_di" bpmnElement="Flow_0r8p543">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_08wm3o9_di" bpmnElement="Flow_08wm3o9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
110 changes: 110 additions & 0 deletions src/__tests__/zeebe/multitenancy/multitenant-worker-mt.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe/index'

jest.setTimeout(10000)

beforeAll(() => {
suppressZeebeLogging()
})

afterAll(() => {
restoreZeebeLogging()
})

test('A worker can be multi-tenant', async () => {
const client = new ZeebeGrpcClient()

await client.deployResource({
processFilename: './src/__tests__/testdata/multi-tenant-worker-test.bpmn',
tenantId: '<default>',
})

await client.deployResource({
processFilename: './src/__tests__/testdata/multi-tenant-worker-test.bpmn',
tenantId: 'green',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-worker-test',
variables: { foo: 'bar' },
tenantId: '<default>',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-worker-test',
variables: { foo: 'bar' },
tenantId: 'green',
})

let greenTenant = false,
defaultTenant = false
await new Promise((resolve) =>
client.createWorker({
taskHandler: (job) => {
greenTenant = greenTenant || job.tenantId === 'green'
defaultTenant = defaultTenant || job.tenantId === '<default>'
if (greenTenant && defaultTenant) {
resolve(null)
}
return job.complete()
},
taskType: 'multi-tenant-work',
tenantIds: ['<default>', 'green'],
})
)

await client.close()
})

test('A stream worker can be multi-tenant', async () => {
const client = new ZeebeGrpcClient()

await client.deployResource({
processFilename:
'./src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn',
tenantId: '<default>',
})

await client.deployResource({
processFilename:
'./src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn',
tenantId: 'green',
})

let greenTenant = false,
defaultTenant = false
// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
client.streamJobs({
taskHandler: async (job) => {
greenTenant = greenTenant || job.tenantId === 'green'
defaultTenant = defaultTenant || job.tenantId === '<default>'
const res = await job.complete()
if (greenTenant && defaultTenant) {
resolve(null)
}
return res
},
type: 'multi-tenant-stream-work',
tenantIds: ['<default>', 'green'],
worker: 'stream-worker',
timeout: 2000,
})

await new Promise((resolve) => setTimeout(resolve, 2000))

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-stream-worker-test',
variables: { foo: 'bar' },
tenantId: '<default>',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-stream-worker-test',
variables: { foo: 'bar' },
tenantId: 'green',
})
})

await client.close()
})
7 changes: 0 additions & 7 deletions src/zeebe/lib/ZBStreamWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ export class ZBStreamWorker implements IZBJobWorker {
stream.on('error', (e) => {
console.error(e)
})
// stream.on('pause', () => console.log('paused'))
// stream.on('metadata', (m) => console.log(m))
// stream.on('readable', () => console.log('readable'))
// stream.on('status', () => console.log('status'))
// stream.on('close', () => console.log('close'))
// stream.on('end', () => console.log('end'))
// stream.on('resume', (n) => console.log('resume', n))
stream.on('data', (res: ActivatedJob) => {
// Make handlers
const job: Job<WorkerInputVariables, CustomHeaderShape> =
Expand Down
8 changes: 5 additions & 3 deletions src/zeebe/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface ZBWorkerConstructorConfig<
inputVariableDto?: { new (...args: any[]): Readonly<WorkerInputVariables> }
// eslint-disable-next-line @typescript-eslint/no-explicit-any
customHeadersDto?: { new (...args: any[]): Readonly<CustomHeaderShape> }
tenantIds: string[] | [string] | undefined
}

export class ZBWorkerBase<
Expand Down Expand Up @@ -101,7 +102,6 @@ export class ZBWorkerBase<
private pollMutex: boolean = false
private backPressureRetryCount: number = 0
private fetchVariable: (keyof WorkerInputVariables)[] | undefined
private tenantId?: string
private inputVariableDto: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (obj: any): WorkerInputVariables
Expand All @@ -110,6 +110,7 @@ export class ZBWorkerBase<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (...args: any[]): CustomHeaderShape
}
private tenantIds: string[] | [string] | undefined

constructor({
grpcClient,
Expand All @@ -121,13 +122,15 @@ export class ZBWorkerBase<
zbClient,
inputVariableDto,
customHeadersDto,
tenantIds,
}: ZBWorkerConstructorConfig<
WorkerInputVariables,
CustomHeaderShape,
WorkerOutputVariables
>) {
super()
options = options || {}
this.tenantIds = tenantIds
if (!taskType) {
throw new Error('Missing taskType')
}
Expand All @@ -146,7 +149,6 @@ export class ZBWorkerBase<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (obj: any): CustomHeaderShape
})
this.tenantId = options.tenantId
this.taskHandler = taskHandler
this.taskType = taskType
this.maxJobsToActivate =
Expand Down Expand Up @@ -560,7 +562,7 @@ You should call only one job action method in the worker handler. This is a bug
type: this.taskType,
worker: this.id,
fetchVariable: this.fetchVariable as string[],
tenantIds: this.tenantId ? [this.tenantId] : undefined,
tenantIds: this.tenantIds,
}

this.logger.logDebug(
Expand Down
8 changes: 8 additions & 0 deletions src/zeebe/lib/interfaces-1.0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ export interface Job<
* All visible variables in the task scope, computed at activation time.
*/
readonly variables: Readonly<Variables>
/**
* TenantId of the job in a multi-tenant cluster
*/
readonly tenantId: string
}

export interface ZBWorkerOptions<InputVars = IInputVariables> {
Expand Down Expand Up @@ -373,6 +377,10 @@ export interface ZBWorkerConfig<
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
customHeadersDto?: { new (...args: any[]): Readonly<CustomHeaderShape> }
/**
* An optional array of tenantIds if you want this to be a multi-tenant worker.
*/
tenantIds?: string[]
}

export interface BroadcastSignalReq {
Expand Down
Loading

0 comments on commit 434f697

Please sign in to comment.