Skip to content

Commit

Permalink
better event handling; +tests improved
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikhus committed Dec 30, 2019
1 parent c48ff08 commit 23a9c19
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 66 deletions.
73 changes: 48 additions & 25 deletions src/PgPubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,14 @@ export class PgPubSub extends EventEmitter {
this.pgClient = (this.options.pgClient || new Client(this.options)) as
PgClient;

this.pgClient.on('end', this.safeFailure('end'));
this.pgClient.on('error', this.safeFailure('error'));
this.pgClient.on('notification', this.onNotification.bind(this));
this.pgClient.on('end', () => this.emit('end'));
this.pgClient.on('error', () => this.emit('error'));

this.onNotification = this.onNotification.bind(this);
this.reconnect = this.reconnect.bind(this);
this.onReconnect = this.onReconnect.bind(this);

this.pgClient.on('notification', this.onNotification);
}

/**
Expand All @@ -324,7 +327,7 @@ export class PgPubSub extends EventEmitter {
* @return {Promise<void>}
*/
public async connect(): Promise<void> {
this.pgClient.once('end', this.reconnect);
this.setOnceHandler(['end', 'error'], this.reconnect);
this.pgClient.once('connect', async () => {
await this.setAppName();
this.emit('connect');
Expand All @@ -340,6 +343,7 @@ export class PgPubSub extends EventEmitter {
*/
public async close(): Promise<void> {
this.pgClient.removeListener('end', this.reconnect);
this.pgClient.removeListener('error', this.reconnect);
await this.pgClient.end();
this.pgClient.removeAllListeners();
this.emit('close');
Expand Down Expand Up @@ -490,9 +494,41 @@ export class PgPubSub extends EventEmitter {
this.removeAllListeners();
}

/**
* Safely sets given handler for given pg client events, making sure
* we won't flood events with non-fired same stack of handlers
*
* @access private
* @param {string[]} events - list of events to set handler for
* @param {(...args: any[]) => any} handler - handler reference
* @return {PgPubSub}
*/
private setOnceHandler(
events: string[],
handler: (...args: any) => any,
): PgPubSub {
for (const event of events) {
// make sure we won't flood events with given handler,
// so do a cleanup first
const listeners = this.pgClient.listeners(event);

for (const listener of listeners) {
if (listener === handler) {
this.pgClient.removeListener(event, handler);
}
}

// now set event handler
this.pgClient.once(event, handler);
}

return this;
}

/**
* Database notification event handler
*
* @access private
* @param {Notification} notification - database message data
* @return {Promise<void>}
*/
Expand All @@ -515,25 +551,10 @@ export class PgPubSub extends EventEmitter {
this.channels.emit(notification.channel, payload);
}

/**
* Failure handler
*
* @param {string} event
* @return {() => Promise<void>}
*/
private safeFailure(event: string): () => Promise<void> {
return async () => {
if (this.options.singleListener) {
await this.release();
}

this.emit(event);
};
}

/**
* On reconnect event emitter
*
* @access private
* @return {Promise<void>}
*/
private async onReconnect(): Promise<void> {
Expand All @@ -549,19 +570,20 @@ export class PgPubSub extends EventEmitter {
* Reconnect routine, used for implementation of auto-reconnecting db
* connection
*
* @access private
* @return {number}
*/
private reconnect(): Timeout {
return setTimeout(async () => {
if (this.options.retryLimit <= ++this.retry) {
const msg = `Connect failed after ${this.retry} retries...`;

this.emit('error', new Error(msg));
this.emit('error', new Error(
`Connect failed after ${this.retry} retries...`,
));

return this.close();
return await this.close();
}

this.once('connect', this.onReconnect.bind(this));
this.setOnceHandler(['connect'], this.onReconnect);
await this.connect();
},

Expand Down Expand Up @@ -610,6 +632,7 @@ export class PgPubSub extends EventEmitter {
/**
* Sets application_name for this connection as unique identifier
*
* @access private
* @return {Promise<void>}
*/
private async setAppName(): Promise<void> {
Expand Down
146 changes: 105 additions & 41 deletions test/src/PgPubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import '../mocks';
import { expect } from 'chai';
import { Client } from 'pg';
import * as sinon from 'sinon';
import { PgClient, PgIpLock, PgPubSub } from '../../src';
import { PgClient, PgIpLock, PgPubSub, RETRY_LIMIT } from '../../src';

describe('PgPubSub', () => {
let pgClient: Client;
Expand Down Expand Up @@ -78,60 +78,82 @@ describe('PgPubSub', () => {
it('should support automatic reconnect', done => {
let counter = 0;

pubSub.on('connect', () => counter++);
pubSub.connect().then(() => pgClient.emit('end'));
// emulate termination
(pgClient as any).connect = () => {
counter++;
pgClient.emit('end');
};

setTimeout(() => {
expect(counter).equals(2);
pubSub.on('error', err => {
expect(err.message).equals(
`Connect failed after ${counter} retries...`,
);
done();
}, 210);
});

pubSub.connect();
});
it('should emit error and end if retry limit reached', done => {
let err: Error;
it('should support automatic reconnect on errors', done => {
let counter = 0;

// emulate termination
(pgClient as any).connect = () => {
counter++;
pgClient.emit('error');
};

pubSub.on('error', err => {
if (err) {
expect(err.message).equals(
`Connect failed after ${counter} retries...`,
);
done();
}
});

pubSub.options.retryLimit = 3;
pubSub.connect();
});
it('should emit error and end if retry limit reached', async () => {
// emulate connection failure
pubSub.connect = async () => {
pgClient.once('end', (pubSub as any).reconnect);
(pgClient as any).connect = async () => {
pgClient.emit('end');
};
pubSub.on('error', e => (err = e));
pubSub.connect();

setTimeout(() => {
try {
await pubSub.connect();
} catch (err) {
expect(err).to.be.instanceOf(Error);
expect(err).to.match(/failed after 3 retries/);
done();
}, 500);
expect(err.message).equals(
`Connect failed after ${RETRY_LIMIT} retries...`,
);
}
});
it('should re-subscribe all channels', done => {
let counter = 0;

pubSub.listen('TestOne');
pubSub.listen('TestTwo');

const spy = sinon.spy(pubSub, 'listen');

pubSub.on('connect', () => counter++);
pubSub.connect().then(() => pgClient.emit('end'));

setTimeout(() => {
expect(spy.calledTwice).to.be.true;
done();
}, 210);
}, 30);
});
});
describe('close()', () => {
it('should not reconnect if called', done => {
it('should not reconnect if called', async () => {
let counter = 0;

pubSub.on('connect', () => counter++);
pubSub.connect().then(() => pubSub.close());
pubSub.on('connect', () => {
counter++;
pubSub.close();
});

setTimeout(() => {
expect(counter).equals(1);
done();
}, 210);
await pubSub.connect();

expect(counter).equals(1);
});
});
describe('listen()', () => {
Expand Down Expand Up @@ -165,25 +187,22 @@ describe('PgPubSub', () => {
done();
});
});
it('should not handle messages from db with no lock', done => {
it('should not handle messages from db with no lock', async () => {
pubSub.options.singleListener = true;

const spy = sinon.spy(pubSub, 'emit');

pubSub.listen('TestChannel').then(() => {
(pubSub as any).locks.TestChannel.release();

pgClient.emit('notification', {
channel: 'TestChannel',
payload: 'true',
});
await pubSub.listen('TestChannel');
await (pubSub as any).locks.TestChannel.release();

setTimeout(() => {
expect(spy.calledWith('message', 'TestChannel', true))
.to.be.false;
done();
}, 100);
pgClient.emit('notification', {
channel: 'TestChannel',
payload: 'true',
});

await new Promise(resolve => setTimeout(resolve, 20));

expect(spy.calledWith('message', 'TestChannel', true)).to.be.false;
});
});
describe('unlisten()', () => {
Expand Down Expand Up @@ -319,6 +338,51 @@ describe('PgPubSub', () => {
});
});
});
describe('release()', () => {
it('should release all locks acquired', async () => {
await pubSub.listen('One');
await pubSub.listen('Two');

const spies = [
sinon.spy((pubSub as any).locks.One, 'release'),
sinon.spy((pubSub as any).locks.Two, 'release'),
];

await (pubSub as any).release();
spies.forEach(spy => expect(spy.called).to.be.true);
});
it('should skip locks which was not acquired', async () => {
await pubSub.listen('One');
await pubSub.listen('Two');

await (pubSub as any).locks.One.release();
await (pubSub as any).locks.Two.release();

const spies = [
sinon.spy((pubSub as any).locks.One, 'release'),
sinon.spy((pubSub as any).locks.Two, 'release'),
];

await (pubSub as any).release();
spies.forEach(spy => expect(spy.called).to.be.false);
});
it('should release only acquired locks', async () => {
await pubSub.listen('One');
await pubSub.listen('Two');

await (pubSub as any).locks.One.release();

const [one, two] = [
sinon.spy((pubSub as any).locks.One, 'release'),
sinon.spy((pubSub as any).locks.Two, 'release'),
];

await (pubSub as any).release();

expect(one.called).to.be.false;
expect(two.called).to.be.true;
});
});
describe('destroy()', () => {
it('should properly handle destruction', async () => {
const spies = [
Expand Down

0 comments on commit 23a9c19

Please sign in to comment.