Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
Merge pull request #366 from bookchin/master
Browse files Browse the repository at this point in the history
v3.1.0
  • Loading branch information
bookchin committed Aug 29, 2016
2 parents dba60b2 + a175a98 commit af28691
Show file tree
Hide file tree
Showing 26 changed files with 566 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .jshintrc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"undef": true,
"unused": true,
"maxparams": 4,
"maxstatements": 12,
"maxstatements": 14,
"maxcomplexity": 6,
"maxdepth": 3,
"maxlen": 80,
Expand Down
20 changes: 11 additions & 9 deletions doc/data-channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,9 @@ channel.addEventListener('open', function() {
channel.send(new Blob([/* ... */]));
});
channel.addEventListener('message', function(e) {
var data = JSON.parse(e.data);
if (data.code && data.code !== 200) {
console.error('Error consigning data:', data.message);
channel.addEventListener('close', function(e) {
if (e.code !== 1000) {
console.error('Error consigning data:', e.reason);
} else {
console.log('Successfully consigned data!');
}
Expand Down Expand Up @@ -116,11 +114,15 @@ channel.addEventListener('message', function(e) {
fileparts.push(e.data);
});
channel.addEventListener('close', function() {
var file = new Blob(fileparts, { type: '<mime_type>' });
var url = URL.createObjectURL(file);
channel.addEventListener('close', function(e) {
if (e.code !== 1000) {
console.error(e.reason);
} else {
var file = new Blob(fileparts, { type: '<mime_type>' });
var url = URL.createObjectURL(file);
location.href = url;
location.href = url;
}
});
```

Expand Down
20 changes: 20 additions & 0 deletions doc/tunnel-connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ nodes do not need to use the tunnel to contact other nodes on the network, but
rather only *to be contacted*.

### Tunneling Diagram

![assets/tunneling.png](assets/tunneling.png)

### Announcing Willingness
Expand Down Expand Up @@ -208,6 +209,25 @@ appropriate signal format and write the result back to the tunnel server. The
tunnel server must then use the signal metadata to multiplex the streams out to
their respective data channel originators.

When the loopback data channel connection is terminated on the farmer side, a
special message needs to be sent to the tunneler to indicate that the
connection should be closed. In this case the tunneled node will send a JSON
payload as the last frame that includes a `code` and `message`. This frame
should be tagged as a textual frame to differentiate it from previous parts of
the transferred data.

An example of this JSON in the termination signal might be:

```
{"code":1000,"message":"Finished"}
```

So serialized in the multiplexed tunnel format, that would look like:

```
<Buffer 0d 01 9d b4 a0 58 5f 31 65 22 3a 22 46 69 6e 69 73 68 65 64 22 7d ... >
```

### Reference

* {@link TunnelClient}
Expand Down
25 changes: 17 additions & 8 deletions lib/bridge-client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,15 @@ BridgeClient.prototype.addShardToFileStagingFrame = function(f, s, opt, cb) {

if (typeof arguments[2] === 'function') {
cb = opt;
opt = { retry: 6 };
opt = { retry: 24 };
}

function _addShard() {
self._logger.info('Adding shard metadata for %s to frame', s.hash);
self._logger.info(
'Querying bridge for contract for %s (retry: %s)',
s.hash,
retries
);

pendingReq = self._request('PUT', '/frames/' + f, s, function(err, result) {
if (err) {
Expand Down Expand Up @@ -625,7 +629,7 @@ BridgeClient.prototype._transferShard = function(evt, name, pointer, state) {
evt.emit('retry', name, pointer);
}

state.on('killed', client.removeAllListeners.bind('client'));
state.on('killed', client.removeAllListeners.bind(client));
client.on('error', _onErr).on('open', function() {
self._logger.info('Data channel opened, transferring shard...');

Expand All @@ -634,15 +638,20 @@ BridgeClient.prototype._transferShard = function(evt, name, pointer, state) {
pointer.hash
);

shardFile.pipe(datachannel).on('error', _onErr).on('finish', function() {
function _onStateKilled() {
shardFile.unpipe(datachannel);
datachannel.end();
datachannel.destroy();
evt.emit('finish');
});
}

state.on('killed', function() {
shardFile.unpipe(datachannel);
datachannel.removeAllListeners();
state.dataChannels.push(datachannel);
shardFile.pipe(datachannel).on('error', _onErr).on('finish', function() {
state.removeListener('killed', _onStateKilled);
evt.emit('finish');
});

state.on('killed', _onStateKilled);
});

return evt;
Expand Down
7 changes: 6 additions & 1 deletion lib/bridge-client/upload-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var EventEmitter = require('events').EventEmitter;
* @param {Function} options.onComplete - Reference to callback after complete
*/
function UploadState(options) {
/* jshint maxstatements: 14 */
/* jshint maxstatements: 16 */
if (!(this instanceof UploadState)) {
return new UploadState(options);
}
Expand All @@ -36,6 +36,7 @@ function UploadState(options) {
this.concurrency = options.concurrency;
this.queue = async.queue(options.worker, this.concurrency);
this.killed = false;
this.dataChannels = [];

EventEmitter.call(this);
this.setMaxListeners(0);
Expand Down Expand Up @@ -64,6 +65,10 @@ UploadState.prototype.cleanup = function() {
}
});

this.dataChannels.forEach(function(channel) {
channel.destroy();
});

this.queue.kill();
this.emit('killed');
this.removeAllListeners();
Expand Down
4 changes: 1 addition & 3 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,5 @@ module.exports = {
/** @constant {Number} MAX_CONCURRENT_AUDITS - Number of concurrent audits */
MAX_CONCURRENT_AUDITS: 3,
/** @constant MAX_FIND_TUNNEL_RELAYS - Max times to relay FIND_TUNNEL */
MAX_FIND_TUNNEL_RELAYS: 2,
/** @constant {Number} SUBSCRIBE_THROTTLE - Wait between opcode subscribe */
SUBSCRIBE_THROTTLE: 3000
MAX_FIND_TUNNEL_RELAYS: 2
};
16 changes: 16 additions & 0 deletions lib/data-channels/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ function ReadableDataChannelStream(channel, token, hash) {
this._channel = channel;
this._token = token;
this._hash = hash;
this._isDestroyed = false;

this.isAuthenticated = false;

Expand Down Expand Up @@ -71,4 +72,19 @@ ReadableDataChannelStream.prototype._read = function() {
}
};

/**
* Closes the underlying connection
* @returns {Boolean} didDestroy - Indicates if the stream was destroyed
*/
ReadableDataChannelStream.prototype.destroy = function() {
if (this._isDestroyed) {
return false;
}

this._channel._client.terminate();
this._isDestroyed = true;

return true;
};

module.exports = ReadableDataChannelStream;
5 changes: 4 additions & 1 deletion lib/data-channels/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ DataChannelServer.prototype.reject = function(token) {
var client = this._allowed[token].client;

if ([ws.CONNECTING, ws.OPEN].indexOf(client.readyState) !== -1) {
this._allowed[token].client.close();
this._allowed[token].client.close(
DataChannelErrors.UNAUTHORIZED_TOKEN,
'The authorization token was rejected'
);
}
}

Expand Down
20 changes: 19 additions & 1 deletion lib/data-channels/writable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ function WritableDataChannelStream(channel, token, hash) {
this._channel = channel;
this._token = token;
this._hash = hash;
this._isDestroyed = false;

this.isAuthenticated = false;

Expand Down Expand Up @@ -109,10 +110,27 @@ WritableDataChannelStream.prototype._sendData = function(chunk, next) {
*/
WritableDataChannelStream.prototype._handleClosed = function(flush, code, msg) {
if (code !== 1000) {
return flush(new Error(msg));
var err = new Error(msg || 'Unspecified error occurred'); err.code = code;

return flush(err);
}

flush(null);
};

/**
* Closes the underlying connection
* @returns {Boolean} didDestroy - Indicates if the stream was destroyed
*/
WritableDataChannelStream.prototype.destroy = function() {
if (this._isDestroyed) {
return false;
}

this._channel._client.terminate();
this._isDestroyed = true;

return true;
};

module.exports = WritableDataChannelStream;
31 changes: 27 additions & 4 deletions lib/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ Network.prototype.join = function(callback) {

async.series(
[
utils.ensureNtpClockIsSynchronized,
this._warnIfClockNotSynced.bind(this), // TODO: Make this not fail hard
this.manager.open.bind(this.manager),
this._setupTunnelClient.bind(this),
],
Expand Down Expand Up @@ -519,13 +519,17 @@ Network.prototype._verifySignature = function(options, callback) {
ecdsa.hashbuf = signedmsg.magicHash();
ecdsa.sig = options.signobj;

this._pubkeys[options.contact.nodeID] = ecdsa.toPublicKey();
try {
this._pubkeys[options.contact.nodeID] = ecdsa.toPublicKey();
} catch (err) {
return callback(err);
}

if (!signedmsg.verify(options.address, options.signature)) {
return callback(new Error('Signature verification failed'));
}

callback();
callback(null);
};

/**
Expand Down Expand Up @@ -767,7 +771,8 @@ Network.prototype._establishTunnel = function(tunnels, callback) {

self._tunclient = new TunnelClient(
tunnel,
'http://127.0.0.1:' + localAddress.port
'http://127.0.0.1:' + localAddress.port,
{ logger: self._logger }
);

self._tunclient.on('open', function() {
Expand Down Expand Up @@ -864,4 +869,22 @@ Network.prototype._updateActivityCounter = function() {
);
};

/**
* Warns the user if their clock is not synchronized with NTP server
* @private
*/
Network.prototype._warnIfClockNotSynced = function(callback) {
var self = this;

utils.ensureNtpClockIsSynchronized(function(err, delta) {
if (err) {
self._logger.warn(err.message);
} else {
self._logger.info('clock is synchronized with ntp, delta: %s', delta);
}

callback(null);
});
};

module.exports = Network;
27 changes: 23 additions & 4 deletions lib/network/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,31 @@ Protocol.prototype._handleMirror = function(params, callback) {

var dcx = new DataChannelClient(params.farmer);

dcx.on('error', callback).on('open', function() {
function _onChannelError(err) {
dcx.removeAllListeners();
self._logger.error(
'failed to open datachannel for mirroring, reason: %s',
err.message
);
callback(err);
}

function _onChannelOpen() {
var rs = dcx.createReadStream(token, hash);

self._logger.info('datachannel successfully established for mirror');
dcx.removeAllListeners('error');
dcx.createReadStream(token, hash).pipe(item.shard);
dcx.removeListener('error', _onChannelError);

rs.on('error', function _onStreamError(err) {
self._logger.error('failed to read from mirror node: %s', err.message);
rs.unpipe(item.shard);
item.shard.destroy();
}).pipe(item.shard);

callback(null, {});
});
}

dcx.on('error', _onChannelError).on('open', _onChannelOpen);
});
};

Expand Down
3 changes: 2 additions & 1 deletion lib/network/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ Transport.prototype._bindTunnelServer = function() {
this._tunserver = new TunnelServer({
maxTunnels: this._maxTunnels,
port: this._tunport,
portRange: this._gateways
portRange: this._gateways,
logger: self._log
});

this._tunserver.on('ready', function() {
Expand Down
2 changes: 1 addition & 1 deletion lib/storage/adapters/level/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ function LevelDBStorageAdapter(path, backend) {

LevelDBStorageAdapter.SIZE_START_KEY = '0';
LevelDBStorageAdapter.SIZE_END_KEY = 'z';
LevelDBStorageAdapter.MAX_OPEN_FILES = 500;
LevelDBStorageAdapter.MAX_OPEN_FILES = 1000;

inherits(LevelDBStorageAdapter, StorageAdapter);

Expand Down
Loading

0 comments on commit af28691

Please sign in to comment.