Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Commit

Permalink
Merge pull request #317 from camunda-community-hub:fail-job-retries
Browse files Browse the repository at this point in the history
fixes #316
  • Loading branch information
jwulf committed May 31, 2023
2 parents 3ce0789 + 3235b65 commit 51c66ef
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 12 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# Version 8.2.3

## Fixes

_Things that were broken and are now fixed._

- The object signature for `job.fail()` did not correctly apply an explicit value for `retries`. As a result, job retries would decrement automatically if this signature and option were used. The value is now correctly parsed and applied, and job retry count can be explicitly set in the `job.fail()` command with the object signature. Thanks to [@patozgg](https://github.com/patozgg) for reporting this. See [#316](https://github.com/camunda-community-hub/zeebe-client-node-js/issues/316) for more details.

# Version 8.2.2

## Chores
Expand Down
110 changes: 110 additions & 0 deletions src/__tests__/integration/Worker-Failure-Retries.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { ZBClient } from '../..'
import { CreateProcessInstanceResponse } from '../../lib/interfaces-grpc-1.0'

// const trace = <T>(res: T) => {
// // tslint:disable-next-line: no-console
// console.log(res)
// return res
// }
process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE'
jest.setTimeout(60000)

let zbc: ZBClient
let wf: CreateProcessInstanceResponse | undefined

beforeEach(() => {
zbc = new ZBClient()
})

afterEach(async () => {
try {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey)
}
} catch (e: any) {
// console.log('Caught NOT FOUND') // @DEBUG
} finally {
await zbc.close() // Makes sure we don't forget to close connection
}
})

test('Decrements the retries count by default', () =>
new Promise(async done => {
await zbc.deployProcess('./src/__tests__/testdata/Worker-Failure-Retries.bpmn')
wf = await zbc.createProcessInstance('worker-failure-retries', {
conditionVariable: true,
})
let called = false

const worker = zbc.createWorker({
taskType: 'service-task-worker-failure-retries',
taskHandler: job => {
if (!called) {
expect(job.retries).toBe(100)
called = true
return job.fail('Some reason')
}
expect(job.retries).toBe(99)
done(null)
return job.complete().then(async res => {
await worker.close()
return res
})
}
})
})
)

test('Set the retries to a specific number when provided with one via simple signature', () =>
new Promise(async done => {
await zbc.deployProcess('./src/__tests__/testdata/Worker-Failure-Retries.bpmn')
wf = await zbc.createProcessInstance('worker-failure-retries', {
conditionVariable: true,
})
let called = false

const worker = zbc.createWorker({
taskType: 'service-task-worker-failure-retries',
taskHandler: job => {
if (!called) {
expect(job.retries).toBe(100)
called = true
return job.fail('Some reason', 101)
}
expect(job.retries).toBe(101)
done(null)
return job.complete().then(async res => {
await worker.close()
return res
})
}
})
})
)

test('Set the retries to a specific number when provided with one via object signature', () =>
new Promise(async done => {
await zbc.deployProcess('./src/__tests__/testdata/Worker-Failure-Retries.bpmn')
wf = await zbc.createProcessInstance('worker-failure-retries', {
conditionVariable: true,
})
let called = false

const worker = zbc.createWorker({
taskType: 'service-task-worker-failure-retries',
taskHandler: job => {
if (!called) {
expect(job.retries).toBe(100)
called = true
return job.fail({ errorMessage: 'Some reason', retries: 101})
}
expect(job.retries).toBe(101)
done(null)
return job.complete().then(async res => {
await worker.close()
return res
})
}
})
})
)
8 changes: 8 additions & 0 deletions src/__tests__/integration/Worker-Failure.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,11 @@ test('Fails a process when the handler throws and options.failProcessOnException
}, 1500)
}
}))

test('Decrements the retries count by default', () => {

})

