diff --git a/lib/amqp.js b/lib/amqp.js index 8e5c8ae..b958bd7 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -205,21 +205,23 @@ class Amqp { await this._ensureConsumerChannel(); const { consumerTag } = await this.consumerChannel.consume(queue, async (amqpMessage) => { + if (!amqpMessage) { + log.warn('Consumer cancelled by rabbitmq'); + return; + } let message; try { message = this._decodeMessage(amqpMessage); - } catch (e) { - log.error(e, - 'Error occurred while parsing message #%j payload', - amqpMessage.fields.deliveryTag - ); + } catch (err) { + log.error({ err, deliveryTag: amqpMessage.fields.deliveryTag }, + 'Error occurred while parsing message payload'); this.reject(amqpMessage); return; } try { await messageHandler(message, amqpMessage); - } catch (e) { - log.error(e, 'Failed to process message #%j, reject', amqpMessage.fields.deliveryTag); + } catch (err) { + log.error({ err, deliveryTag: amqpMessage.fields.deliveryTag }, 'Failed to process message, reject'); this.reject(amqpMessage); } }); diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index 5e99d52..f373e3b 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -1255,12 +1255,10 @@ describe('Integration Test', () => { threadId }); try { - await Promise.all([ - runner.putOutToSea(settings.readFrom(env), ipc), - amqpHelper.removeListenQueue() - ]); + await amqpHelper.removeListenQueue(); + await runner.putOutToSea(settings.readFrom(env), ipc); } catch (e) { - expect(e).to.be.ok; + expect(e.message).to.match(/BasicConsume; 404/); await runner.__test__.disconnectOnly(); return; }