This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Merge f8a0d28 into b35aa94
aleitner committed Sep 9, 2016
2 parents b35aa94 + f8a0d28 commit ff62e85
Showing 8 changed files with 151 additions and 31 deletions.
11 changes: 11 additions & 0 deletions bin/actions/files.js
@@ -54,6 +54,8 @@ module.exports.getInfo = function(bucketid, fileid, callback) {
return callback(file);
}
});

return callback(null);
});
};
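
With this change, getInfo always invokes its callback: with the matching file object when one is found, or with null when it is not. A minimal, hedged sketch of the caller side (not part of this commit; the names mirror the download hunk further below):

// Illustrative only -- not part of this commit.
module.exports.getInfo.call(self, bucket, id, function(file) {
  if (file === null) {
    // the bridge returned no match for this file id
    return log('error', 'file %s does not exist in bucket %s', [id, bucket]);
  }
  // otherwise `file` holds the metadata returned by the bridge
});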

@@ -89,6 +91,10 @@ module.exports.remove = function(id, fileId, env) {
module.exports.upload = function(bucket, filepath, env) {
var self = this;

if (env.concurrency > 6) {
log('warn', 'A concurrency of %s may result in issues!', env.concurrency);
}

var concurrency = env.concurrency ? parseInt(env.concurrency) : 6;

if (parseInt(env.redundancy) > 12 || parseInt(env.redundancy) < 1) {
@@ -326,8 +332,13 @@ module.exports.download = function(bucket, id, filepath, env) {
}

module.exports.getInfo.call(self, bucket, id, function(file) {

var target;

if (file === null) {
return log('error', 'file %s does not exist in bucket %s', [id, bucket]);
}

// Check if path is an existing path
if (storj.utils.existsSync(filepath) === true ) {
// Check if given path is a directory
9 changes: 6 additions & 3 deletions bin/storj.js
@@ -178,9 +178,12 @@ program
.option('-c, --concurrency <count>', 'max shard upload concurrency')
.option('-C, --fileconcurrency <count>', 'max file upload concurrency', 1)
.option('-r, --redundancy <mirrors>', 'number of mirrors to create for file')
.description('upload a file or files to the network and track in a bucket')
.description('<filepath> can be a path with wildcard or a space separated')
.description(' list of files')
.description('upload a file or files to the network and track in a bucket.' +
'\n upload all files in a single directory using "/path/*"\n' +
' or upload recursively using "/path/**/*".\n' +
' <filepath> can be a path with wildcard or a space separated' +
' list of files.'
)
.action(actions.files.upload.bind(program));

program
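For context, a hedged usage sketch of the expanded help text above. Only the -c, -C, and -r flags appear in this diff; the upload-file subcommand name and the bucket id are assumptions for illustration:

# Illustrative only -- the flags come from the diff, the subcommand name is assumed.
# Upload every file in a directory, four shards uploading concurrently:
storj upload-file <bucket-id> "/path/*" -c 4
# Upload a directory tree recursively, two files at a time, two mirrors each:
storj upload-file <bucket-id> "/path/**/*" -C 2 -r 2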
7 changes: 6 additions & 1 deletion lib/bridge-client/index.js
@@ -395,7 +395,12 @@ BridgeClient.prototype.storeFileInBucket = function(id, token, file, cb) {
return cb(new Error(fileSize +' bytes is not a supported file size.'));
}

var shardSize = FileDemuxer.getOptimalShardSize(fileSize);
var shardSize = FileDemuxer.getOptimalShardSize(
{
fileSize: fileSize,
shardConcurrency: this._transferConcurrency
}
);
var uploadState = new UploadState({
id: id,
file: file,
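The call above now forwards the client's transfer concurrency alongside the file size. A hedged standalone sketch of the same call outside of BridgeClient (the require path assumes the repository root as the working directory; the figures are illustrative):

// Illustrative only -- not part of this commit.
var FileDemuxer = require('./lib/file-handling/file-demuxer');

var shardSize = FileDemuxer.getOptimalShardSize({
  fileSize: 512 * (1024 * 1024),   // 512MiB file
  shardConcurrency: 6              // the CLI default above
});
// With ample free memory this resolves to 16MiB shards, matching the
// '16 for 512' unit test below; low free memory walks it back toward 8MiB.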
4 changes: 2 additions & 2 deletions lib/bridge-client/upload-state.js
@@ -1,11 +1,11 @@
'use strict';

var fs = require('fs');
var async = require('async');
var utils = require('../utils');
var merge = require('merge');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var rimraf = require('rimraf');

/**
* Internal state machine used by {@link BridgeClient}
@@ -61,7 +61,7 @@ UploadState.prototype.cleanup = function() {

this.cleanQueue.forEach(function(tmpFilePath) {
if (utils.existsSync(tmpFilePath)) {
fs.unlinkSync(tmpFilePath);
rimraf.sync(tmpFilePath);
}
});

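Swapping fs.unlinkSync for rimraf.sync lets cleanup handle temporary directories as well as plain files: fs.unlinkSync throws when given a directory, while rimraf.sync removes it recursively. A small hedged illustration (the paths are made up for the example):

// Illustrative only -- not part of this commit.
var fs = require('fs');
var os = require('os');
var path = require('path');
var rimraf = require('rimraf');

var tmpDir = path.join(os.tmpdir(), 'storj-cleanup-example');
fs.mkdirSync(tmpDir);
fs.writeFileSync(path.join(tmpDir, 'shard.0'), 'data');

// fs.unlinkSync(tmpDir);  // would throw -- unlink does not remove directories
rimraf.sync(tmpDir);        // removes the directory and everything inside it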
43 changes: 33 additions & 10 deletions lib/file-handling/file-demuxer.js
@@ -7,6 +7,7 @@ var fs = require('fs');
var EventEmitter = require('events').EventEmitter;
var merge = require('merge');
var utils = require('../utils');
var os = require('os');

/**
* Takes a single file read stream and outputs several output streams, used for
@@ -135,24 +136,46 @@ FileDemuxer.prototype._needsNewOutputStream = function() {

/**
* Determine the optimal shard size given an arbitrary file size in bytes
* @param {Number} fileSize - The number of bytes in the given file
* @param {Object} fileInfo
* @param {Number} fileInfo.fileSize - The number of bytes in the given file
* @param {Number} fileInfo.shardConcurrency - Num of shards uploaded at once
* @param {Number} [acc=1] - Accumulator (number of recursions)
* @returns {Number} shardSize
*/
FileDemuxer.getOptimalShardSize = function(fileSize, acc) {
FileDemuxer.getOptimalShardSize = function(fileInfo, acc) {
var accumulator = typeof acc === 'undefined' ? 0 : acc;
var byteMultiple = (8 * (1024 * 1024)) * Math.pow(2, accumulator);
var check = fileSize / byteMultiple;

if (check > 0 && check < 2) {
var distance = (accumulator - FileDemuxer.SHARD_MULTIPLES_BACK) < 0 ?
0 :
accumulator - FileDemuxer.SHARD_MULTIPLES_BACK;
// Determine hops back by accumulator
var hops = (accumulator - FileDemuxer.SHARD_MULTIPLES_BACK) < 0 ?
0 :
accumulator - FileDemuxer.SHARD_MULTIPLES_BACK;

return (8 * (1024 * 1024)) * Math.pow(2, distance);
// Calculate bytemultiple shard size by hops back
var shardSize = function(hops) {
return (8 * (1024 * 1024)) * Math.pow(2, hops);
};

var byteMultiple = shardSize(accumulator);
var check = fileInfo.fileSize / byteMultiple;

// Determine if bytemultiple is highest bytemultiple that is still <= fileSize
if (check > 0 && check <= 1) {

// Certify the number of concurrency * shardSize doesn't exceed freemem
while (
hops > 0 &&
(os.freemem() / shardSize(hops) <= fileInfo.shardConcurrency)
) {
hops = hops - 1 <= 0 ? 0 : hops - 1;
}

console.log('shard size is '+shardSize(hops));
console.log('free mem is '+os.freemem());

return shardSize(hops);
}

return this.getOptimalShardSize(fileSize, ++accumulator);
return this.getOptimalShardSize(fileInfo, ++accumulator);
};

module.exports = FileDemuxer;
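
To make the new behaviour concrete, here is a hedged usage sketch mirroring the unit tests further down. The numbers are illustrative and assume the machine has several hundred megabytes of free memory:

// Illustrative only -- not part of this commit.
var FileDemuxer = require('./lib/file-handling/file-demuxer');
var MiB = 1024 * 1024;

// A 2048MiB file with three concurrent shard uploads resolves to 64MiB
// shards when free memory comfortably covers 3 * 64MiB
// (see the '64 for 2048' unit test below).
var size = FileDemuxer.getOptimalShardSize({
  fileSize: 2048 * MiB,
  shardConcurrency: 3
});

// If os.freemem() reported only 16MiB, the loop above would walk the shard
// size back down to the 8MiB floor so that shardConcurrency * shardSize
// stays within free memory (see the low-memory test at the end of this
// commit).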
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "storj",
"version": "3.1.2",
"version": "3.1.3",
"description": "implementation of the storj protocol for node.js and the browser",
"main": "index.js",
"directories": {
8 changes: 4 additions & 4 deletions test/bridge-client/upload-state.unit.js
@@ -18,10 +18,10 @@ describe('UploadState', function() {
describe('#cleanup', function() {

it('should unlink the tmpfile if it exists', function() {
var unlinkSync = sinon.stub();
var rimrafSync = sinon.stub();
var StubUploadState = proxyquire('../../lib/bridge-client/upload-state', {
fs: {
unlinkSync: unlinkSync
rimraf: {
sync: rimrafSync
},
'../utils': {
existsSync: sinon.stub().returns(true)
@@ -30,7 +30,7 @@
var uploadState = new StubUploadState();
uploadState.cleanQueue.push('/some/tmp/file');
uploadState.cleanup();
expect(unlinkSync.called).to.equal(true);
expect(rimrafSync.called).to.equal(true);
});

});
98 changes: 88 additions & 10 deletions test/file-handling/file-demuxer.unit.js
@@ -8,6 +8,7 @@ var os = require('os');
var fs = require('fs');
var path = require('path');
var utils = require('../../lib/utils');
var proxyquire = require('proxyquire');
var filePathEven = path.join(os.tmpdir(), 'storjfiledmxtest-even.data');
var filePathOdd = path.join(os.tmpdir(), 'storjfiledmxtest-odd.data');
var filePathEmpty = path.join(os.tmpdir(), 'storjfiledmxtest-empty.data');
@@ -114,64 +115,141 @@ describe('FileDemuxer', function() {

describe('FileDemuxer#getOptimalShardSize', function() {

var FileDemuxerStub = proxyquire('../../lib/file-handling/file-demuxer', {
'os': {
freemem: function() {
return 1024 * (1024 * 1024); //1GB of memory
}
}
});

it('should return 8 for 8', function() {
expect(
FileDemuxer.getOptimalShardSize(8 * (1024 * 1024))
FileDemuxerStub.getOptimalShardSize(
{
fileSize: 3 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(8 * (1024 * 1024));
});

it('should return 8 for 16', function() {
expect(
FileDemuxer.getOptimalShardSize(16 * (1024 * 1024))
FileDemuxerStub.getOptimalShardSize(
{
fileSize: 16 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(8 * (1024 * 1024));
});

it('should return 8 for 32', function() {
expect(
FileDemuxer.getOptimalShardSize(32 * (1024 * 1024))
FileDemuxerStub.getOptimalShardSize(
{
fileSize: 32 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(8 * (1024 * 1024));
});

it('should return 8 for 64', function() {
expect(
FileDemuxer.getOptimalShardSize(64 * (1024 * 1024))
FileDemuxerStub.getOptimalShardSize(
{
fileSize: 64 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(8 * (1024 * 1024));
});

it('should return 8 for 128', function() {
expect(
FileDemuxer.getOptimalShardSize(128 * (1024 * 1024))
FileDemuxerStub.getOptimalShardSize(
{
fileSize: 128 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(8 * (1024 * 1024));
});

it('should return 8 for 256', function() {
expect(
FileDemuxer.getOptimalShardSize(256 * (1024 * 1024))
FileDemuxerStub.getOptimalShardSize(
{
fileSize: 256 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(8 * (1024 * 1024));
});

it('should return 16 for 512', function() {
expect(
FileDemuxer.getOptimalShardSize(512 * (1024 * 1024))
FileDemuxerStub.getOptimalShardSize(
{
fileSize: 512 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(16 * (1024 * 1024));
});

it('should return 32 for 1024', function() {
expect(
FileDemuxer.getOptimalShardSize(1024 * (1024 * 1024))
FileDemuxerStub.getOptimalShardSize(
{
fileSize: 1024 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(32 * (1024 * 1024));
});

it('should return 64 for 2048', function() {
expect(
FileDemuxer.getOptimalShardSize(2048 * (1024 * 1024))
FileDemuxerStub.getOptimalShardSize(
{
fileSize: 2048 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(64 * (1024 * 1024));
});

it('should return 128 for 4096', function() {
expect(
FileDemuxer.getOptimalShardSize(4096 * (1024 * 1024))
FileDemuxerStub.getOptimalShardSize(
{
fileSize: 4096 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(128 * (1024 * 1024));
});

it('should return 8 for 4096 if only 16MB of memory', function() {
var LowMemDemuxer = proxyquire('../../lib/file-handling/file-demuxer', {
'os': {
freemem: function() {
return 16 * (1024 * 1024); // 16MB of memory
}
}
});

expect(
LowMemDemuxer.getOptimalShardSize(
{
fileSize: 4096 * (1024 * 1024),
shardConcurrency: 3
}
)
).to.equal(8 * (1024 * 1024));
});

});
