From d931bb1fd5762746a4a8039cbafb72e17053bf70 Mon Sep 17 00:00:00 2001 From: Antoine Gomez Date: Fri, 20 Apr 2018 17:34:09 +0200 Subject: [PATCH 1/5] Refactor channel management That will allow to handle close/error events and rabbit reconnection better --- package-lock.json | 2 +- package.json | 2 +- src/module/managers/channel-manager.ts | 66 +++++++++++++++++-- src/module/managers/channel-store.ts | 7 +-- src/module/managers/connection-manager.ts | 11 +++- src/module/managers/queue-manager.ts | 77 ++++++++++++++++------- src/module/register-annotations.ts | 10 ++- 7 files changed, 133 insertions(+), 42 deletions(-) diff --git a/package-lock.json b/package-lock.json index 200c744..09a8948 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "@hapiness/rabbitmq", - "version": "1.3.2", + "version": "1.4.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 6522e00..a58d003 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@hapiness/rabbitmq", - "version": "1.3.2", + "version": "1.4.0", "description": "Hapiness module for rabbitmq", "main": "commonjs/index.js", "types": "index.d.ts", diff --git a/src/module/managers/channel-manager.ts b/src/module/managers/channel-manager.ts index f069dd9..08845f5 100644 --- a/src/module/managers/channel-manager.ts +++ b/src/module/managers/channel-manager.ts @@ -1,30 +1,61 @@ import { Observable } from 'rxjs'; import { Channel as ChannelInterface, Connection, Replies } from 'amqplib'; import { ConnectionManager } from './connection-manager'; +import * as EventEmitter from 'events'; +import { MessageStore } from './message-store'; -export class ChannelManager { +const debug = require('debug')('hapiness:rabbitmq'); + +export class ChannelManager extends EventEmitter { private _connectionManager: ConnectionManager; private _connection: Connection; private ch: ChannelInterface; + private _isConnected: boolean; + private _reconnecting: boolean; + private _prefetch: number; + private _global: boolean; + private _key: any; - constructor(connectionManager: ConnectionManager) { + constructor(connectionManager: ConnectionManager, prefetch?: number, global?: boolean) { + super(); this._connectionManager = connectionManager; this._connection = connectionManager.connection; + this._prefetch = prefetch; + this._global = global; + } + + get prefetch(): number { + return this._prefetch; + } + + get global(): boolean { + return this._global; + } + + public canCreateChannel() { + return this._connectionManager.isConnected() && !this._connectionManager.isConnecting() && !MessageStore.isShutdownRunning(); } - public create(prefetch?: number, global?: boolean): Observable { + public create(): Observable { const obs = Observable.fromPromise(this._connection.createChannel()); return obs.map(ch => { this.ch = ch; + this.ch.on('error', err => this.defaultErrorHandler(err, 'error')); + this.ch.on('close', err => this.defaultErrorHandler(err, 'close')); + this._key = new Date(); + debug('channel created'); return ch; - }).switchMap(ch => this.prefetch(prefetch, global).map(() => ch)); + }).switchMap(ch => this.setPrefetch(this._prefetch, this._global).map(() => ch)); } - public prefetch(_count: number, global: boolean = false): Observable { + public setPrefetch(_count: number, global: boolean = false): Observable { if (!this.ch) { return Observable.throw(new Error('Create channel before setting prefetch')); } + this._prefetch = this._prefetch; + this._global = this._global; + const count = (_count === null || isNaN(_count)) ? this._connectionManager.getDefaultPrefetch() : _count; return Observable.fromPromise(this.ch.prefetch(count, global)); } @@ -35,6 +66,31 @@ export class ChannelManager { } public getChannel(): ChannelInterface { + debug('get channel', this._key); return this.ch; } + + public isConnected(): boolean { + return this._isConnected; + } + + private defaultErrorHandler(err, origin) { + this._isConnected = false; + if (!this._reconnecting && origin === 'close' && err && err.code !== 404 && this.canCreateChannel()) { + this._reconnecting = true; + debug(`recreating channel after ${origin} event`, { err }); + this.create() + .do(() => this.emit('reconnected')) + .catch(_err => { + debug(`could not recreate channel after ${origin} event`, { err: _err }); + this.emit('error', _err); + return Observable.of(null); + }) + .do(() => { + this._reconnecting = false; + }).subscribe(); + } else { + debug(`Channel ${origin} ${(err && [':', err.message].join(' ')) || ''}`, { err }); + } + } } diff --git a/src/module/managers/channel-store.ts b/src/module/managers/channel-store.ts index 48fb0fa..c3e0e17 100644 --- a/src/module/managers/channel-store.ts +++ b/src/module/managers/channel-store.ts @@ -17,16 +17,15 @@ export class ChannelStore { public create(key = 'default', { prefetch, global }: CreateChannelOptions = {}): Observable { const existing = this.get(key); - if (existing) { + if (existing && existing.isConnected()) { debug('channel existing returning', key); return Observable.of(existing); } debug('create channel', key, prefetch, global); - const channel = new ChannelManager(this._connectionManager); + const channel = new ChannelManager(this._connectionManager, prefetch, global); return channel .create() - .flatMap(ch => (isNaN(prefetch) ? Observable.of(ch) : channel.prefetch(prefetch, global).map(_ => ch))) .map(ch => { this._channels[key] = channel; return channel; @@ -35,7 +34,7 @@ export class ChannelStore { public upsert(key = 'default', { prefetch, global }: CreateChannelOptions = {}): Observable { const ch = this.get(key); - if (!ch) { + if (!ch || !ch.isConnected()) { return this.create(key, { prefetch, global }); } diff --git a/src/module/managers/connection-manager.ts b/src/module/managers/connection-manager.ts index a88698f..a85ab2f 100644 --- a/src/module/managers/connection-manager.ts +++ b/src/module/managers/connection-manager.ts @@ -5,6 +5,7 @@ import { Channel as ChannelInterface, Connection, connect } from 'amqplib'; import { RabbitMQConfigConnection } from '../interfaces'; import { events } from '../events'; import { ChannelStore } from './channel-store'; +import { ChannelManager } from './channel-manager'; export const REGEX_URI = /^amqp:\/\/([^@\n]+:[^@\n]+@)?(\w+)(:?)(\d{0,6})(\/[\w%]+)?(\?(?:&?[^=&\s]*=[^=&\s]*)+)?$/; @@ -14,7 +15,7 @@ export class ConnectionManager extends EventEmitter { private _connection: Connection; private _isConnecting: boolean; private _isConnected: boolean; - private _defaultChannel: ChannelInterface; + private _defaultChannel: ChannelManager; private _options: RabbitMQConfigConnection; private _uri: string; private _connect: typeof connect; @@ -111,17 +112,17 @@ export class ConnectionManager extends EventEmitter { const obs = this.openConnection(); return obs .flatMap(con => { + debug('connected, creating default channel ...'); this._connection = con; const createChannelObs = this.channelStore.create('default'); this._handleDisconnection(); - debug('connected, creating default channel ...'); this.emitEvent('opened', { connection: con }); return createChannelObs; }) .map(ch => { this._isConnected = true; this._isConnecting = false; - this._defaultChannel = ch.getChannel(); + this._defaultChannel = ch; debug('... channel created, RabbitMQ ready'); this.emitEvent('connected'); this.emitEvent('ready'); @@ -148,6 +149,10 @@ export class ConnectionManager extends EventEmitter { } get defaultChannel(): ChannelInterface { + return this._defaultChannel.getChannel(); + } + + get defaultChannelManager(): ChannelManager { return this._defaultChannel; } diff --git a/src/module/managers/queue-manager.ts b/src/module/managers/queue-manager.ts index 26e8a9b..b083390 100644 --- a/src/module/managers/queue-manager.ts +++ b/src/module/managers/queue-manager.ts @@ -5,16 +5,16 @@ import { extractMetadataByDecorator, errorHandler } from '@hapiness/core'; import { sendMessage, decodeJSONContent } from '../message'; import { MessageResult, MessageOptions, RabbitMessage, QueueOptions, ConsumeOptions, QueueInterface } from '../interfaces'; import { Bind } from '../decorators'; -import { ExchangeDecoratorInterface } from '../index'; +import { ExchangeDecoratorInterface, ChannelManager } from '../index'; import { QueueWrapper } from './queue-wrapper'; import { events } from '../events'; -import { MessageStore } from './message-store'; +import { MessageStore, StoreMessage } from './message-store'; const debug = require('debug')('hapiness:rabbitmq'); export const QUEUE_OPTIONS = ['durable', 'exclusive', 'autoDelete', 'arguments']; export class QueueManager { - private _ch: ChannelInterface; + private _ch: ChannelManager; private _name: string; private _queue: QueueInterface; private _binds: Array; @@ -22,7 +22,7 @@ export class QueueManager { private _isAsserted: boolean; private _options; - constructor(ch: ChannelInterface, queue: QueueWrapper | QueueOptions) { + constructor(ch: ChannelManager, queue: QueueWrapper | QueueOptions) { this._ch = ch; if (queue instanceof QueueWrapper) { @@ -52,9 +52,8 @@ export class QueueManager { } assert(): Observable { - const obs = Observable.fromPromise(this._ch.assertQueue(this.getName(), this._options)); debug(`asserting queue ${this.getName()}...`); - return obs.map(_ => { + return Observable.fromPromise(this._ch.getChannel().assertQueue(this.getName(), this._options)).map(_ => { this._isAsserted = true; debug(`... queue ${this.getName()} asserted`); @@ -97,8 +96,25 @@ export class QueueManager { dispatcher = (ch: ChannelInterface, message: RabbitMessage) => Observable.of(() => defaultDispatch(message, ch)); } + // Reconsume queue when channel is reconnected + this._ch.on('reconnected', () => { + this._consume({ dispatcher, options, defaultDispatch }); + }); + return this._consume({ dispatcher, options, defaultDispatch }); + } + + private _consume({ + dispatcher, + options, + defaultDispatch + }) { + const consumerChannel = this._ch.getChannel(); return Observable.fromPromise( +<<<<<<< HEAD + this._ch.getChannel().consume(this.getName(), message => { +======= this._ch.consume(this.getName(), message => { +>>>>>>> d367cd4... Coverage 100%, add events for message receive, send events.message.emit('received', message); const storeMessage = MessageStore.addMessage(message); try { @@ -110,7 +126,7 @@ export class QueueManager { debug(`new message on queue ${this.getName()}`, _message.fields.deliveryTag); - return dispatcher(this._ch, _message).switchMap(dispatch => { + return dispatcher(consumerChannel, _message).switchMap(dispatch => { if (typeof dispatch !== 'function') { debug('dispatcher did not returned a function, using defaultDispatcher'); return defaultDispatch(_message, this._ch); @@ -119,34 +135,51 @@ export class QueueManager { } }).subscribe( _ => { - this.handleMessageResult(message, _); + try { + this.handleMessageResult(message, _, consumerChannel); + } catch (err) { + events.queueManager.emit('ack_error', err); + } MessageStore.remove(storeMessage); }, err => { - this.handleMessageError(message, { storeMessage, options, err }); + this.handleMessageError(message, { storeMessage, options, err, consumerChannel }); } ); } catch (err) { - this.handleMessageError(message, { storeMessage, options, err }); + this.handleMessageError(message, { storeMessage, options, err, consumerChannel }); } }) ) - .do(res => MessageStore.addConsumer(this._ch, res.consumerTag)); + .do(res => MessageStore.addConsumer(consumerChannel, res.consumerTag)) + .do(res => { + ['close', 'error', 'reconnected'].forEach(event => this._ch.once(event, () => { + debug('removing consumer', res.consumerTag); + MessageStore.removeConsumer(res.consumerTag); + })); + }); } - handleMessageError(message, { storeMessage, options, err }) { + handleMessageError( + message: RabbitMessage, + { storeMessage, options, err, consumerChannel }: + { storeMessage: StoreMessage, + // options: { errorHandler: (err: Error, message: RabbitMessage, ch: ChannelInterface) => {} }, + options: any, + err: Error, consumerChannel: ChannelInterface } + ) { if (MessageStore.isShutdownRunning()) { - this._ch.reject(message, true); + consumerChannel.reject(message, true); } else { (typeof options.errorHandler === 'function' ? - options.errorHandler(err, message, this._ch) : errorHandler(err)); - this._ch.reject(message, false); + options.errorHandler(err, message, consumerChannel) : errorHandler(err)); + consumerChannel.reject(message, false); } MessageStore.remove(storeMessage); } - handleMessageResult(message: RabbitMessage, result: MessageResult): void { + handleMessageResult(message: RabbitMessage, result: MessageResult, consumerChannel: ChannelInterface): void { if (result === false) { debug('dispatcher returned false, not acking/rejecting message'); return; @@ -155,17 +188,17 @@ export class QueueManager { if (typeof result === 'object' && result) { if (result.ack) { debug('message ack', message.fields.consumerTag, message.fields.deliveryTag); - this._ch.ack(message); + consumerChannel.ack(message); return; } else if (result.reject) { debug('message reject', message.fields.consumerTag, message.fields.deliveryTag); - this._ch.reject(message, result.requeue); + consumerChannel.reject(message, result.requeue); return; } } debug('fallback message ack'); - this._ch.ack(message); + consumerChannel.ack(message); } createBinds(binds?: Array): Observable { @@ -193,11 +226,11 @@ export class QueueManager { bind(exchangeName, routingKey?: string): Observable { debug(`binding queue ${this.getName()} on exchange ${exchangeName} with routingKey ${routingKey}`); - return Observable.fromPromise(this._ch.bindQueue(this.getName(), exchangeName, routingKey || '')); + return Observable.fromPromise(this._ch.getChannel().bindQueue(this.getName(), exchangeName, routingKey || '')); } check(): Observable { - return Observable.fromPromise(this._ch.checkQueue(this.getName())); + return Observable.fromPromise(this._ch.getChannel().checkQueue(this.getName())); } sendMessage(message: any, options: MessageOptions = {}): boolean { @@ -208,6 +241,6 @@ export class QueueManager { options ); - return sendMessage(this._ch, message, _options); + return sendMessage(this._ch.getChannel(), message, _options); } } diff --git a/src/module/register-annotations.ts b/src/module/register-annotations.ts index 2ce35cc..70d846d 100644 --- a/src/module/register-annotations.ts +++ b/src/module/register-annotations.ts @@ -1,7 +1,6 @@ import { Observable } from 'rxjs'; -import { Channel as ChannelInterface } from 'amqplib'; import { Type, CoreModule, extractMetadataByDecorator, errorHandler, DependencyInjection } from '@hapiness/core'; -import { ConnectionManager } from './managers'; +import { ConnectionManager, ChannelManager } from './managers'; import { QueueDecoratorInterface, ExchangeDecoratorInterface, MessageDecoratorInterface, ChannelOptions } from './decorators'; import { QueueManager } from './managers'; import { ExchangeManager, ExchangeWrapper, QueueWrapper } from './managers'; @@ -24,11 +23,10 @@ export class RegisterAnnotations { }); } - public static getChannel(module: CoreModule, connection: ConnectionManager, channel: ChannelOptions): Observable { + public static getChannel(module: CoreModule, connection: ConnectionManager, channel: ChannelOptions): Observable { return connection .channelStore - .upsert(channel.key, { prefetch: channel.prefetch, global: channel.global }) - .map(ch => ch.getChannel()); + .upsert(channel.key, { prefetch: channel.prefetch, global: channel.global }); } public static buildExchanges(modules: CoreModule[], connection: ConnectionManager): Observable { @@ -64,7 +62,7 @@ export class RegisterAnnotations { RegisterAnnotations .getChannel(_module, connection, metadata.data.channel) .map(channel => ({ instance, metadata, channel, _module })) : - Observable.of({ instance, metadata, channel: connection.defaultChannel, _module })) + Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module })) .mergeMap(({ instance, metadata, channel, _module }) => { const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data)); return Observable.forkJoin(queue.assert(), Observable.of(metadata), Observable.of(_module)); From ec0c276f3523ad66796fd930b0573b0833fdf96b Mon Sep 17 00:00:00 2001 From: Antoine Gomez Date: Mon, 23 Apr 2018 13:25:07 +0200 Subject: [PATCH 2/5] Fix tests --- src/module/managers/channel-manager.ts | 4 +- src/module/managers/queue-manager.ts | 1 + test/mocha.opts | 3 +- test/mocks/Channel.ts | 3 +- test/mocks/ConnectionManager.ts | 2 +- test/unit/extension/init-extension.test.ts | 26 ++-- test/unit/managers/channel.test.ts | 49 +++++++- test/unit/managers/queue.test.ts | 139 ++++++++++++--------- 8 files changed, 146 insertions(+), 81 deletions(-) diff --git a/src/module/managers/channel-manager.ts b/src/module/managers/channel-manager.ts index 08845f5..31d5d9c 100644 --- a/src/module/managers/channel-manager.ts +++ b/src/module/managers/channel-manager.ts @@ -44,6 +44,8 @@ export class ChannelManager extends EventEmitter { this.ch.on('close', err => this.defaultErrorHandler(err, 'close')); this._key = new Date(); debug('channel created'); + this._isConnected = true; + this.emit('created'); return ch; }).switchMap(ch => this.setPrefetch(this._prefetch, this._global).map(() => ch)); } @@ -76,7 +78,7 @@ export class ChannelManager extends EventEmitter { private defaultErrorHandler(err, origin) { this._isConnected = false; - if (!this._reconnecting && origin === 'close' && err && err.code !== 404 && this.canCreateChannel()) { + if (!this._reconnecting && origin === 'error' && err && err.code !== 404 && this.canCreateChannel()) { this._reconnecting = true; debug(`recreating channel after ${origin} event`, { err }); this.create() diff --git a/src/module/managers/queue-manager.ts b/src/module/managers/queue-manager.ts index b083390..230c9fe 100644 --- a/src/module/managers/queue-manager.ts +++ b/src/module/managers/queue-manager.ts @@ -138,6 +138,7 @@ export class QueueManager { try { this.handleMessageResult(message, _, consumerChannel); } catch (err) { + /* istanbul ignore next */ events.queueManager.emit('ack_error', err); } MessageStore.remove(storeMessage); diff --git a/test/mocha.opts b/test/mocha.opts index 8ee6d55..b164fc8 100644 --- a/test/mocha.opts +++ b/test/mocha.opts @@ -4,4 +4,5 @@ --recursive --require ts-node/register --throw-deprecation ---colors \ No newline at end of file +-b +--colors diff --git a/test/mocks/Channel.ts b/test/mocks/Channel.ts index b2ec0e4..13978b7 100644 --- a/test/mocks/Channel.ts +++ b/test/mocks/Channel.ts @@ -1,4 +1,5 @@ -export class ChannelMock { +export const EventEmitter = require('events'); +export class ChannelMock extends EventEmitter { private consumeCallback; assertExchange() { diff --git a/test/mocks/ConnectionManager.ts b/test/mocks/ConnectionManager.ts index 12f828d..c9939bd 100644 --- a/test/mocks/ConnectionManager.ts +++ b/test/mocks/ConnectionManager.ts @@ -13,7 +13,7 @@ export class ConnectionManagerMock extends ConnectionManager { this['_connection'] = new RabbitConnectionMock(); const ch = new ChannelManager(this); ch.setChannel(new ChannelMock()); - this['_defaultChannel'] = ch.getChannel(); + this['_defaultChannel'] = ch; this.channelStore['_channels']['default'] = ch; } diff --git a/test/unit/extension/init-extension.test.ts b/test/unit/extension/init-extension.test.ts index 58bc7f2..d57ea6a 100644 --- a/test/unit/extension/init-extension.test.ts +++ b/test/unit/extension/init-extension.test.ts @@ -1,9 +1,8 @@ import { test, suite } from 'mocha-typescript'; import * as unit from 'unit.js'; import { RegisterAnnotations } from '../../../src/module/register-annotations'; -import { QueueManager, QueueWrapper } from '../../../src/module/managers'; +import { QueueManager, QueueWrapper, ChannelManager } from '../../../src/module/managers'; import { ChannelMock } from '../../mocks/Channel'; -import { Channel } from 'amqplib'; import { MessageRouter } from '../../../src/module/message-router'; import { UserQueue } from '../../fixtures/Queues'; import { extractMetadataByDecorator } from '@hapiness/core'; @@ -17,14 +16,15 @@ import { ConnectionManagerMock } from '../../mocks/ConnectionManager'; @suite('- Unit InitExtension') export class InitExtensionUnitTest { - private ch: Channel; + private ch: ChannelManager; private queueWrapper: QueueWrapper; private messageRouter: MessageRouter; private queue: QueueManager; private userQueue; before() { - this.ch = new ChannelMock(); + this.ch = new ChannelManager({connection: {}}); + this.ch['ch'] = new ChannelMock(); this.userQueue = new UserQueue(); this.queueWrapper = new QueueWrapper(this.userQueue, extractMetadataByDecorator(UserQueue, 'Queue')); this.messageRouter = new MessageRouter(); @@ -41,15 +41,15 @@ export class InitExtensionUnitTest { @test('- Should test consumeQueue when there is no message found') testConsumeQueue(done) { - unit.spy(this.queue['_ch'], 'consume'); + const spy = unit.spy(this.queue['_ch'].getChannel(), 'consume'); unit.function(RegisterAnnotations.consumeQueue); RegisterAnnotations.consumeQueue(this.queue, this.messageRouter); - unit.object(this.queue['_ch']['consume']['firstCall']); - unit.array(this.queue['_ch']['consume']['firstCall']['args']); - unit.string(this.queue['_ch']['consume']['firstCall']['args'][0]).is('user.queue'); - unit.function(this.queue['_ch']['consume']['firstCall']['args'][1]); + unit.object(spy['firstCall']); + unit.array(spy['firstCall']['args']); + unit.string(spy['firstCall']['args'][0]).is('user.queue'); + unit.function(spy['firstCall']['args'][1]); const message = generateMessage({ foo: 'bar' }, { exchange: 'user.queue' }); - this.queue['_ch']['consume']['firstCall']['args'][1](message); + spy['firstCall']['args'][1](message); unit.number(this.userQueue.onMessage.callCount).is(1); done(); } @@ -57,7 +57,7 @@ export class InitExtensionUnitTest { @test('- Should test consumeQueue when queue.consume() returns error <>') testConsumeQueueSubscribeError(done) { unit.function(RegisterAnnotations.consumeQueue); - unit.stub(this.queue['_ch'], 'consume').returns(Promise.reject(new Error('Cannot consume queue'))); + unit.stub(this.queue['_ch'].getChannel(), 'consume').returns(Promise.reject(new Error('Cannot consume queue'))); RegisterAnnotations .consumeQueue(this.queue, this.messageRouter) .subscribe(_ => done(), err => done(err)); @@ -65,13 +65,13 @@ export class InitExtensionUnitTest { @test('- Should test consumeQueue when there is an error other than message not found') testConsumeQueueError() { - unit.spy(this.queue['_ch'], 'consume'); + const spy = unit.spy(this.queue['_ch'].getChannel(), 'consume'); unit.function(RegisterAnnotations.consumeQueue); RegisterAnnotations.consumeQueue(this.queue, this.messageRouter); const stub = unit.stub(this.messageRouter, 'getDispatcher'); stub.returns(Observable.throw(new Error('Oops, something terrible happened!'))); const message = generateMessage({ foo: 'bar' }, { exchange: 'user.queue' }); - const obs: Subscription = this.queue['_ch']['consume']['firstCall']['args'][1](message); + const obs: Subscription = spy['firstCall']['args'][1](message); unit.object(stub.firstCall.returnValue).isInstanceOf(ErrorObservable); unit .object(stub.firstCall.returnValue.error) diff --git a/test/unit/managers/channel.test.ts b/test/unit/managers/channel.test.ts index 9e61f56..d4fca88 100644 --- a/test/unit/managers/channel.test.ts +++ b/test/unit/managers/channel.test.ts @@ -4,6 +4,7 @@ import * as unit from 'unit.js'; import { ChannelManager } from '../../../src/module/managers'; import { ConnectionManagerMock } from '../../mocks/ConnectionManager'; import { ChannelMock } from '../../mocks/Channel'; +import { Observable } from 'rxjs'; @suite('- Unit Channel') export class ChannelUnitTest { @@ -11,31 +12,35 @@ export class ChannelUnitTest { testCreate(done) { const instance = new ChannelManager(new ConnectionManagerMock()); unit.function(instance.create); - unit.function(instance.prefetch); + unit.function(instance.setPrefetch); unit.function(instance.getChannel); + unit.function(instance.canCreateChannel); const obs = instance.create(); obs .flatMap(ch => { unit.spy(ch, 'prefetch'); unit.object(ch).isInstanceOf(ChannelMock); unit.object(instance.getChannel()).is(ch); - return instance.prefetch(1, true); + return instance.setPrefetch(1, true); }) .subscribe(_ => { const ch = instance.getChannel(); unit.array(ch.prefetch['firstCall'].args).is([1, true]); unit.object(instance.setChannel(ch)).isInstanceOf(ChannelManager); + unit.bool(instance.canCreateChannel()).isTrue(); done(); }); } @test('- Test create with prefetch') testCreateWithPrefetch(done) { - const instance = new ChannelManager(new ConnectionManagerMock()); - const spy = unit.spy(instance, 'prefetch'); - instance.create(2).subscribe(_ => { + const instance = new ChannelManager(new ConnectionManagerMock(), 2, true); + const spy = unit.spy(instance, 'setPrefetch'); + instance.create().subscribe(_ => { unit.number(spy.callCount).is(1); unit.number(spy.firstCall.args[0]).is(2); + unit.number(instance.prefetch).is(2); + unit.bool(instance.global).isTrue(); done(); }); } @@ -43,7 +48,7 @@ export class ChannelUnitTest { @test('- Should test prefetch without channel') testPrefetch(done) { const instance = new ChannelManager(new ConnectionManagerMock()); - const obs = instance.prefetch(10); + const obs = instance.setPrefetch(10); obs.subscribe( _ => { done(new Error('Cannot be here')); @@ -57,4 +62,36 @@ export class ChannelUnitTest { } ); } + + @test('- Should test channel error') + testChannelError(done) { + const instance = new ChannelManager(new ConnectionManagerMock()); + const spyCreate = unit.spy(instance, 'create'); + const spyEmit = unit.spy(instance, 'emit'); + instance.create() + .subscribe(() => { + instance.getChannel().emit('error', new Error('Channel closed by server')); + instance.getChannel().emit('close'); + instance.getChannel().emit('error', new Error('Channel closed by server')); + setTimeout(() => { + unit.number(spyEmit.callCount).is(3); + unit.number(spyCreate.callCount).is(2); + done(); + }, 1000); + }, err => done(err)); + } + + @test('- Should test channel error, cannot reconnect') + testChannelError2(done) { + const instance = new ChannelManager(new ConnectionManagerMock()); + instance.create() + .subscribe(() => { + unit.stub(instance, 'create').returns(Observable.throw(new Error('Cannot create channel'))); + instance.on('error', err => { + unit.object(err).isInstanceOf(Error).hasProperty('message', 'Cannot create channel'); + done(); + }); + instance.getChannel().emit('error', new Error('Channel closed by server')); + }, err => done(err)); + } } diff --git a/test/unit/managers/queue.test.ts b/test/unit/managers/queue.test.ts index 0c22f66..ab06e15 100644 --- a/test/unit/managers/queue.test.ts +++ b/test/unit/managers/queue.test.ts @@ -4,7 +4,7 @@ import * as unit from 'unit.js'; import * as Message from '../../../src/module/message'; import { Observable } from 'rxjs/Observable'; -import { QueueManager, QueueWrapper } from '../../../src/module/managers'; +import { QueueManager, QueueWrapper, ChannelManager } from '../../../src/module/managers'; import { ChannelMock } from '../../mocks/Channel'; import { UserQueue, AnotherQueue } from '../../fixtures/Queues'; import { UserExchange } from '../../fixtures/Exchanges'; @@ -16,7 +16,7 @@ import { MessageStore } from '../../../src'; export class QueueServiceUnitTest { static stub_sendMessage: any; - private ch; + private ch: ChannelManager; private userQueue; private anotherQueue; private userQueueWrapper; @@ -31,13 +31,14 @@ export class QueueServiceUnitTest { } before() { - this.ch = new ChannelMock(); - unit.spy(this.ch, 'assertQueue'); - unit.spy(this.ch, 'bindQueue'); - unit.spy(this.ch, 'checkQueue'); - unit.spy(this.ch, 'consume'); - unit.spy(this.ch, 'reject'); - unit.spy(this.ch, 'ack'); + this.ch = new ChannelManager({ connection: {}}); + this.ch.setChannel(new ChannelMock()); + unit.spy(this.ch.getChannel(), 'assertQueue'); + unit.spy(this.ch.getChannel(), 'bindQueue'); + unit.spy(this.ch.getChannel(), 'checkQueue'); + unit.spy(this.ch.getChannel(), 'consume'); + unit.spy(this.ch.getChannel(), 'reject'); + unit.spy(this.ch.getChannel(), 'ack'); this.userQueue = new UserQueue(); unit.spy(this.userQueue, 'onAsserted'); @@ -108,12 +109,12 @@ export class QueueServiceUnitTest { return instance.bind('another.exchange', 'baz'); }) .subscribe(_ => { - unit.number(this.ch.bindQueue['callCount']).is(5); - unit.array(this.ch.bindQueue['firstCall'].args).is(['user.queue', 'user.exchange', 'user.edited']); - unit.array(this.ch.bindQueue['secondCall'].args).is(['user.queue', 'user.exchange', 'user.created']); - unit.array(this.ch.bindQueue['thirdCall'].args).is(['user.queue', 'user.exchange', 'user.deleted']); - unit.array(this.ch.bindQueue.getCalls()[3].args).is(['user.queue', 'user.exchange', '']); - unit.array(this.ch.bindQueue.getCalls()[4].args).is(['user.queue', 'another.exchange', 'baz']); + unit.number(this.ch.getChannel().bindQueue['callCount']).is(5); + unit.array(this.ch.getChannel().bindQueue['firstCall'].args).is(['user.queue', 'user.exchange', 'user.edited']); + unit.array(this.ch.getChannel().bindQueue['secondCall'].args).is(['user.queue', 'user.exchange', 'user.created']); + unit.array(this.ch.getChannel().bindQueue['thirdCall'].args).is(['user.queue', 'user.exchange', 'user.deleted']); + unit.array(this.ch.getChannel().bindQueue['getCalls']()[3].args).is(['user.queue', 'user.exchange', '']); + unit.array(this.ch.getChannel().bindQueue['getCalls']()[4].args).is(['user.queue', 'another.exchange', 'baz']); done(); }); } @@ -133,9 +134,9 @@ export class QueueServiceUnitTest { return anotherInstance.createBinds(); }) .subscribe(_ => { - unit.number(this.ch.bindQueue['callCount']).is(2); - unit.array(this.ch.bindQueue['firstCall'].args).is(['user.queue', 'user.exchange', 'foo']); - unit.array(this.ch.bindQueue['secondCall'].args).is(['user.queue', 'user.exchange', 'bar']); + unit.number(this.ch.getChannel().bindQueue['callCount']).is(2); + unit.array(this.ch.getChannel().bindQueue['firstCall'].args).is(['user.queue', 'user.exchange', 'foo']); + unit.array(this.ch.getChannel().bindQueue['secondCall'].args).is(['user.queue', 'user.exchange', 'bar']); done(); }); } @@ -160,23 +161,45 @@ export class QueueServiceUnitTest { return instance.consume(); }) .subscribe(_ => { - unit.bool(this.ch.consume['calledOnce']).isTrue(); - unit.string(this.ch.consume['firstCall'].args[0]).is(instance.getName()); - unit.function(this.ch.consume['firstCall'].args[1]); + unit.bool(this.ch.getChannel().consume['calledOnce']).isTrue(); + unit.string(this.ch.getChannel().consume['firstCall'].args[0]).is(instance.getName()); + unit.function(this.ch.getChannel().consume['firstCall'].args[1]); const message1 = generateMessage({ hello: 'world', result: { ack: true } }, { exchange: instance.getName() }); const message2 = generateMessage({ hello: 'world', result: false }, { exchange: instance.getName() }); const message3 = generateMessage({ hello: 'world', result: { reject: true } }, { exchange: instance.getName() }); const message4 = generateMessage({ hello: 'world', result: {} }, { exchange: instance.getName() }); - this.ch.sendMessage(message1); - this.ch.sendMessage(message2); - this.ch.sendMessage(message3); - this.ch.sendMessage(message4); + this.ch.getChannel()['sendMessage'](message1); + this.ch.getChannel()['sendMessage'](message2); + this.ch.getChannel()['sendMessage'](message3); + this.ch.getChannel()['sendMessage'](message4); unit.number(this.userQueue.onMessage['callCount']).is(4); done(); }); } + @test('- Should test channel reconnect') + testReconnect(done) { + const instance = new QueueManager(this.ch, this.userQueueWrapper); + unit.value(instance.getName()).is('user.queue'); + const obs = instance.assert(); + obs + .flatMap(_ => { + unit.bool(instance.isAsserted()).isTrue(); + return instance.consume(); + }) + .subscribe(_ => { + unit.bool(this.ch.getChannel().consume['calledOnce']).isTrue(); + unit.string(this.ch.getChannel().consume['firstCall'].args[0]).is(instance.getName()); + unit.function(this.ch.getChannel().consume['firstCall'].args[1]); + + setTimeout(() => { + this.ch.emit('reconnected'); + done(); + }, 1000); + }); + } + @test('- Should test consuming with errorHandler and decodeMessageContent to non bool value') testConsumeErrorHandlerDecodeNonBool(done) { const errorHandler = unit.stub(); @@ -192,15 +215,15 @@ export class QueueServiceUnitTest { return instance.consume(dispatcher, { decodeMessageContent: '', errorHandler }); }) .subscribe(_ => { - unit.bool(this.ch.consume['calledOnce']).isTrue(); - unit.string(this.ch.consume['firstCall'].args[0]).is(instance.getName()); - unit.function(this.ch.consume['firstCall'].args[1]); + unit.bool(this.ch.getChannel().consume['calledOnce']).isTrue(); + unit.string(this.ch.getChannel().consume['firstCall'].args[0]).is(instance.getName()); + unit.function(this.ch.getChannel().consume['firstCall'].args[1]); const message1 = generateMessage({ hello: 'world', result: { ack: true } }, { exchange: instance.getName() }); - this.ch.sendMessage(message1); + this.ch.getChannel()['sendMessage'](message1); unit.number(dispatcher.callCount).is(1); unit.number(errorHandler.callCount).is(1); - unit.array(errorHandler.firstCall.args).is([err, message1, this.ch]); + unit.array(errorHandler.firstCall.args).is([err, message1, this.ch.getChannel()]); done(); }); } @@ -219,12 +242,12 @@ export class QueueServiceUnitTest { return instance.consume(dispatcher, { decodeMessageContent: '' }); }) .subscribe(_ => { - unit.bool(this.ch.consume['calledOnce']).isTrue(); - unit.string(this.ch.consume['firstCall'].args[0]).is(instance.getName()); - unit.function(this.ch.consume['firstCall'].args[1]); + unit.bool(this.ch.getChannel().consume['calledOnce']).isTrue(); + unit.string(this.ch.getChannel().consume['firstCall'].args[0]).is(instance.getName()); + unit.function(this.ch.getChannel().consume['firstCall'].args[1]); const message1 = generateMessage({ hello: 'world', result: { ack: true } }, { exchange: instance.getName() }); - this.ch.sendMessage(message1); + this.ch.getChannel()['sendMessage'](message1); unit.number(dispatcher.callCount).is(1); done(); }); @@ -248,18 +271,18 @@ export class QueueServiceUnitTest { return instance.consume(spy, { decodeMessageContent: false }); }) .subscribe(_ => { - unit.bool(this.ch.consume['calledOnce']).isTrue(); - unit.string(this.ch.consume['firstCall'].args[0]).is(instance.getName()); - unit.function(this.ch.consume['firstCall'].args[1]); + unit.bool(this.ch.getChannel().consume['calledOnce']).isTrue(); + unit.string(this.ch.getChannel().consume['firstCall'].args[0]).is(instance.getName()); + unit.function(this.ch.getChannel().consume['firstCall'].args[1]); const message1 = generateMessage({ hello: 'world', result: { ack: true } }, { exchange: instance.getName() }); const message2 = generateMessage({ hello: 'world', result: false }, { exchange: instance.getName() }); const message3 = generateMessage({ hello: 'world', result: { reject: true } }, { exchange: instance.getName() }); const message4 = generateMessage({ hello: 'world', result: {} }, { exchange: instance.getName() }); - this.ch.sendMessage(message1); - this.ch.sendMessage(message2); - this.ch.sendMessage(message3); - this.ch.sendMessage(message4); + this.ch.getChannel()['sendMessage'](message1); + this.ch.getChannel()['sendMessage'](message2); + this.ch.getChannel()['sendMessage'](message3); + this.ch.getChannel()['sendMessage'](message4); unit.number(spy['callCount']).is(4); done(); }); @@ -275,7 +298,7 @@ export class QueueServiceUnitTest { unit.bool(instance.isAsserted()).isTrue(); instance.consume(); const message1 = generateMessage({ hello: 'world', result: { ack: true } }, { exchange: instance.getName() }); - this.ch.sendMessage(message1); + this.ch.getChannel()['sendMessage'](message1); done(); }); } @@ -291,13 +314,13 @@ export class QueueServiceUnitTest { unit.bool(instance.isAsserted()).isTrue(); instance.consume((ch, message) => dispatcher); const message1 = generateMessage(null, { exchange: instance.getName() }); - this.ch.sendMessage(message1); + this.ch.getChannel()['sendMessage'](message1); dispatcher.subscribe(r => { unit.number(spy.callCount).is(1); unit.bool(spy.firstCall.args[1]).isFalse(); - unit.number(instance['_ch']['ack']['callCount']).is(0); - unit.number(instance['_ch']['reject']['callCount']).is(0); + unit.number(instance['_ch'].getChannel()['ack']['callCount']).is(0); + unit.number(instance['_ch'].getChannel()['reject']['callCount']).is(0); done(); }); }); @@ -314,12 +337,12 @@ export class QueueServiceUnitTest { unit.bool(instance.isAsserted()).isTrue(); instance.consume((ch, message) => dispatcher); const message1 = generateMessage(null, { exchange: instance.getName() }); - this.ch.sendMessage(message1); + this.ch.getChannel()['sendMessage'](message1); dispatcher.subscribe(r => { unit.number(spy.callCount).is(1); unit.object(spy.firstCall.args[1]).is({ reject: true }); - unit.number(instance['_ch']['reject']['callCount']).is(1); + unit.number(instance['_ch'].getChannel()['reject']['callCount']).is(1); done(); }); }); @@ -337,12 +360,12 @@ export class QueueServiceUnitTest { unit.bool(instance.isAsserted()).isTrue(); instance.consume((ch, message) => dispatcher); const message1 = generateMessage(null, { exchange: instance.getName() }); - this.ch.sendMessage(message1); + this.ch.getChannel()['sendMessage'](message1); dispatcher.subscribe(err => { unit.number(spy.callCount).is(1); unit.object(spy.firstCall.args[1]).hasProperty('storeMessage'); - unit.number(instance['_ch']['reject']['callCount']).is(1); + unit.number(instance['_ch'].getChannel()['reject']['callCount']).is(1); messageStoreStub.restore(); done(); }, err => done(err)); @@ -360,12 +383,12 @@ export class QueueServiceUnitTest { unit.bool(instance.isAsserted()).isTrue(); instance.consume((ch, message) => dispatcher); const message1 = generateMessage(null, { exchange: instance.getName() }); - this.ch.sendMessage(message1); + this.ch.getChannel()['sendMessage'](message1); dispatcher.subscribe(r => { unit.number(spy.callCount).is(1); unit.object(spy.firstCall.args[1]).is({ foo: 'bar' }); - unit.number(instance['_ch']['ack']['callCount']).is(1); + unit.number(instance['_ch'].getChannel()['ack']['callCount']).is(1); done(); }); }); @@ -382,12 +405,12 @@ export class QueueServiceUnitTest { unit.bool(instance.isAsserted()).isTrue(); instance.consume((ch, message) => dispatcher); const message1 = generateMessage(null, { exchange: instance.getName() }); - this.ch.sendMessage(message1); + this.ch.getChannel()['sendMessage'](message1); dispatcher.subscribe(r => { unit.number(spy.callCount).is(1); unit.value(spy.firstCall.args[1]).is(null); - unit.number(instance['_ch']['ack']['callCount']).is(1); + unit.number(instance['_ch'].getChannel()['ack']['callCount']).is(1); done(); }); }); @@ -405,7 +428,7 @@ export class QueueServiceUnitTest { instance.consume(null, { errorHandler: _errorHandler, force_json_decode: true }); const message1 = generateMessage(null, { exchange: instance.getName() }); message1.content = Buffer.from('xaxa'); - this.ch.sendMessage(message1); + this.ch.getChannel()['sendMessage'](message1); unit.number(_errorHandler.callCount).is(1); unit.object(_errorHandler.firstCall.args[0]).isInstanceOf(Error).hasProperty('message', 'Cannot parse JSON message'); done(); @@ -424,7 +447,7 @@ export class QueueServiceUnitTest { instance.consume(null, { force_json_decode: true }); const message1 = generateMessage(null, { exchange: instance.getName() }); message1.content = Buffer.from('xaxa'); - this.ch.sendMessage(message1); + this.ch.getChannel()['sendMessage'](message1); done(); }); } @@ -434,7 +457,7 @@ export class QueueServiceUnitTest { const instance = new QueueManager(this.ch, this.userQueueWrapper); instance.sendMessage({ hello: 'world' }); unit.bool(QueueServiceUnitTest.stub_sendMessage.calledOnce).isTrue(); - unit.array(QueueServiceUnitTest.stub_sendMessage.firstCall.args).is([this.ch, { hello: 'world' }, { queue: 'user.queue' }]); + unit.array(QueueServiceUnitTest.stub_sendMessage.firstCall.args).is([this.ch.getChannel(), { hello: 'world' }, { queue: 'user.queue' }]); } @test('- Test check queue') @@ -442,8 +465,8 @@ export class QueueServiceUnitTest { const instance = new QueueManager(this.ch, this.userQueueWrapper); const obs = instance.check(); obs.subscribe(_ => { - unit.bool(this.ch.checkQueue['calledOnce']).isTrue(); - unit.array(this.ch.checkQueue['firstCall'].args).is(['user.queue']); + unit.bool(this.ch.getChannel().checkQueue['calledOnce']).isTrue(); + unit.array(this.ch.getChannel().checkQueue['firstCall'].args).is(['user.queue']); done(); }); } From 1c70b6a7a7d06bd07b148b182c617a882af4b779 Mon Sep 17 00:00:00 2001 From: Antoine Gomez Date: Mon, 23 Apr 2018 14:22:53 +0200 Subject: [PATCH 3/5] Fix TSlint error --- src/module/managers/channel-manager.ts | 6 +++--- test/unit/managers/queue.test.ts | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/module/managers/channel-manager.ts b/src/module/managers/channel-manager.ts index 31d5d9c..142fa7e 100644 --- a/src/module/managers/channel-manager.ts +++ b/src/module/managers/channel-manager.ts @@ -55,10 +55,10 @@ export class ChannelManager extends EventEmitter { return Observable.throw(new Error('Create channel before setting prefetch')); } - this._prefetch = this._prefetch; - this._global = this._global; - const count = (_count === null || isNaN(_count)) ? this._connectionManager.getDefaultPrefetch() : _count; + + this._prefetch = count; + this._global = global; return Observable.fromPromise(this.ch.prefetch(count, global)); } diff --git a/test/unit/managers/queue.test.ts b/test/unit/managers/queue.test.ts index ab06e15..f08a3e4 100644 --- a/test/unit/managers/queue.test.ts +++ b/test/unit/managers/queue.test.ts @@ -457,7 +457,8 @@ export class QueueServiceUnitTest { const instance = new QueueManager(this.ch, this.userQueueWrapper); instance.sendMessage({ hello: 'world' }); unit.bool(QueueServiceUnitTest.stub_sendMessage.calledOnce).isTrue(); - unit.array(QueueServiceUnitTest.stub_sendMessage.firstCall.args).is([this.ch.getChannel(), { hello: 'world' }, { queue: 'user.queue' }]); + unit.array(QueueServiceUnitTest.stub_sendMessage.firstCall.args) + .is([this.ch.getChannel(), { hello: 'world' }, { queue: 'user.queue' }]); } @test('- Test check queue') From c62e225c59f4be8273fabb89e0bd378f5bae4d49 Mon Sep 17 00:00:00 2001 From: Antoine Gomez Date: Mon, 23 Apr 2018 15:53:06 +0200 Subject: [PATCH 4/5] CS --- src/module/managers/channel-manager.ts | 2 +- src/module/managers/message-store.ts | 3 ++- src/module/managers/queue-manager.ts | 7 ++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/module/managers/channel-manager.ts b/src/module/managers/channel-manager.ts index 142fa7e..08e349a 100644 --- a/src/module/managers/channel-manager.ts +++ b/src/module/managers/channel-manager.ts @@ -32,7 +32,7 @@ export class ChannelManager extends EventEmitter { return this._global; } - public canCreateChannel() { + public canCreateChannel(): boolean { return this._connectionManager.isConnected() && !this._connectionManager.isConnecting() && !MessageStore.isShutdownRunning(); } diff --git a/src/module/managers/message-store.ts b/src/module/managers/message-store.ts index 1143539..31d5ea6 100644 --- a/src/module/managers/message-store.ts +++ b/src/module/managers/message-store.ts @@ -4,6 +4,7 @@ import { Observable } from 'rxjs'; import { Channel as ChannelInterface } from 'amqplib'; import { RabbitMessage } from '../interfaces/rabbit-message'; import { Config } from '@hapiness/config'; +import { ConnectionManager } from './connection-manager'; const debug = require('debug')('hapiness:rabbitmq'); @@ -78,7 +79,7 @@ export class MessageStoreClass extends EventEmitter { this.shutdown_running = false; } - shutdown(connection): Observable { + shutdown(connection: ConnectionManager): Observable { if (this.shutdown_running) { debug('shutdown already running'); return Observable.of(null); diff --git a/src/module/managers/queue-manager.ts b/src/module/managers/queue-manager.ts index 230c9fe..26dd3c1 100644 --- a/src/module/managers/queue-manager.ts +++ b/src/module/managers/queue-manager.ts @@ -107,7 +107,7 @@ export class QueueManager { dispatcher, options, defaultDispatch - }) { + }): Observable { const consumerChannel = this._ch.getChannel(); return Observable.fromPromise( <<<<<<< HEAD @@ -154,6 +154,8 @@ export class QueueManager { ) .do(res => MessageStore.addConsumer(consumerChannel, res.consumerTag)) .do(res => { + // If channel is closed, has en error or is reconnected the consumerTag is not valid + // and needs to be removed ['close', 'error', 'reconnected'].forEach(event => this._ch.once(event, () => { debug('removing consumer', res.consumerTag); MessageStore.removeConsumer(res.consumerTag); @@ -165,8 +167,7 @@ export class QueueManager { message: RabbitMessage, { storeMessage, options, err, consumerChannel }: { storeMessage: StoreMessage, - // options: { errorHandler: (err: Error, message: RabbitMessage, ch: ChannelInterface) => {} }, - options: any, + options: { errorHandler: (err: Error, message: RabbitMessage, ch: ChannelInterface) => {} }, err: Error, consumerChannel: ChannelInterface } ) { if (MessageStore.isShutdownRunning()) { From 4bc4fc47429c0d7536f77358922c90f3bdca116a Mon Sep 17 00:00:00 2001 From: Antoine Gomez Date: Mon, 23 Apr 2018 17:14:15 +0200 Subject: [PATCH 5/5] Add blocking condition to forbid new connection when SIGTERM is received --- src/module/managers/connection-manager.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/module/managers/connection-manager.ts b/src/module/managers/connection-manager.ts index a85ab2f..15398a2 100644 --- a/src/module/managers/connection-manager.ts +++ b/src/module/managers/connection-manager.ts @@ -21,6 +21,7 @@ export class ConnectionManager extends EventEmitter { private _connect: typeof connect; private _defaultPrefetch: number; private _channelStore: ChannelStore; + private _isSIGTERMReceived: boolean; constructor(config?: RabbitMQConfigConnection) { super(); @@ -51,6 +52,10 @@ export class ConnectionManager extends EventEmitter { this._uri = `amqp://${credentials}${host}:${port}${vhost}${params}`; } + // Will block new connection if SIGTERM is received + process.once('SIGTERM', () => this._isSIGTERMReceived = true); + process.once('SIGINT', () => this._isSIGTERMReceived = true); + this.setDefaultPrefetch(this._options.default_prefetch); // Create a channel store for this connection @@ -104,6 +109,10 @@ export class ConnectionManager extends EventEmitter { return Observable.of(null); } + if (this._isSIGTERMReceived) { + return Observable.of(null); + } + this._isConnecting = true; debug('Connecting', this._uri);