Skip to content

Commit

Permalink
Merge pull request #601 from clark800/replace-core
Browse files Browse the repository at this point in the history
Add Connection class
  • Loading branch information
clark800 committed Oct 20, 2015
2 parents 5fa20dc + 15eb4c2 commit 01ecd19
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 0 deletions.
192 changes: 192 additions & 0 deletions src/api/common/connection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
'use strict';
const {EventEmitter} = require('events');
const WebSocket = require('ws');

function isStreamMessageType(type) {
return type === 'ledgerClosed' ||
type === 'transaction' ||
type === 'path_find';
}

class RippledError extends Error {
constructor(message) {
super(message);
this.name = this.constructor.name;
this.message = message;
Error.captureStackTrace(this, this.constructor.name);
}
}

class ConnectionError extends Error {
constructor(message) {
super(message);
this.name = this.constructor.name;
this.message = message;
Error.captureStackTrace(this, this.constructor.name);
}
}

class DisconnectedError extends ConnectionError {
constructor(message) {
super(message);
}
}

class TimeoutError extends ConnectionError {
constructor(message) {
super(message);
}
}

class UnexpectedError extends ConnectionError {
constructor(message) {
super(message);
}
}

class Connection extends EventEmitter {
constructor(url, options = {}) {
super();
this._url = url;
this._timeout = options.timeout || (20 * 1000);
this._ws = null;
this._nextRequestID = 1;
}

_onMessage(message) {
try {
const data = JSON.parse(message);
if (data.type === 'response') {
if (!(Number.isInteger(data.id) && data.id >= 0)) {
throw new UnexpectedError('valid id not found in response');
}
this.emit(data.id.toString(), data);
} else if (isStreamMessageType(data.type)) {
this.emit(data.type, data);
} else if (data.type === undefined && data.error) {
this.emit('error', data.error, data.error_message); // e.g. slowDown
} else {
throw new UnexpectedError('unrecognized message type: ' + data.type);
}
} catch (error) {
this.emit('error', 'badMessage', message);
}
}

get state() {
return this._ws ? this._ws.readyState : WebSocket.CLOSED;
}

_onUnexpectedClose() {
this.connect().then();
}

connect() {
return new Promise((resolve) => {
if (this.state === WebSocket.OPEN) {
resolve();
} else if (this.state === WebSocket.CONNECTING) {
this._ws.once('open', resolve);
} else {
this._ws = new WebSocket(this._url);
this._ws.on('message', this._onMessage.bind(this));
this._ws.once('close', () => this._onUnexpectedClose);
this._ws.once('open', resolve);
}
});
}

disconnect() {
return new Promise((resolve) => {
if (this.state === WebSocket.CLOSED) {
resolve();
} else if (this.state === WebSocket.CLOSING) {
this._ws.once('close', resolve);
} else {
this._ws.removeListener('close', this._onUnexpectedClose);
this._ws.once('close', resolve);
this._ws.close();
}
});
}

reconnect() {
return this.disconnect().then(() => this.connect());
}

_send(message) {
return new Promise((resolve, reject) => {
this._ws.send(message, undefined, (error, result) => {
if (error) {
reject(new DisconnectedError(error.message));
} else {
resolve(result);
}
});
});
}

_sendWhenReady(message) {
return new Promise((resolve, reject) => {
if (this.state === WebSocket.OPEN) {
this._send(message).then(resolve, reject);
} else {
this._ws.once('open', () => this._send(message).then(resolve, reject));
}
});
}

request(request, timeout) {
return new Promise((resolve, reject) => {
let timer = null;
const self = this;
const id = this._nextRequestID;
this._nextRequestID += 1;
const eventName = id.toString();

function onDisconnect() {
clearTimeout(timer);
self.removeAllListeners(eventName);
reject(new DisconnectedError());
}

function cleanup() {
clearTimeout(timer);
self.removeAllListeners(eventName);
self._ws.removeListener('close', onDisconnect);
}

function _resolve(response) {
cleanup();
resolve(response);
}

function _reject(error) {
cleanup();
reject(error);
}

this.once(eventName, response => {
if (response.status === 'error') {
_reject(new RippledError(response.error));
} else if (response.status === 'success') {
_resolve(response.result);
} else {
_reject(new UnexpectedError(
'unrecognized status: ' + response.status));
}
});

this._ws.once('close', onDisconnect);

const message = JSON.stringify(Object.assign({}, request, {id}));

this._sendWhenReady(message).then(() => {
const delay = timeout || this._timeout;
timer = setTimeout(() => _reject(new TimeoutError()), delay);
}).catch(_reject);
});
}
}

module.exports = Connection;
1 change: 1 addition & 0 deletions src/api/common/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
const utils = require('./utils');

module.exports = {
Connection: require('./connection'),
core: utils.core,
constants: require('./constants'),
errors: require('./errors'),
Expand Down
48 changes: 48 additions & 0 deletions test/integration/connection-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';
const Connection = require('../../src/api/common/connection');

const request1 = {
command: 'server_info'
};

const request2 = {
command: 'account_info',
account: 'r9cZA1mLK5R5Am25ArfXFmqgNwjZgnfk59'
};

const request3 = {
command: 'account_info'
};

const request4 = {
command: 'account_info',
account: 'invalid'
};

function makeRequest(connection, request) {
return connection.request(request).then((response) => {
console.log(request);
console.log(JSON.stringify(response, null, 2));
}).catch((error) => {
console.log(request);
console.log(error);
});
}

function main() {
const connection = new Connection('wss://s1.ripple.com');
connection.connect().then(() => {
console.log('Connected');
Promise.all([
makeRequest(connection, request1),
makeRequest(connection, request2),
makeRequest(connection, request3),
makeRequest(connection, request4)
]).then(() => {
console.log('Done');
process.exit();
});
});
}

main();

0 comments on commit 01ecd19

Please sign in to comment.