Skip to content

Commit

Permalink
fix Request.broadcast logic
Browse files Browse the repository at this point in the history
Add default timeout to Request.broadcas
Add default timeout to Remote.createPathFind
Handle 'slowDown' error in many places
  • Loading branch information
darkdarkdragon committed Oct 13, 2015
1 parent edb31a0 commit b56680e
Show file tree
Hide file tree
Showing 5 changed files with 440 additions and 366 deletions.
16 changes: 15 additions & 1 deletion src/core/remote.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ function Remote(options = {}) {
if (typeof this.submission_timeout !== 'number') {
throw new TypeError('submission_timeout must be a number');
}
if (typeof this.pathfind_timeout !== 'number') {
throw new TypeError('pathfind_timeout must be a number');
}
if (typeof this.automatic_resubmission !== 'boolean') {
throw new TypeError('automatic_resubmission must be a boolean');
}
Expand Down Expand Up @@ -197,6 +200,7 @@ Remote.DEFAULTS = {
max_fee: 1000000, // 1 XRP
max_attempts: 10,
submission_timeout: 1000 * 20,
pathfind_timeout: 1000 * 10,
automatic_resubmission: true,
last_ledger_offset: 3,
servers: [ ],
Expand Down Expand Up @@ -1852,8 +1856,15 @@ Remote.prototype.createPathFind = function(options, callback) {
}

if (callback) {
const updateTimeout = setTimeout(() => {
callback(new RippleError('tejTimeout'));
pathFind.close();
this._cur_path_find = null;
}, this.pathfind_timeout);

pathFind.on('update', (data) => {
if (data.full_reply && !data.closed) {
clearTimeout(updateTimeout);
this._cur_path_find = null;
callback(null, data);
// "A client can only have one pathfinding request open at a time.
Expand All @@ -1869,7 +1880,10 @@ Remote.prototype.createPathFind = function(options, callback) {
}
}
});
pathFind.on('error', callback);
pathFind.on('error', (error) => {
this._cur_path_find = null;
callback(error);
});
}

this._cur_path_find = pathFind;
Expand Down
103 changes: 87 additions & 16 deletions src/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,23 @@ Request.prototype.request = function(servers, callback_) {
// just in case
this.emit = _.noop;
this.cancel();
this.remote.removeListener('connected', doRequest);
}, this._timeout);

function onResponse() {
clearTimeout(timeout);
}

if (this.remote.isConnected()) {
this.remote.on('connected', doRequest);
}

this.once('response', onResponse);
function onRemoteError(error) {
self.emit('error', error);
}
this.remote.once('error', onRemoteError); // e.g. rate-limiting slowDown error

this.once('response', () => {
clearTimeout(timeout);
this.remote.removeListener('connected', doRequest);
this.remote.removeListener('error', onRemoteError);
});

doRequest();

Expand Down Expand Up @@ -126,6 +132,7 @@ Request.prototype.broadcast = function(isResponseSuccess = isResponseNotError) {
return this;
}

this.on('error', function() {});
let lastResponse = new Error('No servers available');
const connectTimeouts = { };
const emit = this.emit;
Expand All @@ -142,23 +149,44 @@ Request.prototype.broadcast = function(isResponseSuccess = isResponseNotError) {
}
};

let serversCallbacks = { };
let serversTimeouts = { };
let serversClearConnectHandlers = { };

