/
QueueService.js
133 lines (112 loc) · 3.93 KB
/
QueueService.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"use strict";
const Rascal = require('rascal');
/**
* Okanjo Message Queue Service
* @class
*/
class QueueService {
/**
* Queue management service
* @param {OkanjoApp} app
* @param {*} [config] service config
* @constructor
*/
constructor(app, config) {
this.app = app;
this.config = config;
if (!this.config || !this.config.rascal) {
throw new Error('okanjo-app-queue: No rascal configuration set for QueueService!');
}
this.broker = null; // set on connect
// Register the connection with the app
app._serviceConnectors.push(async () => {
await this.connect();
});
}
/**
* Connects to RabbitMQ and binds the necessary exchanges and queues
*/
async connect() {
try {
this.broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(this.config.rascal));
this.broker.on('error', this._handleBrokerError.bind(this));
} catch(err) {
this.app.report('okanjo-app-queue: Failed to create Rascal broker:', err);
throw err;
}
}
/**
* Publishes a message to the given queue
* @param {string} queue - The queue name to publish to
* @param {*} data - The message data to queue
* @param [options] - The message data to queue
* @param [callback] - Fired when done
* @returns {Promise}
*/
publishMessage(queue, data, options, callback) {
// Overload - e.g. publishMessage(queue, data, callback);
if (arguments.length === 3 && typeof options === "function") {
callback = options;
options = {};
}
return new Promise(async (resolve, reject) => {
let pub;
try {
pub = await this.broker.publish(queue, data, options);
} catch (err) {
this.app.report('okanjo-app-queue: Failed to publish queue message', err, { queue, data, options });
if (callback) return callback(err);
return reject(err);
}
if (callback) return callback(null, pub);
return resolve(pub);
});
}
_handleBrokerError(err) {
this.app.report('okanjo-app-queue: Rascal Broker error', err);
}
}
/**
* Expose the QueueWorker helper
* @type {QueueWorker}
*/
QueueService.QueueWorker = require('./QueueWorker');
/**
* Expose the BatchQueueWorker helper
* @type {BatchQueueWorker}
*/
QueueService.BatchQueueWorker = require('./BatchQueueWorker');
/**
* Helper to generate a vhost config for Rascal based on old queue-name only setup
* @param queueNames – Array of string queue names
* @param [config] – Optional vhost config to append to
* @param {{exchangeDefaults:*, queueDefaults:*, bindingDefaults:*}} [options] – Additional options for exchanges, queues, and bindings
* @returns {*|{exchanges: Array, queues: {}, bindings: {}, subscriptions: {}, publications: {}}}
*/
QueueService.generateConfigFromQueueNames = (queueNames, config, options={}) => {
config = config || {};
config.exchanges = config.exchanges || {};
config.queues = config.queues || {};
config.bindings = config.bindings || {};
config.subscriptions = config.subscriptions || {};
config.publications = config.publications || {};
queueNames.forEach((name) => {
config.exchanges[name] = {
...options.exchangeDefaults
};
config.queues[name] = {
...options.queueDefaults
};
config.bindings[name] = {
bindingKey: "", // typically defaults to #, does this matter?
destinationType: "queue",
...options.bindingDefaults,
source: name,
destination: name,
};
config.subscriptions[name] = { queue: name };
config.publications[name] = { exchange: name };
});
return config;
};
module.exports = QueueService;