/
channel-manager.ts
102 lines (88 loc) · 3.49 KB
/
channel-manager.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import { Observable } from 'rxjs';
import { Channel as ChannelInterface, Connection, Replies } from 'amqplib';
import { ConnectionManager } from './connection-manager';
import * as EventEmitter from 'events';
import { MessageStore } from './message-store';
const debug = require('debug')('hapiness:rabbitmq');
/**
 * Owns a single amqplib channel opened on the connection exposed by a
 * ConnectionManager and tries to re-open it when the channel emits `error`.
 *
 * Events emitted by this class:
 *  - `created`     a channel object has been obtained (emitted before prefetch is applied)
 *  - `reconnected` the channel was re-created after a channel `error` event
 *  - `error`       re-creating the channel failed
 */
export class ChannelManager extends EventEmitter {
    private _connectionManager: ConnectionManager;
    private _connection: Connection;
    private ch: ChannelInterface;
    // Both flags start as real booleans so isConnected() never returns undefined.
    private _isConnected = false;
    private _reconnecting = false;
    private _prefetch: number;
    private _global: boolean;
    private _key: any;

    /**
     * @param connectionManager manager providing the connection on which channels are opened
     * @param prefetch prefetch count applied when the channel is created; when it is
     *                 null/undefined/NaN the manager's default prefetch is used
     * @param global value forwarded as the `global` flag of `channel.prefetch`
     */
    constructor(connectionManager: ConnectionManager, prefetch?: number, global?: boolean) {
        super();
        this._connectionManager = connectionManager;
        this._connection = connectionManager.connection;
        this._prefetch = prefetch;
        this._global = global;
        // An error reported by the connection manager marks this channel as not connected.
        this._connectionManager.on('error', () => {
            this._isConnected = false;
        });
        // 0 disables the EventEmitter max-listeners warning for this instance.
        this.setMaxListeners(0);
    }

    /** Prefetch count last requested for this channel. */
    get prefetch(): number {
        return this._prefetch;
    }

    /** `global` flag last requested together with the prefetch count. */
    get global(): boolean {
        return this._global;
    }

    /**
     * @returns true when the connection manager reports it is connected and not
     *          currently connecting, and MessageStore reports no shutdown in progress
     */
    public canCreateChannel(): boolean {
        return this._connectionManager.isConnected() && !this._connectionManager.isConnecting() && !MessageStore.isShutdownRunning();
    }

    /**
     * Opens a new channel, registers the `error`/`close` handlers on it, emits
     * `created` and then applies the stored prefetch settings.
     *
     * @returns an Observable emitting the new channel once prefetch has been applied
     */
    public create(): Observable<ChannelInterface> {
        // Re-read the connection from the manager on every call: the manager may have
        // replaced it since this instance was constructed. Fall back to the cached
        // one when the manager currently exposes none.
        this._connection = this._connectionManager.connection || this._connection;
        const obs = Observable.fromPromise(this._connection.createChannel());
        return obs.map(ch => {
            this.ch = ch;
            this.ch.on('error', err => this.defaultErrorHandler(err, 'error'));
            this.ch.on('close', err => this.defaultErrorHandler(err, 'close'));
            // Timestamp only used to identify the channel instance in debug output.
            this._key = new Date();
            debug('channel created');
            this._isConnected = true;
            this.emit('created');
            return ch;
        }).switchMap(ch => this.setPrefetch(this._prefetch, this._global).map(() => ch));
    }

    /**
     * Applies a prefetch setting to the current channel and remembers it.
     *
     * @param _count prefetch count; null, undefined or NaN selects the connection
     *               manager's default prefetch
     * @param global forwarded as the second argument of `channel.prefetch`
     * @returns an Observable of the broker reply, or an erroring Observable when no
     *          channel has been created/set yet
     */
    public setPrefetch(_count: number, global: boolean = false): Observable<Replies.Empty> {
        if (!this.ch) {
            return Observable.throw(new Error('Create channel before setting prefetch'));
        }

        // isNaN(undefined) is true, so an omitted count also falls back to the default.
        const count = (_count === null || isNaN(_count)) ? this._connectionManager.getDefaultPrefetch() : _count;
        this._prefetch = count;
        this._global = global;
        return Observable.fromPromise(this.ch.prefetch(count, global));
    }

    /**
     * Replaces the managed channel. No event handler is registered on the given
     * channel and the connected flag is left untouched.
     *
     * @returns this instance, for chaining
     */
    public setChannel(ch): ChannelManager {
        this.ch = ch;
        return this;
    }

    /** @returns the channel currently held (undefined until one is created or set) */
    public getChannel(): ChannelInterface {
        debug('get channel', this._key);
        return this.ch;
    }

    /** @returns true once a channel has been created and no error/close was seen since */
    public isConnected(): boolean {
        return this._isConnected;
    }

    /**
     * Handler for the channel `error` and `close` events. Marks the channel as
     * disconnected and, for an `error` whose code is not 404, re-creates the channel
     * when that is currently possible and no re-creation is already in flight.
     *
     * @param err value passed by the channel event (may be undefined)
     * @param origin name of the event that fired: 'error' or 'close'
     */
    private defaultErrorHandler(err, origin) {
        this._isConnected = false;

        const shouldRecreate = !this._reconnecting && origin === 'error' && err && err.code !== 404 && this.canCreateChannel();
        if (!shouldRecreate) {
            debug(`Channel ${origin} ${(err && [':', err.message].join(' ')) || ''}`, { err });
            return;
        }

        this._reconnecting = true;
        debug(`recreating channel after ${origin} event`, { err });
        this.create()
            .do(() => this.emit('reconnected'))
            .catch(_err => {
                debug(`could not recreate channel after ${origin} event`, { err: _err });
                try {
                    // EventEmitter throws when 'error' is emitted without a listener.
                    this.emit('error', _err);
                } finally {
                    // Always release the guard, otherwise a throwing emit would leave
                    // _reconnecting stuck at true and block every later recovery.
                    this._reconnecting = false;
                }
                return Observable.of(null);
            })
            .do(() => {
                this._reconnecting = false;
            })
            .subscribe();
    }
}