Skip to content

Commit

Permalink
Merge pull request #37 from hapinessjs/next
Browse files Browse the repository at this point in the history
version(v1.4.3)
  • Loading branch information
akanass committed Aug 20, 2018
2 parents 829c9e3 + 35e134d commit 26a3b10
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 5 deletions.
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -453,6 +453,8 @@ To set up your development environment:
[Back to top](#table-of-contents)

## Change History
* v1.4.3 (2018-08-20)
* Emit RETRY_LIMIT_EXCEEDED error on ConnectionManager
* v1.4.2 (2018-06-11)
* Do not retry to connect if closing server
* v1.4.1 (2018-05-31)
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@hapiness/rabbitmq",
"version": "1.4.2",
"version": "1.4.3",
"description": "Hapiness module for rabbitmq",
"main": "commonjs/index.js",
"types": "index.d.ts",
Expand Down
9 changes: 7 additions & 2 deletions src/module/managers/connection-manager.ts
Expand Up @@ -8,6 +8,8 @@ import { ChannelStore } from './channel-store';
import { ChannelManager } from './channel-manager';

const debug = require('debug')('hapiness:rabbitmq');
const retryLimitErr: any = new Error('Retry limit exceeded');
retryLimitErr.code = 'RETRY_LIMIT_EXCEEDED';

export class ConnectionManager extends EventEmitter {
private _connection: Connection;
Expand Down Expand Up @@ -99,7 +101,7 @@ export class ConnectionManager extends EventEmitter {
.delay(this._options.retry.delay)
.takeWhile((attempts) => attempts < this._options.retry.maximum_attempts && !this._closingServer)
.take(this._options.retry.maximum_attempts)
.concat(Observable.throw(new Error('Retry limit exceeded')))
.concat(Observable.throw(retryLimitErr))
});
}

Expand All @@ -108,7 +110,6 @@ export class ConnectionManager extends EventEmitter {
return Observable.of(null);
}


this._closingServer = false;
this._isConnecting = true;

Expand All @@ -117,6 +118,10 @@ export class ConnectionManager extends EventEmitter {
this.emitEvent('connecting');
const obs = this.openConnection();
return obs
.catch(err => {
this.emitEvent('error', err);
return Observable.throw(err);
})
.flatMap(con => {
debug('connected, creating default channel ...');
this._connection = con;
Expand Down
4 changes: 3 additions & 1 deletion src/module/rabbitmq.extension.ts
Expand Up @@ -61,7 +61,9 @@ export class RabbitMQExt implements OnExtensionLoad, OnModuleInstantiated, OnShu
connection
.connect()
.flatMap(() => RegisterAnnotations.bootstrap(module, connection))
.subscribe(() => {}, err => errorHandler(err));
.subscribe(() => {}, err => {
errorHandler(err);
});
});

return RegisterAnnotations.bootstrap(module, connection);
Expand Down
19 changes: 19 additions & 0 deletions test/unit/managers/connection.test.ts
Expand Up @@ -136,4 +136,23 @@ export class ConnectionUnitTest {
instance.setDefaultPrefetch(5);
unit.number(instance.getDefaultPrefetch()).is(5);
}

@test(' - Test openConnection error')
testOpenConnectionError(done) {
const instance = new ConnectionManager({ retry: { delay: 100, maximum_attempts: 1 } });
instance['_connection'] = <any>new RabbitConnectionMock();
unit.stub(instance, '_connect').returns(Promise.reject(Observable.throw(new Error('Woopsie'))));

Observable.forkJoin([
Observable.fromEvent(instance, 'error').map(err => {
unit.object(err).isInstanceOf(Error).hasProperty('code', 'RETRY_LIMIT_EXCEEDED');
}),
instance.connect()
.catch(err => {
unit.object(err).isInstanceOf(Error).hasProperty('code', 'RETRY_LIMIT_EXCEEDED');
return Observable.throw(null);
})
.flatMap(() => Observable.throw(new Error('Cannot be here')))
]).subscribe(() => done(), err => done(err));
}
}

0 comments on commit 26a3b10

Please sign in to comment.