Skip to content

Commit

Permalink
feat(zeebe): add updateJobTimeout method
Browse files Browse the repository at this point in the history
fixes #171
  • Loading branch information
jwulf committed Jun 4, 2024
1 parent d3c9b57 commit b1cc89c
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 0 deletions.
47 changes: 47 additions & 0 deletions src/__tests__/testdata/Client-Update-Job-Timeout.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0eh4n28" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="update-job-timeout-process" name="Update Job Timeout" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start Update Job Timeout Test">
<bpmn:outgoing>Flow_1kydzz1</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1kydzz1" sourceRef="StartEvent_1" targetRef="Activity_0rpbuc4" />
<bpmn:endEvent id="Event_1hdghel" name="Update Job Timeout Test Completed">
<bpmn:incoming>Flow_0zqeiin</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0zqeiin" sourceRef="Activity_0rpbuc4" targetRef="Event_1hdghel" />
<bpmn:serviceTask id="Activity_0rpbuc4" name="test job">
<bpmn:extensionElements>
<zeebe:taskDefinition type="update-job-timeout" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_1kydzz1</bpmn:incoming>
<bpmn:outgoing>Flow_0zqeiin</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="update-job-timeout-process">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="156" y="142" width="83" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hdghel_di" bpmnElement="Event_1hdghel">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="419" y="142" width="63" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_17t8oqh_di" bpmnElement="Activity_0rpbuc4">
<dc:Bounds x="270" y="77" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1kydzz1_di" bpmnElement="Flow_1kydzz1">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0zqeiin_di" bpmnElement="Flow_0zqeiin">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
54 changes: 54 additions & 0 deletions src/__tests__/zeebe/integration/Client-Update-Job-Timeout.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses'
import { CreateProcessInstanceResponse } from '../../../zeebe/lib/interfaces-grpc-1.0'

process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE'
jest.setTimeout(60000)

let zbc: ZeebeGrpcClient
let wf: CreateProcessInstanceResponse | undefined

beforeAll(() => suppressZeebeLogging())
afterAll(() => restoreZeebeLogging())

beforeEach(() => {
zbc = new ZeebeGrpcClient()
})

afterEach(async () => {
try {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey)
}
} catch (e: unknown) {
// console.log('Caught NOT FOUND') // @DEBUG
} finally {
await zbc.close() // Makes sure we don't forget to close connection
}
})

test('can update Job Timeout', async () => {
const res = await zbc.deployResource({
processFilename: './src/__tests__/testdata/Client-Update-Job-Timeout.bpmn',
})
await cancelProcesses(res.deployments[0].process.processDefinitionKey)
wf = await zbc.createProcessInstance({
bpmnProcessId: 'update-job-timeout-process',
variables: {},
})

const worker = zbc.createWorker({
taskType: 'update-job-timeout',
taskHandler: async (job) => {
await zbc.updateJobTimeout({
jobKey: job.key,
timeout: 3000,
})
return job.complete().then(async (res) => {
await worker.close()
return res
})
},
})
})
4 changes: 4 additions & 0 deletions src/zeebe/lib/interfaces-1.0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
ThrowErrorRequest,
TopologyResponse,
UpdateJobRetriesRequest,
UpdateJobTimeoutRequest,
} from './interfaces-grpc-1.0'
import { Loglevel, ZBCustomLogger } from './interfaces-published-contract'

Expand Down Expand Up @@ -403,6 +404,9 @@ export interface ZBGrpc extends GrpcClient {
updateJobRetriesSync(
updateJobRetriesRequest: UpdateJobRetriesRequest
): Promise<void>
updateJobTimeoutSync(
updateJobTimeoutRequest: UpdateJobTimeoutRequest
): Promise<void>
deleteResourceSync: (
deleteResourceRequest: DeleteResourceRequest
) => Promise<Record<string, never>>
Expand Down
6 changes: 6 additions & 0 deletions src/zeebe/lib/interfaces-grpc-1.0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,12 @@ export interface UpdateJobRetriesRequest {
retries: number
}

export interface UpdateJobTimeoutRequest {
readonly jobKey: string
/** the duration of the new timeout in ms, starting from the current moment */
timeout: number
}

export interface FailJobRequest {
readonly jobKey: string
retries: number
Expand Down
19 changes: 19 additions & 0 deletions src/zeebe/zb/ZeebeGrpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,25 @@ export class ZeebeGrpcClient extends TypedEmitter<
)
}

/**
Updates the deadline of a job using the timeout (in ms) provided. This can be used
for extending or shortening the job deadline.
Errors:
NOT_FOUND:
- no job exists with the given key
INVALID_STATE:
- no deadline exists for the given job key
*/
public updateJobTimeout(
updateJobTimeoutRequest: Grpc.UpdateJobTimeoutRequest
): Promise<void> {
return this.executeOperation('updateJobTimeout', async () =>
(await this.grpc).updateJobTimeoutSync(updateJobTimeoutRequest)
)
}

private constructGrpcClient({
grpcConfig,
logConfig,
Expand Down

0 comments on commit b1cc89c

Please sign in to comment.