Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Commit

Permalink
Fixes #213
Browse files Browse the repository at this point in the history
  • Loading branch information
jwulf committed Mar 9, 2021
1 parent 801f69b commit 0f1a525
Showing 1 changed file with 41 additions and 17 deletions.
58 changes: 41 additions & 17 deletions src/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { ClientReadableStreamImpl } from '@grpc/grpc-js/build/src/call'
import { Chalk } from 'chalk'
import chalk, { Chalk } from 'chalk'
import * as _debug from 'debug'
import { EventEmitter } from 'events'
import { Duration, MaybeTimeDuration } from 'typed-duration'
import * as uuid from 'uuid'
import { parseVariablesAndCustomHeadersToJSON } from '../lib'
import * as ZB from '../lib/interfaces-1.0'
import { JOB_ACTION_ACKNOWLEDGEMENT } from '../lib/interfaces-1.0'
import { StatefulLogInterceptor } from '../lib/StatefulLogInterceptor'
import { ConnectionStatusEvent, ZBClient } from '../zb/ZBClient'
import {
Expand Down Expand Up @@ -275,10 +274,26 @@ export class ZBWorkerBase<
protected makeCompleteHandlers<T>(
thisJob: ZB.Job
): ZB.CompleteFn<T> & ZB.JobCompletionInterface<T> {
let methodCalled: string | undefined
const errorMsgOnPriorMessageCall = (
thisMethod: string,
wrappedFunction: any
) => {
if (methodCalled !== undefined) {
// tslint:disable-next-line: no-console
console.log(
chalk.red(`WARNING: Call to ${thisMethod}() after ${methodCalled}() was called.
You should call only one job action method in a worker handler code branch. This is a bug in your worker handler.`)
)
return wrappedFunction
}
methodCalled = thisMethod
return wrappedFunction
}
const cancelWorkflow = (job: ZB.Job) => () =>
this.zbClient
.cancelProcessInstance(job.processInstanceKey)
.then(() => JOB_ACTION_ACKNOWLEDGEMENT)
.then(() => ZB.JOB_ACTION_ACKNOWLEDGEMENT)
const failJob = (job: ZB.Job) => (
errorMessage: string,
retries?: number
Expand All @@ -298,19 +313,28 @@ export class ZBWorkerBase<
})
return {
cancelWorkflow: cancelWorkflow(thisJob),
error: errorJob(thisJob),
failure: failJob(thisJob),
fail: failJob(thisJob),
forwarded: () => {
complete: errorMsgOnPriorMessageCall(
'job.complete',
succeedJob(thisJob)
),
error: errorMsgOnPriorMessageCall('error', errorJob(thisJob)),
fail: errorMsgOnPriorMessageCall('job.fail', failJob(thisJob)),
failure: errorMsgOnPriorMessageCall(
'complete.failure',
failJob(thisJob)
),
forward: errorMsgOnPriorMessageCall('job.forward', () => {
this.drainOne()
return JOB_ACTION_ACKNOWLEDGEMENT
},
forward: () => {
return ZB.JOB_ACTION_ACKNOWLEDGEMENT
}),
forwarded: errorMsgOnPriorMessageCall('complete.forwarded', () => {
this.drainOne()
return JOB_ACTION_ACKNOWLEDGEMENT
},
complete: succeedJob(thisJob),
success: succeedJob(thisJob),
return ZB.JOB_ACTION_ACKNOWLEDGEMENT
}),
success: errorMsgOnPriorMessageCall(
'complete.success',
succeedJob(thisJob)
),
}
}

Expand All @@ -329,7 +353,7 @@ export class ZBWorkerBase<
jobKey: job.key,
retries: retries ?? job.retries - 1,
})
.then(() => JOB_ACTION_ACKNOWLEDGEMENT)
.then(() => ZB.JOB_ACTION_ACKNOWLEDGEMENT)
.finally(() => {
this.logger.logDebug(`Failed job ${job.key} - ${errorMessage}`)
this.drainOne()
Expand All @@ -354,7 +378,7 @@ export class ZBWorkerBase<
)
return e
})
.then(() => JOB_ACTION_ACKNOWLEDGEMENT)
.then(() => ZB.JOB_ACTION_ACKNOWLEDGEMENT)
.finally(() => {
this.drainOne()
})
Expand Down Expand Up @@ -386,7 +410,7 @@ export class ZBWorkerBase<
})
.then(() => {
this.drainOne()
return JOB_ACTION_ACKNOWLEDGEMENT
return ZB.JOB_ACTION_ACKNOWLEDGEMENT
})
}

Expand Down

0 comments on commit 0f1a525

Please sign in to comment.