Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace custom deferred class with p-defer #21

Merged
merged 1 commit into from May 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -29,6 +29,7 @@
"homepage": "https://github.com/bringg/node-arnavmq#readme",
"dependencies": {
"amqplib": "^0.7.0",
"p-defer": "^3.0.0",
"serialize-error": "^8.0.1",
"uuid": "^8.3.1"
},
Expand Down
10 changes: 0 additions & 10 deletions src/classes/deferred.js

This file was deleted.

98 changes: 60 additions & 38 deletions src/modules/producer.js
@@ -1,10 +1,10 @@
const uuid = require('uuid');
const pDefer = require('p-defer');
const utils = require('./utils');
const parsers = require('./message-parsers');
const Deferred = require('../classes/deferred');

const ERRORS = {
TIMEOUT: 'Timeout reached'
TIMEOUT: 'Timeout reached',
};

const loggerAlias = 'arnav_mq:producer';
Expand Down Expand Up @@ -49,14 +49,21 @@ class Producer {
const responsePromise = rpcQueue[correlationId];

if (responsePromise === undefined) {
this._connection.config.transport.error(loggerAlias,
new Error(`Receiving RPC message from previous session: callback no more in memory. ${queue}`));
this._connection.config.transport.error(
loggerAlias,
new Error(
`Receiving RPC message from previous session: callback no more in memory. ${queue}`
)
);

return;
}

// if we found one, we execute the callback and delete it because it will never be received again anyway
this._connection.config.transport.info(loggerAlias, `[${queue}] < answer`);
this._connection.config.transport.info(
loggerAlias,
`[${queue}] < answer`
);

try {
responsePromise.resolve(parsers.in(msg));
Expand All @@ -83,25 +90,32 @@ class Producer {
// ie. if hostname is gateway-http and queue is service-oauth, response queue will be service-oauth:gateway-http:res
// it is important to have different hostname or no hostname on each module sending message or there will be conflicts
const resQueue = `${queue}:${this._connection.config.hostname}:${process.pid}:res`;
rpcQueue.queue = this._connection.get().then((channel) => channel.assertQueue(resQueue, {
durable: true,
exclusive: true
})
.then((q) => {
rpcQueue.queue = q.queue;

// if channel is closed, we want to make sure we cleanup the queue so future calls will recreate it
this._connection.addListener('close', () => {
delete rpcQueue.queue;
this.createRpcQueue(queue);
});

return channel.consume(q.queue, this.maybeAnswer(queue), { noAck: true });
})
.then(() => rpcQueue.queue))
rpcQueue.queue = this._connection
.get()
.then((channel) => channel
.assertQueue(resQueue, {
durable: true,
exclusive: true,
})
.then((q) => {
rpcQueue.queue = q.queue;

// if channel is closed, we want to make sure we cleanup the queue so future calls will recreate it
this._connection.addListener('close', () => {
delete rpcQueue.queue;
this.createRpcQueue(queue);
});

return channel.consume(q.queue, this.maybeAnswer(queue), {
noAck: true,
});
})
.then(() => rpcQueue.queue))
.catch(() => {
delete rpcQueue.queue;
return utils.timeoutPromise(this._connection.config.timeout).then(() => this.createRpcQueue(queue));
return utils
.timeoutPromise(this._connection.config.timeout)
.then(() => this.createRpcQueue(queue));
});

return rpcQueue.queue;
Expand Down Expand Up @@ -153,7 +167,7 @@ class Producer {

this.publishOrSendToQueue(queue, msg, options);
// defered promise that will resolve when response is received
const responsePromise = new Deferred();
const responsePromise = pDefer();
this.amqpRPCQueues[queue][corrId] = responsePromise;

// Using given timeout or default one
Expand Down Expand Up @@ -206,25 +220,33 @@ class Producer {
}

_sendToQueue(queue, message, settings, currentRetryNumber) {
return this._connection.get().then((channel) => {
this.channel = channel;
return this._connection
.get()
.then((channel) => {
this.channel = channel;

// undefined can't be serialized/buffered :p
if (!message) message = null;
// undefined can't be serialized/buffered :p
if (!message) message = null;

this._connection.config.transport.info(loggerAlias, `[${queue}] > `, message);
this._connection.config.transport.info(
loggerAlias,
`[${queue}] > `,
message
);

return this.checkRpc(queue, parsers.out(message, settings), settings);
}).catch((err) => {
if (!this._shouldRetry(err, currentRetryNumber)) {
throw err;
}
return this.checkRpc(queue, parsers.out(message, settings), settings);
})
.catch((err) => {
if (!this._shouldRetry(err, currentRetryNumber)) {
throw err;
}

// add timeout between retries because we don't want to overflow the CPU
this._connection.config.transport.error(loggerAlias, err);
return utils.timeoutPromise(this._connection.config.timeout)
.then(() => this._sendToQueue(queue, message, settings, currentRetryNumber + 1));
});
// add timeout between retries because we don't want to overflow the CPU
this._connection.config.transport.error(loggerAlias, err);
return utils
.timeoutPromise(this._connection.config.timeout)
.then(() => this._sendToQueue(queue, message, settings, currentRetryNumber + 1));
});
}

_shouldRetry(error, currentRetryNumber) {
Expand Down