Skip to content

Commit

Permalink
push socket creation into connect and listen methods
Browse files Browse the repository at this point in the history
  • Loading branch information
aprock committed Dec 28, 2015
1 parent 0d08f22 commit bceff85
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 102 deletions.
21 changes: 13 additions & 8 deletions TcpServer.js
Expand Up @@ -22,20 +22,21 @@ function TcpServer(connectionListener: (socket: Socket) => void) {
return new TcpServer(connectionListener);
}

// $FlowFixMe: suppressing this error flow doesn't like EventEmitter
EventEmitter.call(this);
if (EventEmitter instanceof Function) {
EventEmitter.call(this);
}

var self = this;

this._socket = new Socket();

// $FlowFixMe: suppressing this error flow doesn't like EventEmitter
this._socket.on('connect', function() {
self.emit('listening');
});
// $FlowFixMe: suppressing this error flow doesn't like EventEmitter
this._socket.on('connection', function(socket) {
self._connections++;

self.emit('connection', socket);
});
// $FlowFixMe: suppressing this error flow doesn't like EventEmitter
Expand All @@ -45,7 +46,6 @@ function TcpServer(connectionListener: (socket: Socket) => void) {
// $FlowFixMe: suppressing this error flow doesn't like EventEmitter
this._socket.on('error', function(error) {
self.emit('error', error);
self._socket.destroy();
});

if (typeof connectionListener === 'function') {
Expand All @@ -64,16 +64,21 @@ TcpServer.prototype._debug = function() {
}
};

TcpServer.prototype.listen = function(options: { port: number, hostname: ?string }, callback: ?() => void) : TcpServer {
// TODO : determine how to properly overload this with flow
TcpServer.prototype.listen = function() : TcpServer {
var args = this._socket._normalizeConnectArgs(arguments);
var options = args[0];
var callback = args[1];

var port = options.port;
var hostname = options.hostname || 'localhost';
var host = options.host || 'localhost';

if (callback) {
this.on('listening', callback);
}

Sockets.createSocket(this._socket._id);
Sockets.listen(this._socket._id, hostname, port);
this._socket._registerEvents();
Sockets.listen(this._socket._id, host, port);

return this;
};
Expand Down
120 changes: 73 additions & 47 deletions TcpSocket.js
Expand Up @@ -19,45 +19,42 @@ var Sockets = NativeModules.TcpSockets;
var base64 = require('base64-js');
var Base64Str = require('./base64-str');
var noop = function () {};
var usedIds = [];
var instances = 0;
var STATE = {
DISCONNECTED: 0,
CONNECTING: 1,
CONNECTED: 2
};

function TcpSocket(options: ?{ id: ?number }) {
// $FlowFixMe: suppressing this error flow doesn't like EventEmitter
EventEmitter.call(this);
if (!(this instanceof TcpSocket)) {
return new TcpSocket(options);
}

if (EventEmitter instanceof Function) {
EventEmitter.call(this);
}

if (options && options.id) {
// native generated sockets range from 5000-6000
// e.g. incoming server connections
this._id = Number(options.id);

if (usedIds.indexOf(this._id) !== -1) {
if (this._id <= instances) {
throw new Error('Socket id ' + this._id + 'already in use');
}
} else {
// javascript generated sockets range from 1-1000
this._id = Math.floor((Math.random() * 1000) + 1);
while (usedIds.indexOf(this._id) !== -1) {
this._id = Math.floor((Math.random() * 1000) + 1);
}
this._id = instances++;
}

usedIds.push(this._id);

// these will be set once there is a connection
this.readable = this.writable = false;

this._registerEvents();

// ensure compatibility with node's EventEmitter
if (!this.on) {
this.on = this.addListener.bind(this);
}

// these will be set once there is a connection
this.writable = this.readable = false;

this._state = STATE.DISCONNECTED;
}

Expand All @@ -71,24 +68,21 @@ TcpSocket.prototype._debug = function() {
}
};

TcpSocket.prototype.connect = function(options: ?{ port: ?number | ?string, host: ?string, localAddress: ?string, localPort: ?number }, callback: ?() => void) {
if (this._state !== STATE.DISCONNECTED) {
throw new Error('Socket is already bound');
// TODO : determine how to properly overload this with flow
TcpSocket.prototype.connect = function(options, callback) : TcpSocket {
this._registerEvents();

if (options === null || typeof options !== 'object') {
// Old API:
// connect(port, [host], [cb])
var args = this._normalizeConnectArgs(arguments);
return TcpSocket.prototype.connect.apply(this, args);
}

if (typeof callback === 'function') {
this.once('connect', callback);
}

if (!options) {
options = {
host: 'localhost',
port: 0,
localAddress: null,
localPort: null
};
}

var host = options.host || 'localhost';
var port = options.port || 0;
var localAddress = options.localAddress;
Expand All @@ -107,7 +101,7 @@ TcpSocket.prototype.connect = function(options: ?{ port: ?number | ?string, host
throw new TypeError('"port" option should be a number or string: ' + port);
}

port = Number(port);
port = +port;

if (!isLegalPort(port)) {
throw new RangeError('"port" option should be >= 0 and < 65536: ' + port);
Expand All @@ -117,8 +111,10 @@ TcpSocket.prototype.connect = function(options: ?{ port: ?number | ?string, host
this._state = STATE.CONNECTING;
this._debug('connecting, host:', host, 'port:', port);

Sockets.createSocket(this._id);
this._destroyed = false;
Sockets.connect(this._id, host, Number(port), options);

return this;
};

// Check that the port number is not NaN when coerced to a number,
Expand Down Expand Up @@ -181,6 +177,10 @@ TcpSocket.prototype.destroy = function() {
};

TcpSocket.prototype._registerEvents = function(): void {
if (this._subs && this._subs.length > 0) {
return;
}

this._subs = [
DeviceEventEmitter.addListener(
'tcp-' + this._id + '-connect', this._onConnect.bind(this)
Expand All @@ -207,27 +207,20 @@ TcpSocket.prototype._unregisterEvents = function(): void {
this._subs = [];
};

TcpSocket.prototype._onConnect = function(address: { port: string, address: string, family: string }): void {
TcpSocket.prototype._onConnect = function(address: { port: number, address: string, family: string }): void {
this._debug('received', 'connect');

this.writable = this.readable = true;
this._state = STATE.CONNECTED;
this._address = address;
this._address.port = Number(this._address.port);

setConnected(this, address);
this.emit('connect');
};

TcpSocket.prototype._onConnection = function(info: { id: number, address: { port: string, address: string, family: string } }): void {
TcpSocket.prototype._onConnection = function(info: { id: number, address: { port: number, address: string, family: string } }): void {
this._debug('received', 'connection');

var socket = new TcpSocket({ id: info.id });

socket.writable = this.readable = true;
socket._state = STATE.CONNECTED;
socket._address = info.address;
socket._address.port = Number(socket._address.port);

socket._registerEvents();
setConnected(socket, info.address);
this.emit('connection', socket);
};

Expand All @@ -250,17 +243,14 @@ TcpSocket.prototype._onData = function(data: string): void {
TcpSocket.prototype._onClose = function(hadError: boolean): void {
this._debug('received', 'close');

this._unregisterEvents();

this._state = STATE.DISCONNECTED;

this.emit('close', hadError);
setDisconnected(this, hadError);
};

TcpSocket.prototype._onError = function(error: string): void {
this._debug('received', 'error');

this.emit('error', normalizeError(error));
this.destroy();
};

TcpSocket.prototype.write = function(buffer: any, callback: ?(err: ?Error) => void) : boolean {
Expand Down Expand Up @@ -303,6 +293,22 @@ TcpSocket.prototype.write = function(buffer: any, callback: ?(err: ?Error) => vo
return true;
};

function setConnected(socket: TcpSocket, address: { port: number, address: string, family: string } ) {
socket.writable = socket.readable = true;
socket._state = STATE.CONNECTED;
socket._address = address;
}

function setDisconnected(socket: TcpSocket, hadError: boolean): void {
if (socket._state === STATE.DISCONNECTED) {
return;
}

socket._unregisterEvents();
socket._state = STATE.DISCONNECTED;
socket.emit('close', hadError);
}

function normalizeError(err) {
if (err) {
if (typeof err === 'string') {
Expand All @@ -313,4 +319,24 @@ function normalizeError(err) {
}
}

// Returns an array [options] or [options, cb]
// It is the same as the argument of Socket.prototype.connect().
TcpSocket.prototype._normalizeConnectArgs = function(args) {
var options = {};

if (args[0] !== null && typeof args[0] === 'object') {
// connect(options, [cb])
options = args[0];
} else {
// connect(port, [host], [cb])
options.port = args[0];
if (typeof args[1] === 'string') {
options.host = args[1];
}
}

var cb = args[args.length - 1];
return typeof cb === 'function' ? [options, cb] : [options];
};

module.exports = TcpSocket;
6 changes: 3 additions & 3 deletions TcpSockets.js
Expand Up @@ -17,10 +17,10 @@ exports.createServer = function(connectionListener: (socket: Socket) => void) :
return new Server(connectionListener);
};

exports.connect = exports.createConnection = function(options: ?{ port: ?number | ?string, host: ?string, localAddress: ?string, localPort: ?number}, callback: ?() => void) : Socket {
// TODO : determine how to properly overload this with flow
exports.connect = exports.createConnection = function() : Socket {
var tcpSocket = new Socket();
tcpSocket.connect(options, callback);
return tcpSocket;
return Socket.prototype.connect.apply(tcpSocket, tcpSocket._normalizeConnectArgs(arguments));
};

exports.isIP = function(input: string) : number {
Expand Down
30 changes: 21 additions & 9 deletions examples/rctsockets/.flowconfig
Expand Up @@ -7,12 +7,24 @@
# Some modules have their own node_modules with overlap
.*/node_modules/node-haste/.*

# Ignore react-tools where there are overlaps, but don't ignore anything that
# react-native relies on
.*/node_modules/react-tools/src/React.js
.*/node_modules/react-tools/src/renderers/shared/event/EventPropagators.js
.*/node_modules/react-tools/src/renderers/shared/event/eventPlugins/ResponderEventPlugin.js
.*/node_modules/react-tools/src/shared/vendor/core/ExecutionEnvironment.js
# Ugh
.*/node_modules/babel.*
.*/node_modules/babylon.*
.*/node_modules/invariant.*

# Ignore react and fbjs where there are overlaps, but don't ignore
# anything that react-native relies on
.*/node_modules/fbjs-haste/.*/__tests__/.*
.*/node_modules/fbjs-haste/__forks__/Map.js
.*/node_modules/fbjs-haste/__forks__/Promise.js
.*/node_modules/fbjs-haste/__forks__/fetch.js
.*/node_modules/fbjs-haste/core/ExecutionEnvironment.js
.*/node_modules/fbjs-haste/core/isEmpty.js
.*/node_modules/fbjs-haste/crypto/crc32.js
.*/node_modules/fbjs-haste/stubs/ErrorUtils.js
.*/node_modules/react-haste/React.js
.*/node_modules/react-haste/renderers/dom/ReactDOM.js
.*/node_modules/react-haste/renderers/shared/event/eventPlugins/ResponderEventPlugin.js

# Ignore commoner tests
.*/node_modules/commoner/test/.*
Expand Down Expand Up @@ -43,9 +55,9 @@ suppress_type=$FlowIssue
suppress_type=$FlowFixMe
suppress_type=$FixMe

suppress_comment=\\(.\\|\n\\)*\\$FlowFixMe\\($\\|[^(]\\|(\\(>=0\\.\\(1[0-7]\\|[0-9]\\).[0-9]\\)? *\\(site=[a-z,_]*react_native[a-z,_]*\\)?)\\)
suppress_comment=\\(.\\|\n\\)*\\$FlowIssue\\((\\(>=0\\.\\(1[0-7]\\|[0-9]\\).[0-9]\\)? *\\(site=[a-z,_]*react_native[a-z,_]*\\)?)\\)? #[0-9]+
suppress_comment=\\(.\\|\n\\)*\\$FlowFixMe\\($\\|[^(]\\|(\\(>=0\\.\\(2[0-0]\\|1[0-9]\\|[0-9]\\).[0-9]\\)? *\\(site=[a-z,_]*react_native[a-z,_]*\\)?)\\)
suppress_comment=\\(.\\|\n\\)*\\$FlowIssue\\((\\(>=0\\.\\(2[0-0]\\|1[0-9]\\|[0-9]\\).[0-9]\\)? *\\(site=[a-z,_]*react_native[a-z,_]*\\)?)\\)?:? #[0-9]+
suppress_comment=\\(.\\|\n\\)*\\$FlowFixedInNextDeploy

[version]
0.17.0
0.20.1

0 comments on commit bceff85

Please sign in to comment.