Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Commit

Permalink
Merge pull request #313 from camunda-community-hub:8.2.1
Browse files Browse the repository at this point in the history
8.2.1
  • Loading branch information
jwulf committed May 2, 2023
2 parents bf220b2 + f16efaa commit 15c6e81
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
services:
zeebe:
image: camunda/zeebe:8.1.6
image: camunda/zeebe:8.2.3
ports:
- 26500:26500
steps:
Expand Down
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# Version 8.2.1

## New Features

_New shiny stuff._
- Add `ZBClient.broadcastSignal`, enabling the client to broadcast a signal. See [#312](https://github.com/camunda-community-hub/zeebe-client-node-js/issues/312) for more details.

## Fixes

_Things that were broken and are now fixed._

- Previously, the `timeToLive` property of `ZBClient.publishMessage` was required, although it was documented as optional. In this release, both `timeToLive` and `variables` have been made optional. If no value is supplied for `timeToLive`, it defaults to 0. Thanks to [@nhomble]() for raising this issue. See [#311](https://github.com/camunda-community-hub/zeebe-client-node-js/issues/311) for more details.

# Version 8.2.0

## New Features
Expand Down
56 changes: 56 additions & 0 deletions proto/zeebe.proto
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,30 @@ message ModifyProcessInstanceResponse {

}

message DeleteResourceRequest {
// The key of the resource that should be deleted. This can either be the key
// of a process definition, or the key of a decision requirements definition.
int64 resourceKey = 1;
}

message DeleteResourceResponse {

}

message BroadcastSignalRequest {
// The name of the signal
string signalName = 1;

// the signal variables as a JSON document; to be valid, the root of the document must be an
// object, e.g. { "a": "foo" }. [ "foo" ] would not be valid.
string variables = 2;
}

message BroadcastSignalResponse {
// the unique ID of the signal that was broadcasted.
int64 key = 1;
}

service Gateway {
/*
Iterates through all known partitions round-robin and activates up to the requested
Expand Down Expand Up @@ -770,4 +794,36 @@ service Gateway {
rpc ModifyProcessInstance (ModifyProcessInstanceRequest) returns (ModifyProcessInstanceResponse) {

}

/*
Deletes a resource from the state. Once a resource has been deleted it cannot
be recovered. If the resource needs to be available again, a new deployment
of the resource is required.
Deleting a process will cancel any running instances of this process
definition. New instances of a deleted process are created using
the lastest version that hasn't been deleted. Creating a new
process instance is impossible when all versions have been
deleted.
Deleting a decision requirement definitions could cause incidents in process
instances referencing these decisions in a business rule task. A decision
will be evaluated with the latest version that hasn't been deleted. If all
versions of a decision have been deleted the evaluation is rejected.
Errors:
NOT_FOUND:
- No resource exists with the given key
*/
rpc DeleteResource (DeleteResourceRequest) returns (DeleteResourceResponse) {

}

/*
Broadcasts a signal.
*/
rpc BroadcastSignal (BroadcastSignalRequest) returns (BroadcastSignalResponse) {

}
}
50 changes: 50 additions & 0 deletions src/__tests__/integration/Client-BroadcastSignal.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { ZBClient } from '../..'
// import { createUniqueTaskType } from '../../lib/createUniqueTaskType'
import { CreateProcessInstanceResponse } from '../../lib/interfaces-grpc-1.0'

process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE'
jest.setTimeout(60000)


let zbc: ZBClient
let wf: CreateProcessInstanceResponse | undefined

beforeEach(() => {
zbc = new ZBClient()
})

afterEach(async () => {
try {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey)
}
} catch (e: any) {
// console.log('Caught NOT FOUND') // @DEBUG
} finally {
await zbc.close() // Makes sure we don't forget to close connection
}
})

test('Can start a process with a signal', () => new Promise(async resolve => {
zbc.createWorker({
taskType: 'signal-service-task',
taskHandler: job => {
const ack = job.complete()
expect (job.variables.success).toBe(true)
resolve(null)
return ack
}
})
await zbc.deployResource({
processFilename: `./src/__tests__/testdata/Signal.bpmn`
})
const res = await zbc.broadcastSignal({
signalName: 'test-signal',
variables: {
success: true
}
})
expect(res.key).toBeTruthy()
})
)

54 changes: 54 additions & 0 deletions src/__tests__/integration/Client-PublishMessage.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { v4 as uuid } from 'uuid'
import { ZBClient } from '../..'
import { createUniqueTaskType } from '../../lib/createUniqueTaskType'

process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE'
jest.setTimeout(45000)
let zbc: ZBClient

beforeEach(async () => {
zbc = new ZBClient()
})

afterEach(async () => {
await zbc.close()
})

