Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
Merge pull request #106 from gordonwritescode/master
Browse files Browse the repository at this point in the history
[WIP] v0.6.14
  • Loading branch information
gordonwritescode committed May 18, 2016
2 parents 59ebdaf + 37ecf80 commit 7c3b793
Show file tree
Hide file tree
Showing 13 changed files with 478 additions and 30 deletions.
3 changes: 3 additions & 0 deletions index.js
Expand Up @@ -77,6 +77,9 @@ module.exports.Manager = require('./lib/manager');
/** {@link StorageAdapter} */
module.exports.StorageAdapter = require('./lib/storage/adapter');

/** {@link StorageMigration} */
module.exports.StorageMigration = require('./lib/storage/migration');

/** {@link FSStorageAdapter} */
module.exports.FSStorageAdapter = require('./lib/storage/adapters/fs');

Expand Down
3 changes: 1 addition & 2 deletions lib/contract/index.js
Expand Up @@ -197,10 +197,9 @@ Contract.prototype._validate = function() {

/**
* Checks if the contract is complete
* @private
* @returns {Boolean} completed
*/
Contract.prototype._complete = function() {
Contract.prototype.isComplete = function() {
for (var prop in this._properties) {
if (this._properties[prop] === null) {
return false;
Expand Down
33 changes: 17 additions & 16 deletions lib/datachannel/server.js
Expand Up @@ -170,23 +170,23 @@ DataChannelServer.prototype._handleConsignStream = function(socket, token) {

this._manager.load(hash, function(err, item) {
if (err) {
socket.close(500, err.message );
socket.close(500, err.message);
return self.reject(token);
}

var contract = Object.keys(item.contracts)[0];
var shardsize = item.contracts[contract].get('data_size');

if (socket.readyState !== ws.OPEN) {
return self.reject(token);
}

socket.resume();

// If the shard is not writable, it means we already have it, so let's
// just respond with a success message
if (typeof item.shard.write !== 'function') {
socket.send(JSON.stringify({
code: 200,
message: 'Consignment completed successfully'
}));
return self.reject(token);
return self._closeSocketSuccess(socket, 'Consignment completed', token);
}

passthrough.on('data', function(chunk) {
Expand All @@ -205,12 +205,7 @@ DataChannelServer.prototype._handleConsignStream = function(socket, token) {
}

item.shard.end();
socket.send(JSON.stringify({
code: 200,
message: 'Consignment completed successfully'
}));

self.reject(token);
self._closeSocketSuccess(socket, 'Consignment completed', token);
});
});
};
Expand Down Expand Up @@ -241,9 +236,7 @@ DataChannelServer.prototype._handleRetrieveStream = function(socket, token) {
});

filestream.on('end', function() {
socket.send(JSON.stringify({ code: 200 }), function() {
self.reject(token);
});
self._closeSocketSuccess(socket, 'File transfer complete', token);
});
});
};
Expand All @@ -258,7 +251,6 @@ DataChannelServer.prototype._handleUnknownStream = function(stream, token) {
var client = this._allowed[token].client;

client.close(400, 'Failed to handle the defined operation');

this.reject(token);
};

Expand All @@ -272,4 +264,13 @@ DataChannelServer.prototype._handleError = function(err) {
this.emit('error', err);
};

/**
* Sends a success message for operation and rejects the token
* @private
*/
DataChannelServer.prototype._closeSocketSuccess = function(sock, msg, token) {
sock.send(JSON.stringify({ code: 200, message: msg }));
this.reject(token);
};

module.exports = DataChannelServer;
11 changes: 7 additions & 4 deletions lib/manager.js
Expand Up @@ -69,15 +69,18 @@ Manager.prototype.clean = function(callback) {
rstream.pause();

var total = Object.keys(item.contracts).length;
var ended = 0;
var endedOrIncomplete = 0;

for (var nodeID in item.contracts) {
if (item.contracts[nodeID].get('store_end') < Date.now()) {
ended++;
var ended = item.contracts[nodeID].get('store_end') < Date.now();
var incomplete = !item.contracts[nodeID].isComplete();

if (ended || incomplete) {
endedOrIncomplete++;
}
}

if (total === ended) {
if (total === endedOrIncomplete) {
self._storage.del(item.hash, function(/* err */) {
rstream.resume();
});
Expand Down
2 changes: 1 addition & 1 deletion lib/network/protocol.js
Expand Up @@ -102,7 +102,7 @@ Protocol.prototype._verifyContract = function(contract, contact, callback) {

contract.sign('renter', this._network._keypair.getPrivateKey());

if (!contract._complete()) {
if (!contract.isComplete()) {
return callback(new Error('Contract is not complete'));
}

Expand Down
2 changes: 1 addition & 1 deletion lib/storage/adapters/level/filestore.js
Expand Up @@ -68,7 +68,7 @@ LevelDBFileStore.prototype.createReadStream = function(key) {

self._db.get(key + ' ' + index.toString(), function(err, result) {
if (err) {
return rs.emit('error', err);
return rs.push(null);
}

if (result === 'null') { // NB: We store the string "null" to signal EOF
Expand Down
124 changes: 124 additions & 0 deletions lib/storage/migration.js
@@ -0,0 +1,124 @@
'use strict';

var assert = require('assert');
var StorageAdapter = require('./adapter');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var StorageItem = require('./item');

/**
* Migrates data stored with one {@link StorageAdapter} to another
* @constructor
* @param {StorageAdapter} source - The source adapter
* @param {StorageAdapter} target - The migration destination
*/
function StorageMigration(source, target) {
if (!(this instanceof StorageMigration)) {
return new StorageMigration(source, target);
}

assert(source instanceof StorageAdapter, 'Invalid storage adapter supplied');
assert(target instanceof StorageAdapter, 'Invalid storage adapter supplied');

this.source = source;
this.target = target;
this.readyState = StorageMigration.STOPPED;

EventEmitter.call(this);
}

inherits(StorageMigration, EventEmitter);

StorageMigration.STOPPED = 0;
StorageMigration.STARTED = 1;

/**
* Starts the migration process
*/
StorageMigration.prototype.start = function() {
assert(
this.readyState === StorageMigration.STOPPED,
'Migration has already started'
);

this.readyState = StorageMigration.STARTED;
this._sourceStream = this.source.createReadStream();

this._sourceStream.on('data', this._handleSourceObject.bind(this));
this._sourceStream.on('end', this._handleSourceFinished.bind(this));
this._sourceStream.on('error', this._handleSourceError.bind(this));

return this._sourceStream;
};

/**
* Stops the migration process
*/
StorageMigration.prototype.stop = function() {
assert(
this.readyState === StorageMigration.STARTED,
'Migration has already stopped'
);

this._sourceStream.removeAllListeners();

this.readyState = StorageMigration.STOPPED;
this._sourceStream = null;
};

/**
* Handles a data event from the source read stream and inserts it into the
* the target adapter
* @private
* @param {StorageItem} sourceItem - Storage item from the source read stream
*/
StorageMigration.prototype._handleSourceObject = function(sourceItem) {
var self = this;

self._sourceStream.pause();

self.target.put(StorageItem(sourceItem), function(err) {
if (err) {
return self.emit('error', err);
}

self.target.get(sourceItem.hash, function(err, targetItem) {
if (err) {
return self.emit('error', err);
}

if (typeof sourceItem.shard.read === 'function') {
return sourceItem.shard.pipe(targetItem.shard)
.on('error', self._handleSourceError.bind(this))
.on('finish', self._sourceStream.resume.bind(self._sourceStream));
}

self._sourceStream.resume();
});
});
};

/**
* Handles the completion of the source stream read
* @private
*/
StorageMigration.prototype._handleSourceFinished = function() {
this.readyState = StorageMigration.STOPPED;
this._sourceStream = null;

this.emit('finish');
};

/**
* Handles errors received from the underyling source stream
* @private
* @param {Error} error
*/
StorageMigration.prototype._handleSourceError = function(err) {
this.readyState = StorageMigration.STOPPED;
this._sourceStream = null;

this.emit('error', err);
};

module.exports = StorageMigration;
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "storj",
"version": "0.6.13",
"version": "0.6.14",
"description": "implementation of the storj protocol for node.js and the browser",
"main": "index.js",
"directories": {
Expand Down
6 changes: 3 additions & 3 deletions test/contract/index.unit.js
Expand Up @@ -192,10 +192,10 @@ describe('Contract (private)', function() {

});

describe('#_complete', function() {
describe('#isComplete', function() {

it('should return false if fields are null', function() {
expect(Contract()._complete()).to.equal(false);
expect(Contract().isComplete()).to.equal(false);
});

it('should return true if fields are not null', function() {
Expand All @@ -210,7 +210,7 @@ describe('Contract (private)', function() {
});
contract.sign('renter', kp1.getPrivateKey());
contract.sign('farmer', kp2.getPrivateKey());
expect(contract._complete()).to.equal(true);
expect(contract.isComplete()).to.equal(true);
});

});
Expand Down

0 comments on commit 7c3b793

Please sign in to comment.