test('Set the retries to a specific number when provided with one', () => {

})
55 changes: 55 additions & 0 deletions src/__tests__/testdata/Worker-Failure-Retries.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" id="Definitions_1vwghmj" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.10.0">
<bpmn:process id="worker-failure-retries" name="Worker Failure Retries" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start&#10;&#10;">
<bpmn:outgoing>SequenceFlow_0fp53hs</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:serviceTask id="ServiceTask_0g6tf5f" name="Say Hello World">
<bpmn:extensionElements>
<zeebe:taskDefinition type="service-task-worker-failure-retries" retries="100" />
<zeebe:taskHeaders>
<zeebe:header key="message" value="Hello World" />
</zeebe:taskHeaders>
</bpmn:extensionElements>
<bpmn:incoming>SequenceFlow_0fp53hs</bpmn:incoming>
<bpmn:outgoing>SequenceFlow_112zghv</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:sequenceFlow id="SequenceFlow_0fp53hs" sourceRef="StartEvent_1" targetRef="ServiceTask_0g6tf5f" />
<bpmn:endEvent id="EndEvent_16r84dr" name="End">
<bpmn:incoming>SequenceFlow_112zghv</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="SequenceFlow_112zghv" sourceRef="ServiceTask_0g6tf5f" targetRef="EndEvent_16r84dr" />
</bpmn:process>
<bpmn:message id="Message_0remzna" name="Waiting For A Train That Never Arrives">
<bpmn:extensionElements>
<zeebe:subscription correlationKey="=none" />
</bpmn:extensionElements>
</bpmn:message>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="worker-failure-retries">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="173" y="102" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="179" y="145" width="24" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="ServiceTask_0g6tf5f_di" bpmnElement="ServiceTask_0g6tf5f">
<dc:Bounds x="302" y="80" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="EndEvent_16r84dr_di" bpmnElement="EndEvent_16r84dr">
<dc:Bounds x="482" y="102" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="490" y="145" width="20" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="SequenceFlow_0fp53hs_di" bpmnElement="SequenceFlow_0fp53hs">
<di:waypoint x="209" y="120" />
<di:waypoint x="302" y="120" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="SequenceFlow_112zghv_di" bpmnElement="SequenceFlow_112zghv">
<di:waypoint x="402" y="120" />
<di:waypoint x="482" y="120" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
25 changes: 13 additions & 12 deletions src/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ export class ZBWorkerBase<
thisJob: ZB.Job
): ZB.JobCompletionInterface<T> & ZB.JobCompletionInterface<T> {
let methodCalled: string | undefined

/**
* This is a wrapper that allows us to throw an error if a job acknowledgement function is called more than once,
* for these functions should be called once only (and only one should be called, but we don't handle that case).
* */
const errorMsgOnPriorMessageCall = (
thisMethod: string,
wrappedFunction: any
Expand All @@ -298,26 +303,22 @@ You should call only one job action method in the worker handler. This is a bug
return wrappedFunction(...args)
}
}

const cancelWorkflow = (job: ZB.Job) => () =>
this.zbClient
.cancelProcessInstance(job.processInstanceKey)
.then(() => ZB.JOB_ACTION_ACKNOWLEDGEMENT)

const failJob = (job: ZB.Job) => (
errorMessageOrFailureConfig: string | ZB.JobFailureConfiguration,
conf: string | ZB.JobFailureConfiguration,
retries?: number
) => {
const errorMessage =
typeof errorMessageOrFailureConfig === 'string'
? errorMessageOrFailureConfig
: (errorMessageOrFailureConfig as ZB.JobFailureConfiguration)
.errorMessage
const retryBackOff =
typeof errorMessageOrFailureConfig === 'string'
? 0
: (errorMessageOrFailureConfig as ZB.JobFailureConfiguration)
.retryBackOff ?? 0
return this.failJob({ job, errorMessage, retries, retryBackOff })
const isFailureConfig = (_conf: string | ZB.JobFailureConfiguration): _conf is ZB.JobFailureConfiguration =>
typeof _conf === 'object'
const errorMessage = isFailureConfig(conf) ? conf.errorMessage : conf
const retryBackOff = isFailureConfig(conf) ? conf.retryBackOff ?? 0 : 0
const _retries = isFailureConfig(conf) ? conf.retries ?? 0 : retries
return this.failJob({ job, errorMessage, retries: _retries, retryBackOff })
}

const succeedJob = (job: ZB.Job) => (completedVariables?: T) =>
Expand Down

0 comments on commit 51c66ef

Please sign in to comment.