Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
Merge pull request #285 from bookchin/master
Browse files Browse the repository at this point in the history
v2.0.6
  • Loading branch information
bookchin committed Jul 25, 2016
2 parents dd1ecf5 + 2356fd5 commit 891774e
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 76 deletions.
6 changes: 6 additions & 0 deletions index.js
Expand Up @@ -35,6 +35,9 @@ module.exports.DataChannelServer = require('./lib/datachannel/server');
/** {@link DataChannelPointer} */
module.exports.DataChannelPointer = require('./lib/datachannel/pointer');

/** {@link DataChannelErrors} */
module.exports.DataChannelErrors = require('./lib/datachannel/errorcodes');

/** {@link Protocol} */
module.exports.Protocol = require('./lib/network/protocol');

Expand All @@ -47,6 +50,9 @@ module.exports.TunnelMuxer = require('./lib/tunnel/multiplexer');
/** {@link TunnelDemuxer} */
module.exports.TunnelDemuxer = require('./lib/tunnel/demultiplexer');

/** {@link TunnelErrors} */
module.exports.TunnelErrors = require('./lib/tunnel/errorcodes');

/** {@link TunnelClient} */
module.exports.TunnelClient = require('./lib/tunnel/client');

Expand Down
23 changes: 15 additions & 8 deletions lib/bridgeclient.js
Expand Up @@ -31,6 +31,8 @@ var async = require('async');
* @param {KeyPair} options.keypair - KeyPair instance for request signing
* @param {Object} options.logger - Logger instance
* @param {Number} options.concurrency - Upload concurrency limit
* @param {Number} options.transferRetries - Limit number of shard transfer
* retries before getting a new contract
* @param {Object} options.basicauth
* @param {String} options.basicauth.email - Email address for HTTP basic auth
* @param {String} options.basicauth.password - Password for HTTP basic auth
Expand All @@ -53,7 +55,8 @@ BridgeClient.prototype._checkOptions = function(uri, options) {
options = merge({
baseURI: uri || process.env.STORJ_BRIDGE || 'https://api.storj.io',
logger: new kad.Logger(0),
concurrency: 6
concurrency: 6,
transferRetries: 3
}, options);

assert.ok(utils.validateLogger(options.logger), 'Invalid logger supplied');
Expand Down Expand Up @@ -404,7 +407,8 @@ BridgeClient.prototype._handleShardStream = function(shard, i, frame, state) {
size: 0,
index: i,
hasher: crypto.createHash('sha256'),
excludeFarmers: []
excludeFarmers: [],
transferRetries: 0
};
var tmpFile = fs.createWriteStream(meta.tmpName);
var passthrough = new stream.PassThrough();
Expand Down Expand Up @@ -475,16 +479,18 @@ BridgeClient.prototype._startTransfer = function(pointer, state, meta, done) {
self._logger.info('Contract negotiated with: %j', pointer.farmer);

transferStatus.on('retry', function() {
if (transferStatus._eventsCount < 3) {
if (meta.transferRetries < self._options.transferRetries) {
meta.transferRetries++;
self._logger.info('Retrying shard transfer, pointer: %j', pointer);
self._transferShard(emitter, meta.tmpName, pointer);
} else {
self._logger.info(
'Shard transfer failed %s times, getting another contract...',
3
meta.transferRetries
);
transferStatus.removeAllListeners();
meta.excludeFarmers.push(pointer.farmer.nodeID);
meta.transferRetries = 0;
self._handleShardTmpFileFinish(state, meta, done);
}
});
Expand Down Expand Up @@ -543,20 +549,21 @@ BridgeClient.prototype._transferShard = function(emitter, name, pointer, cb) {
var shardFile = fs.createReadStream(name);
var client = new DataChannelClient(Contact(pointer.farmer));

client.on('error', function(err) {
function _onErr(err) {
self._logger.warn('Failed to transfer shard, reason: %s', err.message);
client.removeAllListeners();
emitter.emit('retry', name, pointer, cb);
});
}

client.on('open', function() {
client.on('error', _onErr).on('open', function() {
self._logger.info('Data channel opened, transferring shard...');

var datachannel = client.createWriteStream(
pointer.token,
pointer.hash
);

shardFile.pipe(datachannel).on('finish', function() {
shardFile.pipe(datachannel).on('error', _onErr).on('finish', function() {
emitter.emit('finish');
});
});
Expand Down
14 changes: 0 additions & 14 deletions lib/network/index.js
Expand Up @@ -505,20 +505,6 @@ Network.prototype._listenForTunnelers = function() {
announce();
}

this._transport._tunserver.on('locked', function() {
self._pubsub.publish(
Buffer.concat([prefix, unavailable]).toString('hex'),
self._contact
);
});

this._transport._tunserver.on('unlocked', function() {
self._pubsub.publish(
Buffer.concat([prefix, available]).toString('hex'),
self._contact
);
});

this._pubsub.subscribe(
Buffer.concat([prefix, available]).toString('hex'),
function(contact) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "storj",
"version": "2.0.5",
"version": "2.0.6",
"description": "implementation of the storj protocol for node.js and the browser",
"main": "index.js",
"directories": {
Expand Down
13 changes: 10 additions & 3 deletions test/bridgeclient.unit.js
Expand Up @@ -1371,7 +1371,15 @@ describe('BridgeClient', function() {
client,
'_shardTransferComplete'
).callsArg(2);
client._startTransfer(pointer, {}, {}, function() {
client._startTransfer(pointer, {}, {
frame: 'frame',
tmpName: 'tmpname',
size: 0,
index: 0,
hasher: crypto.createHash('sha256'),
excludeFarmers: [],
transferRetries: 0
}, function() {
_transferShard.restore();
_transferComplete.restore();
expect(_transferShard.callCount).to.equal(2);
Expand All @@ -1387,7 +1395,6 @@ describe('BridgeClient', function() {

it('should get a new contract if transfer fails 3 times', function(done) {
var _transferStatus = new EventEmitter();
_transferStatus._eventsCount = 3;
var _kill = sinon.stub();
var client = new BridgeClient();
var pointer = {
Expand All @@ -1408,7 +1415,7 @@ describe('BridgeClient', function() {
client._startTransfer(pointer, {
queue: { kill: _kill },
callback: sinon.stub()
}, { excludeFarmers: [] });
}, { excludeFarmers: [], transferRetries: 3 });
setImmediate(function() {
_transferStatus.emit('retry');
setImmediate(function() {
Expand Down
50 changes: 0 additions & 50 deletions test/network/index.unit.js
Expand Up @@ -305,56 +305,6 @@ describe('Network (private)', function() {

describe('#_listenForTunnelers', function() {

it('should send a publish message on tunserver locked', function(done) {
var net = Network({
keypair: KeyPair(),
manager: Manager(RAMStorageAdapter()),
logger: kad.Logger(0),
seeds: [],
address: '127.0.0.1',
port: 0,
noforward: true,
tunnels: 0
});
var emitter = new EventEmitter();
net._transport._tunserver = emitter;
var _pub = sinon.stub(net._pubsub, 'publish');
var _sub = sinon.stub(net._pubsub, 'subscribe');
net._listenForTunnelers();
emitter.emit('locked');
setImmediate(function() {
_pub.restore();
_sub.restore();
expect(_pub.callCount).to.equal(1);
done();
});
});

it('should send a publish message on tunserver unlocked', function(done) {
var net = Network({
keypair: KeyPair(),
manager: Manager(RAMStorageAdapter()),
logger: kad.Logger(0),
seeds: [],
address: '127.0.0.1',
port: 0,
noforward: true,
tunnels: 0
});
var emitter = new EventEmitter();
net._transport._tunserver = emitter;
var _pub = sinon.stub(net._pubsub, 'publish');
var _sub = sinon.stub(net._pubsub, 'subscribe');
net._listenForTunnelers();
emitter.emit('unlocked');
setImmediate(function() {
_pub.restore();
_sub.restore();
expect(_pub.callCount).to.equal(1);
done();
});
});

it('should announce unavailable tunnels', function(done) {
var net = Network({
keypair: KeyPair(),
Expand Down

0 comments on commit 891774e

Please sign in to comment.