/
BatchQueueWorker.js
134 lines (109 loc) · 4.24 KB
/
BatchQueueWorker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"use strict";
const Async = require('async');
const QueueWorker = require('./QueueWorker');
/**
 * Worker for processing batches of messages at a time, ideal for things that benefit from bulk loading
 * (e.g. elasticsearch!)
 *
 * Each incoming message is wrapped and pushed into an async cargo whose payload size is `batchSize`;
 * the cargo hands the accumulated messages to `handleMessageBatch`, which subclasses must override.
 */
class BatchQueueWorker extends QueueWorker {

    /**
     * @param {*} app - Passed straight through to the QueueWorker constructor
     * @param {Object} options - Worker options (required); never modified by this constructor
     * @param {number} [options.batchSize=5] - How many messages to hand to the batch handler at a time
     * @param {boolean} [options.skipInit=false] - When truthy, `init()` is not called by the constructor
     * @param {Object} [options.queueSubscriptionOptions] - Subscription options; `prefetch` is always
     *        overridden with `batchSize * 2`, and `retry` defaults to `{ delay: 1000 }` when not given
     * @throws {Error} When `options` is missing
     */
    constructor(app, options) {
        if (!options) {
            throw new Error('BatchQueueWorker: options are required');
        }

        const batchSize = options.batchSize || 5;
        const skipInit = options.skipInit || false;

        // Derive the base QueueWorker options on COPIES. Writing to the caller's object would leak
        // `skipInit: true` back to the caller, so a second worker built from the same options object
        // would silently never initialize.
        const queueSubscriptionOptions = Object.assign({}, options.queueSubscriptionOptions, {
            prefetch: batchSize * 2
        });
        queueSubscriptionOptions.retry = queueSubscriptionOptions.retry || { delay: 1000 };

        const workerOptions = Object.assign({}, options, {
            queueSubscriptionOptions,
            // Don't let the base class initialize until the cargo is set up
            skipInit: true
        });

        // Initialize underlying QueueWorker
        super(app, workerOptions);

        /**
         * Message batch size - How many messages we'll be sent at a time
         * @type {number}
         */
        this.batchSize = batchSize;

        // Get the aggregator setup
        this._setupCargo();

        // Start accepting messages, unless the caller asked to defer initialization
        if (!skipInit) {
            this.init();
        }
    }

    /* istanbul ignore next: must be implemented or this does nothing */
    //noinspection JSMethodCanBeStatic
    /**
     * This is the batch handler you need to override!
     *
     * The default implementation errors the whole batch using `this.nack.drop`
     * (presumably a recovery strategy provided by QueueWorker - confirm against the base class).
     *
     * @param {[*]} messages - Array of message objects, NOT payloads, that would be messages[0].message
     * @param {function(err:*=null, recovery:*=null)} defaultAckOrNack - Fire when done processing the batch
     */
    handleMessageBatch(messages, defaultAckOrNack) {
        // YOU MUST IMPLEMENT THIS TO BE USEFUL

        // individually ack or nack a message in the batch
        // messages[i].ackOrNack();

        // ack or nack the unhandled messages in the batch
        // defaultAckOrNack(); // ack all
        // defaultAckOrNack(err); // err all w/ default strategy
        // defaultAckOrNack(err, recovery); // err all w/ specific strategy

        defaultAckOrNack(true, this.nack.drop); // err all w/ drop strategy
    }

    /**
     * Initialize the cargo data structure
     * @private
     */
    _setupCargo() {
        this._cargo = Async.cargo(this._processCargoBatch.bind(this), this.batchSize);
    }

    /**
     * Internal handler for shipping a batch of messages to the application
     * @param {[CargoMessage]} messages - Wrapped messages accumulated by the cargo
     * @param {function} callback - Cargo completion callback; invoked exactly once per batch
     * @private
     */
    _processCargoBatch(messages, callback) {
        let settled = false;

        // Pass the batch to the handler
        this.handleMessageBatch(messages, (err, recovery) => {
            // A handler that fires the default callback more than once must not complete the same
            // cargo task twice; after the first call every message is already flagged as handled.
            if (settled) {
                return;
            }
            settled = true;

            // Ack/Nack any message the handler did not deal with individually
            messages.forEach((message) => {
                if (!message._handled) {
                    message.ackOrNack(err, recovery);
                }
            });

            callback();
        });
    }

    /**
     * Override the default message handler system. Routes messages into async cargo.
     *
     * @param message - Message object
     * @param content - Message body
     * @param ackOrNack - Callback to ack or nack the message
     */
    onMessage(message, content, ackOrNack) {
        /**
         * Wrapped Cargo Message
         * @typedef {{message: *, content: *, ackOrNack: function(err:*,recovery:*), _handled: boolean}} CargoMessage
         */
        const payload = {
            message,
            content,
            // wrapper around given ackOrNack, if message individually handled, flag it
            ackOrNack: (...params) => {
                payload._handled = true;
                ackOrNack(...params);
            },
            _handled: false
        };

        // Queue this into the current batch
        this._cargo.push(payload);
    }

    /**
     * Do not use this method on this QueueWorker
     * @throws {Error} Always - single-message handling does not apply to a batch worker
     */
    handleMessage(message, content, ackOrNack) { /* eslint-disable-line no-unused-vars */
        throw new Error('BatchQueueWorker: This method does not apply to this worker. Do not use it.');
    }
}
module.exports = BatchQueueWorker;