Implement your external task workers for Camunda in NodeJS.
Compatible with Camunda >= 7.8. Requires NodeJS >= 8.6.
This library exposes a simple API to implement external task workers for Camunda.
var Worker = require('camunda-worker-node');
var Backoff = require('camunda-worker-node/lib/backoff');
var engineEndpoint = 'http://localhost:8080/engine-rest';
var worker = Worker(engineEndpoint, {
workerId: 'some-worker-id',
use: [
Backoff
]
});
// a work subscription may access and modify process variables
worker.subscribe('work:A', [ 'numberVar' ], async function(context) {
var newNumber = context.variables.numberVar + 1;
// fail with an error if things go awry
if (ooops) {
throw new Error('no work done');
}
// complete with update variables
return {
variables: {
numberVar: newNumber
}
};
});
// stop the worker instance with the application
worker.stop();
Make sure you properly configured the external tasks in your BPMN 2.0 diagram:
<bpmn:serviceTask
id="Task_A"
camunda:type="external"
camunda:topicName="work:A" />
- Subscribe to work node-style or via async functions
- Complete tasks with updated variables or fail with errors
- Trigger BPMN errors
- Configure and extend task locks
- Configure logging and authentication
- Configure task fetching
- Control the worker life-cycle
- Extend via plug-ins
- Customize Variable Serialization
Implement your workers via async, promise-returning functions or pass results via node-style callbacks.
Use the provided callback to pass task execution errors and data, node-style:
// report work results via a node-style callback
worker.subscribe('work:B', function(context, callback) {
var newNumber = context.variables.numberVar + 1;
// indicate an error
callback(
new Error('no work done')
);
// or return actual result
callback(null, {
variables: {
numberVar: newNumber
}
});
});
Implementing work via ES2017-style async/await is fully supported:
// implement work via a Promise returning async function
worker.subscribe('work:B', async function(context) {
// await async increment
var newNumber = await increment(context.variables.numberVar);
// indicate an error
throw new Error('no work done');
// or return actual result
return {
variables: {
numberVar: newNumber
}
};
});
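The two handler styles carry the same information: an error or a result. As a minimal sketch of how they relate, the adapter below turns a node-style handler into a promise-returning one. The names toPromiseHandler, incrementNodeStyle and incrementAsync are hypothetical and not part of camunda-worker-node; the worker accepts both styles directly, so such an adapter is only useful for reusing or testing handlers outside the worker.

```javascript
// Hypothetical helper, not part of camunda-worker-node: wraps a node-style
// handler (context, callback) so that it returns a Promise instead.
function toPromiseHandler(nodeStyleHandler) {
  return function(context) {
    return new Promise(function(resolve, reject) {
      nodeStyleHandler(context, function(err, result) {
        if (err) {
          return reject(err);
        }

        resolve(result);
      });
    });
  };
}

// node-style handler with the same shape as the callback example
function incrementNodeStyle(context, callback) {
  var numberVar = context.variables.numberVar;

  // indicate an error
  if (typeof numberVar !== 'number') {
    return callback(new Error('no work done'));
  }

  // or pass the actual result
  callback(null, {
    variables: {
      numberVar: numberVar + 1
    }
  });
}

var incrementAsync = toPromiseHandler(incrementNodeStyle);
```

Calling incrementAsync with a plain object such as { variables: { numberVar: 1 } } makes it possible to exercise the handler without a running engine.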
You may indicate BPMN errors to trigger business defined exception handling:
worker.subscribe('work:B', async function(context) {
// trigger business aka BPMN errors
return {
errorCode: 'some-bpmn-error'
};
});
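A handler often has to distinguish three outcomes: a technical failure, a business error, and regular completion. The following sketch shows one way to arrange that, using only the conventions shown above (throw to fail, return errorCode for a BPMN error, return variables to complete). The credit rule, the limit, the topic name and the variable names are made up for illustration.

```javascript
// Hypothetical business rule: amounts above the limit raise a BPMN error.
var CREDIT_LIMIT = 1000;

async function checkCredit(context) {
  var amount = context.variables.amount;

  // technical problem: throw, so the task is reported as failed
  if (typeof amount !== 'number') {
    throw new Error('amount is missing or not a number');
  }

  // business problem: return an error code to trigger a BPMN error
  if (amount > CREDIT_LIMIT) {
    return {
      errorCode: 'credit-limit-exceeded'
    };
  }

  // regular completion with updated variables
  return {
    variables: {
      approved: true
    }
  };
}

// registration, assuming a worker instance as created above:
// worker.subscribe('credit:check', [ 'amount' ], checkCredit);
```

The error code only has an effect if the BPMN diagram defines a matching error event for it.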
You may configure the initial lock time (defaults to 10 seconds) on worker registration.
At the same time you may use the method extendLock, provided via the task context, to increase the lock time while the worker is busy.
// configure three seconds as initial lock time
worker.subscribe('work:B', {
lockDuration: 3000,
variables: [ 'a' ]
}, async function(context) {
var extendLock = context.extendLock;
// extend the lock for another five seconds
await extendLock(5000);
// complete task
return {};
});
Read more about external task locking in the Camunda Documentation.
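For work of unknown duration, one option is to renew the lock periodically instead of guessing a single extension. The helper below is a sketch under the assumption that context.extendLock(duration) behaves as in the example above; withLockExtension is a hypothetical name and not something the library provides.

```javascript
// Hypothetical helper: keeps extending the lock while `work` is running.
async function withLockExtension(context, lockDuration, work) {

  // renew at half the lock duration so the lock does not expire in between
  var timer = setInterval(function() {
    Promise.resolve(context.extendLock(lockDuration)).catch(function(err) {
      console.error('failed to extend lock', err);
    });
  }, lockDuration / 2);

  try {
    return await work(context);
  } finally {
    // always stop renewing, whether the work succeeded or failed
    clearInterval(timer);
  }
}

// usage inside a subscription, assuming doLongRunningWork is your own function:
// worker.subscribe('work:B', { lockDuration: 3000 }, function(context) {
//   return withLockExtension(context, 3000, doLongRunningWork);
// });
```

Renewing at half the lock duration is an arbitrary safety margin; pick whatever fits your network latency and task length.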
We provide middlewares for basic auth as well as token based authentication.
Provide your client credentials via the BasicAuth middleware:
var worker = Worker(engineEndpoint, {
use: [
BasicAuth('walt', 'SECRET_PASSWORD')
]
});
Provide your tokens via the Auth middleware:
var worker = Worker(engineEndpoint, {
use: [
Auth('Bearer', 'BEARER_TOKEN')
]
});
To support custom authentication options, add additional request headers to authenticate your task worker via the requestOptions configuration:
var worker = Worker(engineEndpoint, {
requestOptions: {
headers: {
Hello: 'Authenticated?'
}
}
})
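To make the header mechanism concrete, the sketch below builds an HTTP Basic Authorization header by hand (as defined in RFC 7617) and places it in an options object of the shape shown above. The basicAuthHeader function and the workerOptions variable are illustrative names; for plain username and password setups the BasicAuth middleware is the simpler route.

```javascript
// Builds the value of an HTTP Basic `Authorization` header (RFC 7617).
function basicAuthHeader(username, password) {
  var credentials = Buffer
    .from(username + ':' + password, 'utf8')
    .toString('base64');

  return 'Basic ' + credentials;
}

// options object in the shape used by the requestOptions example
var workerOptions = {
  requestOptions: {
    headers: {
      Authorization: basicAuthHeader('walt', 'SECRET_PASSWORD')
    }
  }
};

// var worker = Worker(engineEndpoint, workerOptions);
```

The same pattern applies to any custom scheme: compute the header value yourself and pass it along with every request.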
We employ debug for logging.
Use the Logger extension in combination with DEBUG=* to capture a full trace of what's going on under the hood:
DEBUG=* node start-workers.js
Task fetching is controlled by two configuration properties:
- maxTasks - maximum number of tasks to be fetched and locked with a single poll
- pollingInterval - interval in milliseconds between polls
You may configure both properties on worker creation and at run-time:
var worker = Worker(engineEndpoint, {
maxTasks: 2,
pollingInterval: 1500
});
// dynamically increase max tasks
worker.configure({
maxTasks: 5
});
This way you can configure the task fetching behavior both statically and dynamically.
Roll your own middleware to dynamically configure the worker instance or let the Backoff middleware take care of it.
As an alternative to configuring these values you may stop and re-start the Worker instance as needed, too.
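As an illustration of dynamic configuration, the function below derives a new polling interval from the outcome of the last poll. It is a hypothetical policy and not the algorithm used by the Backoff middleware; the bounds and the halving and doubling factors are assumptions chosen for readability. The result could be handed to worker.configure as shown above.

```javascript
// Hypothetical policy, not the Backoff middleware's actual algorithm.
var MIN_INTERVAL = 500;
var MAX_INTERVAL = 30000;

function nextPollingInterval(currentInterval, fetchedTasks, maxTasks) {

  // a full batch suggests more work is waiting: poll faster
  if (fetchedTasks >= maxTasks) {
    return Math.max(MIN_INTERVAL, Math.floor(currentInterval / 2));
  }

  // an empty poll suggests an idle engine: poll slower
  if (fetchedTasks === 0) {
    return Math.min(MAX_INTERVAL, currentInterval * 2);
  }

  // otherwise keep the current pace
  return currentInterval;
}

// applying the result, assuming a worker instance as created above:
// worker.configure({
//   pollingInterval: nextPollingInterval(1500, 0, 2)
// });
```

How the number of fetched tasks reaches such a function depends on the middleware hooks, which this sketch deliberately leaves out.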
Per default the worker instance will start to poll for work immediately.
Configure this behavior via the autoPoll option and start, stop and re-start the instance programmatically if you need to:
var worker = Worker(engineEndpoint, {
autoPoll: false
});
// manually start polling
worker.start();
// stop later on
await worker.stop();
// re-start at some point in time
worker.start();
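A common use of the life-cycle methods is to stop polling when the application shuts down. The sketch below wires worker.stop() to termination signals; stopOnShutdown is a hypothetical helper, and the choice of SIGTERM and SIGINT is an assumption about how your process is managed.

```javascript
// Hypothetical helper: stops the worker when the process is asked to terminate.
// `proc` defaults to the global process; it is a parameter to ease testing.
function stopOnShutdown(worker, proc) {
  proc = proc || process;

  async function shutdown() {
    // worker.stop() is awaited in the example above, so it is awaited here, too
    await worker.stop();
  }

  proc.once('SIGTERM', shutdown);
  proc.once('SIGINT', shutdown);

  return shutdown;
}

// usage, assuming a worker instance as created above:
// stopOnShutdown(worker);
```

Note that registering a signal handler replaces Node's default exit behavior for that signal, so the process only ends once nothing else keeps the event loop alive.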
A worker may be extended via the use config parameter.
Worker(engineEndpoint, {
use: [
Logger,
Backoff,
Metrics,
BasicAuth('Walt', 'SECRET_PASSWORD')
]
});
- Logger - adds verbose logging of what is going on
- Backoff - dynamically adjusts poll times based on Camunda REST API availability, fetched tasks and poll processing times
- Metrics - collects and periodically logs utilization metrics
- BasicAuth - authorizes against the REST API with username + password
- Auth - authorizes against the REST API with arbitrary tokens
Variables that are read, modified and passed back on task completion preserve their serialized form (indicated via valueInfo as documented in the Camunda REST API documentation). Newly added variables are serialized without valueInfo using the types String, Date, Boolean, Json or Double as appropriate.
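To give a feel for what "as appropriate" could mean, here is a sketch that maps JavaScript values to the five type names listed above. The mapping is an assumption made for illustration; the library's actual rules may differ, for example in how it treats null or integers, and guessCamundaType is a hypothetical name.

```javascript
// Hypothetical sketch of default type selection; the library's rules may differ.
function guessCamundaType(value) {
  if (typeof value === 'string') {
    return 'String';
  }

  if (typeof value === 'boolean') {
    return 'Boolean';
  }

  if (typeof value === 'number') {
    return 'Double';
  }

  if (value instanceof Date) {
    return 'Date';
  }

  // plain objects, arrays and everything else
  return 'Json';
}
```

If the guessed type is not what the process expects, wrapping the value explicitly is the way to take control.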
You may wrap variables with SerializedVariable if you would like to take full control over variable serialization:
var Serialized = require('camunda-worker-node/lib/serialized-variable');
worker.subscribe('shop:create-customer', async function(context) {
return {
variables: {
// wrap customer to indicate it is already serialized
customer: Serialized({
type: 'Object',
value: JSON.stringify({
name: 'Hugo'
}),
valueInfo: {
serializationDataFormat: 'application/json',
objectTypeName: 'my.example.Customer'
}
})
}
};
});
It is possible to dynamically unregister a work subscription at any time.
var subscription = worker.subscribe('someTopic', async function(context) {
// do work
console.log('doing work!');
});
// later
subscription.remove();
npm i --save camunda-worker-node
Install dependencies:
npm install
Lint and run all tests:
DEBUG=worker* npm run all
Note: You need a Camunda BPM REST API exposed on localhost:8080/engine-rest for the tests to pass. An easy way to get it up and running is via Docker.
MIT