test('Can publish a message', () =>
new Promise(async done => {
const { bpmn, taskTypes, processId, messages } = createUniqueTaskType({
bpmnFilePath: './src/__tests__/testdata/Client-MessageStart.bpmn',
messages: ['MSG-START_JOB'],
taskTypes: ['console-log-msg-start'],
})

const deploy = await zbc.deployProcess({
definition: bpmn,
name: `Client-MessageStart-${processId}.bpmn`,
})
expect(deploy.key).toBeTruthy()

const randomId = uuid()

// Wait 1 second to make sure the deployment is complete
await new Promise(res => setTimeout(() => res(null), 1000))

await zbc.publishMessage({
name: messages['MSG-START_JOB'],
variables: {
testKey: randomId,
},
correlationKey: 'something'
})

zbc.createWorker({
taskType: taskTypes['console-log-msg-start'],
taskHandler: async job => {
const res = await job.complete()
expect(job.variables.testKey).toBe(randomId) // Makes sure the worker isn't responding to another message
done(null)
return res
},
loglevel: 'NONE',
})
}))
50 changes: 50 additions & 0 deletions src/__tests__/testdata/Signal.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0ifk6nh" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.10.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.2.0">
<bpmn:process id="Signal" name="Signal" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start">
<bpmn:outgoing>Flow_0dwell7</bpmn:outgoing>
<bpmn:signalEventDefinition id="SignalEventDefinition_01pvmb3" signalRef="Signal_3c1lk35" />
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0dwell7" sourceRef="StartEvent_1" targetRef="Activity_0iq5pp7" />
<bpmn:endEvent id="Event_0jluwfy" name="End">
<bpmn:incoming>Flow_16lx2ly</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_16lx2ly" sourceRef="Activity_0iq5pp7" targetRef="Event_0jluwfy" />
<bpmn:serviceTask id="Activity_0iq5pp7" name="Signal Service Task">
<bpmn:extensionElements>
<zeebe:taskDefinition type="signal-service-task" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0dwell7</bpmn:incoming>
<bpmn:outgoing>Flow_16lx2ly</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmn:signal id="Signal_3c1lk35" name="test-signal" />
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Signal">
<bpmndi:BPMNShape id="Event_164ft84_di" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="185" y="142" width="24" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0jluwfy_di" bpmnElement="Event_0jluwfy">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="440" y="142" width="20" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1vptiif_di" bpmnElement="Activity_0iq5pp7">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0dwell7_di" bpmnElement="Flow_0dwell7">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_16lx2ly_di" bpmnElement="Flow_16lx2ly">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
7 changes: 5 additions & 2 deletions src/lib/GrpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ export class GrpcClient extends EventEmitter {
if (this.closing) {
// tslint:disable-next-line: no-console
console.log('Short-circuited on channel closed') // @DEBUG

return
}
let stream
Expand All @@ -290,6 +289,7 @@ export class GrpcClient extends EventEmitter {
)
this.setReady()
} catch (error: any) {
debug(`${methodName}Stream error: ${error.code}`, error.message)
this.emit(MiddlewareSignals.Log.Error, error.message)
this.emit(MiddlewareSignals.Event.Error)
this.setNotReady()
Expand Down Expand Up @@ -324,7 +324,7 @@ export class GrpcClient extends EventEmitter {
*/
stream.on('error', (error: GrpcStreamError) => {
clearTimeout(clientSideTimeout)
debug(`Error`, error)
debug(`${methodName}Stream error emitted by stream`, error)
this.emit(MiddlewareSignals.Event.Error)
if (error.message.includes('14 UNAVAILABLE')) {
this.emit(
Expand Down Expand Up @@ -363,6 +363,7 @@ export class GrpcClient extends EventEmitter {
debug(`Calling ${methodName}Sync...`)

if (this.closing) {
debug(`Aborting ${methodName}Sync due to client closing.`)
return
}
const timeNormalisedRequest = replaceTimeValuesWithMillisecondNumber(
Expand All @@ -378,6 +379,7 @@ export class GrpcClient extends EventEmitter {
(err, dat) => {
// This will error on network or business errors
if (err) {
debug(`${methodName}Sync error: ${err.code}`)
const isNetworkError =
err.code === GrpcError.UNAVAILABLE
if (isNetworkError) {
Expand All @@ -389,6 +391,7 @@ export class GrpcClient extends EventEmitter {
}
this.emit(MiddlewareSignals.Event.Ready)
this.setReady()
debug(`${methodName}Sync completed`)
resolve(dat)
}
)
Expand Down
8 changes: 8 additions & 0 deletions src/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,11 @@ You should call only one job action method in the worker handler. This is a bug
workerIsClosing ||
insufficientCapacityAvailable
) {
debug('Worker polling blocked', {
pollAlreadyInProgress,
workerIsClosing,
insufficientCapacityAvailable
})
return
}

Expand Down Expand Up @@ -502,9 +507,11 @@ You should call only one job action method in the worker handler. This is a bug

private async activateJobs(id: string) {
if (this.stalled) {
debug('Stalled')
return { stalled: true }
}
if (this.closing) {
debug('Closing')
return {
closing: true,
}
Expand Down Expand Up @@ -553,6 +560,7 @@ You should call only one job action method in the worker handler. This is a bug
}

if (stream.error) {
debug(`Stream error`, stream.error)
return { error: stream.error }
}

Expand Down
18 changes: 18 additions & 0 deletions src/lib/interfaces-1.0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import {
ModifyProcessInstanceRequest,
ModifyProcessInstanceResponse,
ProcessInstanceCreationStartInstruction,
BroadcastSignalResponse,
BroadcastSignalRequest,
} from './interfaces-grpc-1.0'
import { Loglevel, ZBCustomLogger } from './interfaces-published-contract'

Expand Down Expand Up @@ -405,6 +407,21 @@ export interface ZBWorkerConfig<
*/
jobBatchMinSize?: number
}

export interface BroadcastSignalReq {
// The name of the signal
signalName: string;

// the signal variables as a JSON document; to be valid, the root of the document must be an
// object, e.g. { "a": "foo" }. [ "foo" ] would not be valid.
variables?: JSONDoc;
}

export interface BroadcastSignalRes {
// the unique ID of the signal that was broadcasted.
key: string
}

export interface ZBGrpc extends GrpcClient {
completeJobSync: any
activateJobsStream: any
Expand Down Expand Up @@ -440,4 +457,5 @@ export interface ZBGrpc extends GrpcClient {
resolveIncidentSync(
resolveIncidentRequest: ResolveIncidentRequest
): Promise<void>
broadcastSignalSync(signal: BroadcastSignalRequest): Promise<BroadcastSignalResponse>
}
Loading

0 comments on commit 15c6e81

Please sign in to comment.