Skip to content

Commit

Permalink
Merge pull request #2855 from ethereum/fix/websocket-reconnecting
Browse files Browse the repository at this point in the history
WebsocketProvider improvements
  • Loading branch information
nivida committed Jun 4, 2019
2 parents c3f4845 + c8ba201 commit ef84067
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ export default class TransactionObserver {
this.getTransactionReceiptMethod.parameters = [transactionHash];

const receipt = await this.getTransactionReceiptMethod.execute();

// on parity nodes you can get the receipt without it being mined
// so the receipt may not have a block number at this point
if (receipt && receipt.blockNumber) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
* @date 2018
*/

import isFunction from 'lodash/isFunction';
import EventEmitter from 'eventemitter3';

/**
Expand Down Expand Up @@ -80,7 +79,9 @@ export default class AbstractSubscription extends EventEmitter {
* @callback callback callback(error, result)
* @returns {AbstractSubscription}
*/
subscribe(callback) {
subscribe(callback = null) {
this.callback = callback;

this.beforeSubscription(this.moduleInstance);
let subscriptionParameters = [];

Expand All @@ -93,34 +94,13 @@ export default class AbstractSubscription extends EventEmitter {
.then((subscriptionId) => {
this.id = subscriptionId;

this.moduleInstance.currentProvider.once('error', (error) => {
this.moduleInstance.currentProvider.removeAllListeners(this.id);

if (isFunction(callback)) {
callback(error, false);

return;
}

this.emit('error', error);
this.removeAllListeners();
});

this.moduleInstance.currentProvider.on(this.id, (response) => {
const formattedOutput = this.onNewSubscriptionItem(response.result);

if (isFunction(callback)) {
callback(false, formattedOutput);

return;
}

this.emit('data', formattedOutput);
});
// TODO: Improve listener handling for subscriptions
this.moduleInstance.currentProvider.on('error', this.errorListener.bind(this));
this.moduleInstance.currentProvider.on(this.id, this.subscriptionListener.bind(this));
})
.catch((error) => {
if (isFunction(callback)) {
callback(error, null);
if (this.callback) {
this.callback(error, null);

return;
}
Expand All @@ -132,6 +112,42 @@ export default class AbstractSubscription extends EventEmitter {
return this;
}

/**
* Listens to the provider errors
*
* @method errorListener
*
* @param {Error} error
*/
errorListener(error) {
if (this.callback) {
this.callback(error, false);

return;
}

this.emit('error', error);
}

/**
* Listens to the subscription
*
* @method subscriptionListener
*
* @param {Object} response
*/
subscriptionListener(response) {
const formattedOutput = this.onNewSubscriptionItem(response.result);

if (this.callback) {
this.callback(false, formattedOutput);

return;
}

this.emit('data', formattedOutput);
}

/**
* Unsubscribes subscription
*
Expand All @@ -148,17 +164,20 @@ export default class AbstractSubscription extends EventEmitter {
.then((response) => {
if (!response) {
const error = new Error('Error on unsubscribe!');
if (isFunction(callback)) {
if (callback) {
callback(error, null);
}

throw error;
}

this.moduleInstance.currentProvider.removeListener('error', this.errorListener);
this.moduleInstance.currentProvider.removeListener(this.id, this.subscriptionListener);

this.id = null;
this.removeAllListeners('data');
this.removeAllListeners();

if (isFunction(callback)) {
if (callback) {
callback(false, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ describe('AbstractSubscriptionTest', () => {
});

it('calls subscribe and emits a error from the provider error listener', (done) => {
moduleInstanceMock.currentProvider.once = jest.fn((event, callback) => {
expect(event).toEqual('error');

callback(new Error('ERROR'));
moduleInstanceMock.currentProvider.on = jest.fn((event, callback) => {
if (event === 'error') {
callback(new Error('ERROR'));
}
});

const subscription = abstractSubscription.subscribe();

subscription.on('error', (error) => {
expect(error).toEqual(new Error('ERROR'));

expect(moduleInstanceMock.currentProvider.removeAllListeners).toHaveBeenCalledWith('MY_ID');
expect(moduleInstanceMock.currentProvider.on).toHaveBeenCalledTimes(1);

done();
});
Expand Down Expand Up @@ -108,27 +108,11 @@ describe('AbstractSubscriptionTest', () => {
});
});

it('calls subscribe and returns a error from the provider error listener', (done) => {
moduleInstanceMock.currentProvider.once = jest.fn((event, callback) => {
expect(event).toEqual('error');

callback(new Error('ERROR'));
});

abstractSubscription.subscribe((error) => {
expect(error).toEqual(new Error('ERROR'));

expect(moduleInstanceMock.currentProvider.removeAllListeners).toHaveBeenCalledWith('MY_ID');

done();
});
});

it('calls subscribe with a callback and it returns the expected value', (done) => {
moduleInstanceMock.currentProvider.on = jest.fn((id, callback) => {
expect(id).toEqual('MY_ID');

callback({result: 'SUBSCRIPTION_ITEM'});
if (id === 'MY_ID') {
callback({result: 'SUBSCRIPTION_ITEM'});
}
});

const callback = jest.fn((error, response) => {
Expand All @@ -138,7 +122,7 @@ describe('AbstractSubscriptionTest', () => {

expect(response).toEqual('SUBSCRIPTION_ITEM');

expect(moduleInstanceMock.currentProvider.once).toHaveBeenCalled();
expect(moduleInstanceMock.currentProvider.on).toHaveBeenCalledTimes(2);

done();
});
Expand All @@ -149,9 +133,9 @@ describe('AbstractSubscriptionTest', () => {

it('calls subscribe and emits the data event', (done) => {
moduleInstanceMock.currentProvider.on = jest.fn((id, callback) => {
expect(id).toEqual('MY_ID');

callback({result: 'SUBSCRIPTION_ITEM'});
if (id === 'MY_ID') {
callback({result: 'SUBSCRIPTION_ITEM'});
}
});

abstractSubscription.options = null;
Expand All @@ -160,13 +144,14 @@ describe('AbstractSubscriptionTest', () => {
subscription.on('data', (data) => {
expect(data).toEqual('SUBSCRIPTION_ITEM');

expect(moduleInstanceMock.currentProvider.once).toHaveBeenCalled();
expect(moduleInstanceMock.currentProvider.on).toHaveBeenCalledTimes(2);

done();
});
});

it('calls unsubscribe and returns with a resolved promise', async () => {
moduleInstanceMock.currentProvider.removeListener = jest.fn();
moduleInstanceMock.currentProvider.unsubscribe = jest.fn((id, type) => {
expect(id).toEqual('ID');

Expand All @@ -188,6 +173,18 @@ describe('AbstractSubscriptionTest', () => {

expect(abstractSubscription.listenerCount('data')).toEqual(0);

expect(moduleInstanceMock.currentProvider.removeListener).toHaveBeenNthCalledWith(
1,
'error',
abstractSubscription.errorListener
);

expect(moduleInstanceMock.currentProvider.removeListener).toHaveBeenNthCalledWith(
2,
'ID',
abstractSubscription.subscriptionListener
);

expect(abstractSubscription.id).toEqual(null);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ describe('LogSubscriptionTest', () => {
});

socketProviderMock.on = jest.fn((subscriptionId, callback) => {
expect(subscriptionId).toEqual('MY_ID');

callback('SUBSCRIPTION_ITEM');
if (subscriptionId === 'MY_ID') {
callback('SUBSCRIPTION_ITEM');
}
});

moduleInstanceMock.currentProvider = socketProviderMock;
Expand Down Expand Up @@ -161,9 +161,9 @@ describe('LogSubscriptionTest', () => {
});

socketProviderMock.on = jest.fn((subscriptionId, callback) => {
expect(subscriptionId).toEqual('MY_ID');

callback('SUBSCRIPTION_ITEM');
if (subscriptionId === 'MY_ID') {
callback('SUBSCRIPTION_ITEM');
}
});

moduleInstanceMock.currentProvider = socketProviderMock;
Expand All @@ -175,6 +175,8 @@ describe('LogSubscriptionTest', () => {

expect(logSubscription.id).toEqual('MY_ID');

expect(socketProviderMock.on).toHaveBeenCalledTimes(2);

done();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ export default class AbstractSocketProvider extends EventEmitter {
this.emit(this.ERROR, error);
this.emit(this.SOCKET_ERROR, error);
this.removeAllSocketListeners();
this.removeAllListeners();
}

/**
Expand Down Expand Up @@ -201,7 +200,9 @@ export default class AbstractSocketProvider extends EventEmitter {
this.subscriptions[key].parameters.slice(1)
);

delete this.subscriptions[subscriptionId];
if (key !== subscriptionId) {
delete this.subscriptions[subscriptionId];
}

this.subscriptions[key].id = subscriptionId;
}
Expand Down
5 changes: 4 additions & 1 deletion packages/web3-providers/src/batch-request/BatchRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ export default class BatchRequest {
let mappedResult;

// TODO: Find a better handling for custom behaviours in a batch request (afterBatchRequest?)
if (method.Type === 'eth-send-transaction-method' || method.Type === 'observed-transaction-method') {
if (
method.Type === 'eth-send-transaction-method' ||
method.Type === 'observed-transaction-method'
) {
mappedResult = responseItem.result;
} else {
mappedResult = method.afterExecution(responseItem.result);
Expand Down
8 changes: 7 additions & 1 deletion packages/web3-providers/src/providers/IpcProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,17 @@ export default class IpcProvider extends AbstractSocketProvider {
id = payload.id;
}

this.once(id, resolve);
this.once(id, (response) => {
resolve(response);

this.removeListener('error', reject);
});

return;
}

this.removeListener('error', reject);

return reject(new Error("Connection error: Couldn't write on the socket with Socket.write(payload)"));
});
}
Expand Down
20 changes: 17 additions & 3 deletions packages/web3-providers/src/providers/WebsocketProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,17 @@ export default class WebsocketProvider extends AbstractSocketProvider {
let timeout, id;

if (this.connection.readyState !== this.connection.OPEN) {
this.removeListener('error', reject);

return reject(new Error('Connection error: Connection is not open on send()'));
}

try {
this.connection.send(JSON.stringify(payload));
} catch (error) {
reject(error);
this.removeListener('error', reject);

return reject(error);
}

if (this.timeout) {
Expand All @@ -230,6 +234,8 @@ export default class WebsocketProvider extends AbstractSocketProvider {
clearTimeout(timeout);
}

this.removeListener('error', reject);

return resolve(response);
});

Expand All @@ -238,8 +244,16 @@ export default class WebsocketProvider extends AbstractSocketProvider {

this.once('connect', () => {
this.sendPayload(payload)
.then(resolve)
.catch(reject);
.then((response) => {
this.removeListener('error', reject);

return resolve(response);
})
.catch((error) => {
this.removeListener('error', reject);

return reject(error);
});
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ describe('AbstractSocketProviderTest', () => {
abstractSocketProvider.removeAllListeners = jest.fn();

abstractSocketProvider.onError('not ready!');

expect(abstractSocketProvider.removeAllListeners).toHaveBeenCalled();
});

it('calls onClose and close event will be emitted', (done) => {
Expand Down

0 comments on commit ef84067

Please sign in to comment.