Skip to content

Commit

Permalink
Merge pull request #31 from hapinessjs/next
Browse files Browse the repository at this point in the history
release(version): v1.4.0
  • Loading branch information
akanass committed Apr 24, 2018
2 parents aa935e1 + a97aa8a commit dc9cbd2
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/module/managers/channel-manager.ts
Expand Up @@ -32,7 +32,7 @@ export class ChannelManager extends EventEmitter {
return this._global;
}

public canCreateChannel() {
public canCreateChannel(): boolean {
return this._connectionManager.isConnected() && !this._connectionManager.isConnecting() && !MessageStore.isShutdownRunning();
}

Expand Down
9 changes: 9 additions & 0 deletions src/module/managers/connection-manager.ts
Expand Up @@ -21,6 +21,7 @@ export class ConnectionManager extends EventEmitter {
private _connect: typeof connect;
private _defaultPrefetch: number;
private _channelStore: ChannelStore;
private _isSIGTERMReceived: boolean;

constructor(config?: RabbitMQConfigConnection) {
super();
Expand Down Expand Up @@ -51,6 +52,10 @@ export class ConnectionManager extends EventEmitter {
this._uri = `amqp://${credentials}${host}:${port}${vhost}${params}`;
}

// Will block new connection if SIGTERM is received
process.once('SIGTERM', () => this._isSIGTERMReceived = true);
process.once('SIGINT', () => this._isSIGTERMReceived = true);

this.setDefaultPrefetch(this._options.default_prefetch);

// Create a channel store for this connection
Expand Down Expand Up @@ -104,6 +109,10 @@ export class ConnectionManager extends EventEmitter {
return Observable.of(null);
}

if (this._isSIGTERMReceived) {
return Observable.of(null);
}

this._isConnecting = true;

debug('Connecting', this._uri);
Expand Down
3 changes: 2 additions & 1 deletion src/module/managers/message-store.ts
Expand Up @@ -4,6 +4,7 @@ import { Observable } from 'rxjs';
import { Channel as ChannelInterface } from 'amqplib';
import { RabbitMessage } from '../interfaces/rabbit-message';
import { Config } from '@hapiness/config';
import { ConnectionManager } from './connection-manager';

const debug = require('debug')('hapiness:rabbitmq');

Expand Down Expand Up @@ -78,7 +79,7 @@ export class MessageStoreClass extends EventEmitter {
this.shutdown_running = false;
}

shutdown(connection): Observable<any> {
shutdown(connection: ConnectionManager): Observable<any> {
if (this.shutdown_running) {
debug('shutdown already running');
return Observable.of(null);
Expand Down
7 changes: 4 additions & 3 deletions src/module/managers/queue-manager.ts
Expand Up @@ -107,7 +107,7 @@ export class QueueManager {
dispatcher,
options,
defaultDispatch
}) {
}): Observable<Replies.Consume> {
const consumerChannel = this._ch.getChannel();
return Observable.fromPromise(
this._ch.getChannel().consume(this.getName(), message => {
Expand Down Expand Up @@ -150,6 +150,8 @@ export class QueueManager {
)
.do(res => MessageStore.addConsumer(consumerChannel, res.consumerTag))
.do(res => {
// If channel is closed, has en error or is reconnected the consumerTag is not valid
// and needs to be removed
['close', 'error', 'reconnected'].forEach(event => this._ch.once(event, () => {
debug('removing consumer', res.consumerTag);
MessageStore.removeConsumer(res.consumerTag);
Expand All @@ -161,8 +163,7 @@ export class QueueManager {
message: RabbitMessage,
{ storeMessage, options, err, consumerChannel }:
{ storeMessage: StoreMessage,
// options: { errorHandler: (err: Error, message: RabbitMessage, ch: ChannelInterface) => {} },
options: any,
options: { errorHandler: (err: Error, message: RabbitMessage, ch: ChannelInterface) => {} },
err: Error, consumerChannel: ChannelInterface }
) {
if (MessageStore.isShutdownRunning()) {
Expand Down

0 comments on commit dc9cbd2

Please sign in to comment.