function iterator(server, callback) {
// Iterator is called in parallel

if (server.isConnected()) {
// Listen for proxied success/error event and apply filter
self.once('proposed', function(res) {
lastResponse = res;
callback(isResponseSuccess(res));
});
const serverID = server.getServerID();

serversCallbacks[serverID] = callback;

function doRequest() {
return server._request(self);
}

if (server.isConnected()) {
const timeout = setTimeout(() => {
lastResponse = new RippleError('tejTimeout',
JSON.stringify(self.message));

server.removeListener('connect', doRequest);
delete serversCallbacks[serverID];
delete serversClearConnectHandlers[serverID];

callback(false);
}, self._timeout);

serversTimeouts[serverID] = timeout;
serversClearConnectHandlers[serverID] = function() {
server.removeListener('connect', doRequest);
};

server.on('connect', doRequest);
return doRequest();
}

// Server is disconnected but should reconnect. Wait for it to reconnect,
// and abort after a timeout
const serverID = server.getServerID();

function serverReconnected() {
clearTimeout(connectTimeouts[serverID]);
connectTimeouts[serverID] = null;
Expand All @@ -173,13 +201,59 @@ Request.prototype.broadcast = function(isResponseSuccess = isResponseNotError) {
server.once('connect', serverReconnected);
}

// Listen for proxied success/error event and apply filter
function onProposed(result, server) {
const serverID = server.getServerID();
lastResponse = result;

const callback = serversCallbacks[serverID];
delete serversCallbacks[serverID];

clearTimeout(serversTimeouts[serverID]);
delete serversTimeouts[serverID];

if (serversClearConnectHandlers[serverID] !== undefined) {
serversClearConnectHandlers[serverID]();
delete serversClearConnectHandlers[serverID];
}

if (callback !== undefined) {
callback(isResponseSuccess(result));
}
}

this.on('proposed', onProposed);

let complete_ = null;

// e.g. rate-limiting slowDown error
function onRemoteError(error) {
serversCallbacks = {};
_.forEach(serversTimeouts, clearTimeout);
serversTimeouts = {};
_.forEach(serversClearConnectHandlers, (handler) => {
handler();
});
serversClearConnectHandlers = {};

lastResponse = error instanceof RippleError ? error :
new RippleError(error);
complete_(false);
}

function complete(success) {
self.removeListener('proposed', onProposed);
self.remote.removeListener('error', onRemoteError);
// Emit success if the filter is satisfied by any server
// Emit error if the filter is not satisfied by any server
// Include the last response
emit.call(self, success ? 'success' : 'error', lastResponse);
}

complete_ = complete;

this.remote.once('error', onRemoteError);

const servers = this.remote._servers.filter(function(server) {
// Pre-filter servers that are disconnected and should not reconnect
return (server.isConnected() || server._shouldConnect)
Expand Down Expand Up @@ -241,7 +315,6 @@ Request.prototype.callback = function(callback, successEvent, errorEvent) {
let called = false;

function requestError(error) {
self.remote.removeListener('error', requestError);
if (!called) {
called = true;

Expand All @@ -254,14 +327,12 @@ Request.prototype.callback = function(callback, successEvent, errorEvent) {
}

function requestSuccess(message) {
self.remote.removeListener('error', requestError);
if (!called) {
called = true;
callback.call(self, null, message);
}
}

this.remote.once('error', requestError); // e.g. rate-limiting slowDown error
this.once(this.successEvent, requestSuccess);
this.once(this.errorEvent, requestError);

Expand Down
29 changes: 24 additions & 5 deletions src/core/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -482,11 +482,21 @@ Server.prototype.connect = function() {
self.emit('message', message);
};

function onRemoteError() {}

ws.onopen = function onOpen() {
if (ws === self._ws) {
self.emit('socket_open');

// e.g. rate-limiting slowDown error
self._remote.once('error', onRemoteError);

// Subscribe to events
self._request(self._remote._serverPrepareSubscribe(self));
const request = self._remote._serverPrepareSubscribe(self);
request.once('response', () => {
self._remote.removeListener('error', onRemoteError);
});
self._request(request);
}
};

Expand Down Expand Up @@ -676,7 +686,7 @@ Server.prototype._handleResponse = function(message) {
const result = message.result;
const responseEvent = 'response_' + command;

request.emit('success', result);
request.emit('success', result, this);

[this, this._remote].forEach(function(emitter) {
emitter.emit(responseEvent, result, request, message);
Expand All @@ -690,9 +700,9 @@ Server.prototype._handleResponse = function(message) {
error: 'remoteError',
error_message: 'Remote reported an error.',
remote: message
});
}, this);
}
request.emit('response', message);
request.emit('response', message, this);
};

Server.prototype._handlePathFind = function(message) {
Expand Down Expand Up @@ -800,7 +810,16 @@ Server.prototype._sendMessage = function(message) {
if (this._remote.trace) {
log.info(this.getServerID(), 'request:', message);
}
this._ws.send(JSON.stringify(message));
this._ws.send(JSON.stringify(message), (error) => {
// sometimes gives 'not opened'
// without callback it wil throw
if (error) {
// resend in case of error
this.once('connect', () => {
this._sendMessage(message);
});
}
});
}
};

Expand Down

0 comments on commit b56680e

Please sign in to comment.