Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
multiple file upload added with file concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
phutchins committed Aug 24, 2016
1 parent 4ce60fd commit b01876d
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 61 deletions.
171 changes: 112 additions & 59 deletions bin/actions/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ var fs = require('fs');
var path = require('path');
var through = require('through');
var storj = require('../..');
var glob = require('glob');
var async = require('async');

module.exports.list = function(privateClient, bucketid) {
privateClient.listFilesInBucket(bucketid, function(err, files) {
Expand Down Expand Up @@ -50,81 +52,132 @@ module.exports.remove = function(keypass, privateClient, id, fileId, env) {
destroyFile();
};

module.exports.upload = function(privateClient, keypass, bucket, filepath, env) {
if (!storj.utils.existsSync(filepath)) {
return log('error', 'No file found at %s', filepath);
}

var secret = new storj.DataCipherKeyIv();
var encrypter = new storj.EncryptStream(secret);

utils.getKeyRing(keypass, function(keyring) {
log('info', 'Generating encryption key...');
log('info', 'Encrypting file "%s"', [filepath]);

utils.makeTempDir(function(err, tmpDir, tmpCleanup) {
module.exports.upload = function(privateClient, keypass, bucket, filepaths, env) {
async.eachOfSeries(filepaths, function(origFilepath, index, callback) {
// In *NIX the wildcard is already parsed so this will cover other OS's
glob(origFilepath, function(err, parsedFileArray) {
if (err) {
return log('error', err.message);
return log('error', 'Invalid path or file %s', [ err ]);
}

var tmppath = path.join(tmpDir, path.basename(filepath) + '.crypt');
var newPathFound = ( filepaths.indexOf(parsedFileArray[0]) === -1 );
var pathWasParsed = (( parsedFileArray.length > 1 ) || newPathFound );

function cleanup() {
log('info', 'Cleaning up...');
tmpCleanup();
log('info', 'Finished cleaning!');
// If the arrays length is 1, and it doesn't exist in the orig fileArray
// remove the element from this index in fileArray and add parsedFileArray
// to it
if (pathWasParsed) {
filepaths.splice(index, 1, parsedFileArray);
}

fs.createReadStream(filepath)
.pipe(encrypter)
.pipe(fs.createWriteStream(tmppath)).on('finish', function() {
log('info', 'Encryption complete!');
log('info', 'Creating storage token...');
privateClient.createToken(
bucket,
'PUSH',
function(err, token) {
if (err) {
log('error', err.message);
return cleanup();
}

log('info', 'Storing file, hang tight!');

privateClient.storeFileInBucket(
callback();
});
}, function(err) {
if (err) {
return log('error', 'Problem parsing file paths');
}

var fileCount = filepaths.length;
var uploadedCount = 0;
var fileConcurrency = env.fileconcurrency;

log('info', '%s file(s) to upload.', [ fileCount ]);

utils.getKeyRing(keypass, function(keyring) {
log('info', 'Generating encryption key...');

async.eachLimit(filepaths, fileConcurrency, function(filepath, callback) {
if (!storj.utils.existsSync(filepath)) {
return log('error', 'No file found at %s', filepath);
}

utils.makeTempDir(function(err, tmpDir, tmpCleanup) {
if (err) {
return log('error', err.message);
}

log('info', 'Encrypting file "%s"', [filepath]);

var secret = new storj.DataCipherKeyIv();
var encrypter = new storj.EncryptStream(secret);


var tmppath = path.join(tmpDir, path.basename(filepath) + '.crypt');

function cleanup() {
log('info', 'Cleaning up...');
tmpCleanup();
log('info', 'Finished cleaning!');
}

fs.createReadStream(filepath)
.pipe(encrypter)
.pipe(fs.createWriteStream(tmppath)).on('finish', function() {
log('info', 'Encryption complete!');
log('info', 'Creating storage token...');
privateClient.createToken(
bucket,
token.token,
tmppath,
function(err, file) {
'PUSH',
function(err, token) {
if (err) {
log('warn', 'Error occurred. Triggering cleanup...');
cleanup();
return log('error', err.message);
log('error', err.message);
return cleanup();
}

keyring.set(file.id, secret);
cleanup();
log('info', 'Encryption key saved to keyring.');
log('info', 'File successfully stored in bucket.');
log(
'info',
'Name: %s, Type: %s, Size: %s bytes, ID: %s',
[file.filename, file.mimetype, file.size, file.id]
log('info', 'Storing file, hang tight!');

privateClient.storeFileInBucket(
bucket,
token.token,
tmppath,
function(err, file) {
if (err) {
log('warn', 'Error occurred. Triggering cleanup...');
cleanup();
callback(err, filepath);
// Should retry this file
return log('error', err.message);
}

keyring.set(file.id, secret);
cleanup();
log('info', 'Encryption key saved to keyring.');
log('info', 'File successfully stored in bucket.');
log(
'info',
'Name: %s, Type: %s, Size: %s bytes, ID: %s',
[file.filename, file.mimetype, file.size, file.id]
);

if (env.redundancy) {
return this.mirrors(privateClient, bucket, file.id, env);
}

uploadedCount++;

if (uploadedCount === fileCount) {
log('info', '%s files uploaded. Done', [ uploadedCount ]);
process.exit();
}
callback(null, filepath);
}
);

if (env.redundancy) {
return this.mirrors(privateClient, bucket, file.id, env);
}

process.exit();
}
);
}
);
});
});
}, function(err, filepath) {
if (err) {
log('error', 'A file has failed to upload: %s', [ filepath ]);
}
);

log('info', 'Successfully uploaded %s files!', [ uploadedCount ]);
});
});
});
/*
});
*/
};

module.exports.mirror = function(privateClient, bucket, file, env) {
Expand Down
11 changes: 9 additions & 2 deletions bin/storj.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,13 @@ var ACTIONS = {
var privateClient = PrivateClient({
concurrency: env.concurrency ? parseInt(env.concurrency) : 6
});
actions.files.upload(privateClient, getKeyPass(), bucket, filepath, env);

var filepaths = process.argv;
var firstFileIndex = filepaths.indexOf(filepath);

filepaths.splice(0,firstFileIndex);

actions.files.upload(privateClient, getKeyPass(), bucket, filepaths, env);
},
createmirrors: function createmirrors(bucket, file, env) {
actions.files.mirror(PrivateClient(), bucket, file, env);
Expand Down Expand Up @@ -295,7 +301,8 @@ program

program
.command('upload-file <bucket-id> <filepath>')
.option('-c, --concurrency <count>', 'max upload concurrency')
.option('-c, --concurrency <count>', 'max shard upload concurrency')
.option('-C, --fileconcurrency <count>', 'max file upload concurrency', 1)
.option('-r, --redundancy <mirrors>', 'number of mirrors to create for file')
.description('upload a file to the network and track in a bucket')
.action(ACTIONS.uploadfile);
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"colors": "^1.1.2",
"commander": "^2.9.0",
"flushwritable": "^1.0.0",
"glob": "^7.0.5",
"ip": "^1.1.2",
"jsen": "^0.6.0",
"json-stable-stringify": "^1.0.1",
Expand Down

0 comments on commit b01876d

Please sign in to comment.