This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

resolve issues with duplicate files, separate buckets, etc, rework some indexes, add some more tests
Gordon Hall committed Mar 15, 2016
1 parent a880ba8 commit 75e75f9
Showing 8 changed files with 313 additions and 133 deletions.
193 changes: 121 additions & 72 deletions lib/server/routes/buckets.js
@@ -21,6 +21,7 @@ function BucketsRouterFactory(config, storage, network) {
const Token = storage.models.Token;
const Contact = storage.models.Contact;
const File = storage.models.File;
const BucketEntry = storage.models.BucketEntry;
const Bucket = storage.models.Bucket;
const Hash = storage.models.Hash;
const verify = authenticate(storage);
@@ -244,10 +245,11 @@ function BucketsRouterFactory(config, storage, network) {

let index = 0;
let file = new File({
bucket: bucket._id,
shards: [],
size: 0
});
let entry = new BucketEntry({
bucket: bucket._id
});
let hashes = [];
let hash = crypto.createHash('sha256');

@@ -308,7 +310,6 @@

// Create a Hash object with the correct index
hashes.push({
file: file,
hash: hash,
index: index
});
@@ -325,15 +326,12 @@
});

transfer.on('end', function onComplete() {
var self = this;

if (file.size !== filesize) {
return next(new Error('File does not match the declared size'));
}

File.findOneAndUpdate({
_id: file.hash,
bucket: bucket._id
_id: file.hash
}, file, {
upsert: true,
new: true
@@ -342,28 +340,48 @@
return next(err);
}

async.each(hashes, function(hash, done) {
Hash.create(
hash.file,
hash.hash,
hash.index,
done
);
}, function(err) {
entry.file = pointer.id;

BucketEntry.findOneAndUpdate({
file: file.hash,
bucket: bucket.id
}, entry, {
upsert: true,
new: true
}, function(err, bucketentry) {
if (err) {
return self.emit('error', err);
return next(err);
}

res.send(pointer.toObject());
async.each(hashes, function(hash, done) {
Hash.create(
pointer,
hash.hash,
hash.index,
done
);
}, function(err) {
if (err) {
return next(err);
}

res.send({
bucket: bucketentry.bucket,
mimetype: bucketentry.mimetype,
filename: bucketentry.filename,
size: pointer.size,
hash: pointer.hash
});
});
});
});
});

let busboy = new BusBoy({ headers: req.headers });

busboy.once('file', function(field, stream, filename, encoding, mime) {
file.mimetype = mime;
file.filename = filename;
entry.mimetype = mime;
entry.name = filename;
stream.pipe(chunkbuffer).pipe(hasher).pipe(transfer);
});
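The upload flow above is what deduplicates content: the File pointer is upserted under its content hash, and the BucketEntry is upserted under the (file, bucket) pair, so re-uploading the same data creates no duplicate documents. A minimal sketch of that pattern, assuming bound models as in this router (storeUpload, models, and data are illustrative names, not part of the commit):

// Sketch only: mirrors the two-level upsert pattern in the diff above.
const crypto = require('crypto');

function storeUpload(models, bucket, data, callback) {
  const hash = crypto.createHash('sha256').update(data).digest('hex');

  // One File document per unique content hash, shared across buckets
  models.File.findOneAndUpdate({
    _id: hash
  }, { size: data.length }, {
    upsert: true,
    new: true
  }, function(err, pointer) {
    if (err) {
      return callback(err);
    }

    // One BucketEntry per (file, bucket) pair; a repeat upload to the
    // same bucket updates the entry instead of duplicating it
    models.BucketEntry.findOneAndUpdate({
      file: pointer.id,
      bucket: bucket.id
    }, { file: pointer.id, bucket: bucket.id }, {
      upsert: true,
      new: true
    }, callback);
  });
}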

@@ -405,75 +423,98 @@ function BucketsRouterFactory(config, storage, network) {
return next(new Error('Not authorized to retrieve from bucket'));
}

Hash.find({
file: req.params.hash
}).sort({
index: 1
}).exec(function(err, shards) {
File.findOne({ _id: req.params.hash }, function(err, file) {
if (err) {
return next(err);
}

async.mapSeries(shards, function(hash, done) {
network._manager.load(hash._id, function(err, item) {
if (err) {
return done(err);
}
if (!file) {
return next(new Error('The requested file was not found'));
}

let contract;
BucketEntry.findOne({
file: file.id,
bucket: bucket.id
}, function(err, entry) {
if (err) {
return next(err);
}

for (var c in item.contracts) {
if (item.contracts[c]._complete()) {
contract = item.contracts[c];
}
}
if (!entry) {
return next(new Error('The requested file was not found'));
}

if (!contract) {
return done(new Error('Failed to find the shard contract'));
Hash.find({
file: entry.file
}).sort({
index: 1
}).exec(function(err, shards) {
if (err) {
return next(err);
}

let farmer_id = contract.get('farmer_id');

Contact.findOne({ _id: farmer_id }, function(err, farmer) {
if (err) {
return done(err);
}

if (!farmer) {
return done(new Error('Could not find the farmer'));
}

var message = new kad.Message({
method: 'RETRIEVE',
params: { data_hash: hash._id, contact: network._contact }
});

network._transport.send(farmer, message, function(err, response) {
async.mapSeries(shards, function(hash, done) {
network._manager.load(hash._id, function(err, item) {
if (err) {
return done(err);
}

if (response.error) {
return done(new Error(response.error.message));
let contract;

for (var c in item.contracts) {
if (item.contracts[c]._complete()) {
contract = item.contracts[c];
}
}

done(null, {
hash: hash._id,
token: response.result.token,
operation: 'PULL',
channel: storj.DataChannelClient.getChannelURL(
response.result.contact
)
if (!contract) {
return done(new Error('Failed to find the shard contract'));
}

let farmer_id = contract.get('farmer_id');

Contact.findOne({ _id: farmer_id }, function(err, farmer) {
if (err) {
return done(err);
}

if (!farmer) {
return done(new Error('Could not find the farmer'));
}

var message = new kad.Message({
method: 'RETRIEVE',
params: { data_hash: hash._id, contact: network._contact }
});

network._transport.send(farmer, message, function(err, response) {
if (err) {
return done(err);
}

if (response.error) {
return done(new Error(response.error.message));
}

done(null, {
hash: hash._id,
token: response.result.token,
operation: 'PULL',
channel: storj.DataChannelClient.getChannelURL(
response.result.contact
)
});
});
});
});
}, function(err, payloads) {
if (err) {
return next(err);
}

res.send(payloads);
});
});
}, function(err, payloads) {
if (err) {
return next(err);
}

res.send(payloads);
});
});
});
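On success the retrieval route responds with one pointer per shard, in index order (the shards are sorted by index and mapped in series). An illustrative payload, with invented hash, token, and channel values:

// Example response body; all values are made up
[
  {
    hash: 'fde400fe0b6a5488e10d7317274a096aaa57914d',
    token: '2bf478f71b8a7b0f6e4d91e9a3c5d7b1c0a9f8e7',
    operation: 'PULL',
    channel: 'ws://farmer.example.com:4000'
  }
]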
@@ -501,13 +542,21 @@ function BucketsRouterFactory(config, storage, network) {
return next(new Error('Bucket not found'));
}

File.find({ bucket: req.params.id }, function(err, files) {
BucketEntry.find({
bucket: req.params.id
}).populate('file').exec(function(err, entries) {
if (err) {
return next(err);
}

res.status(200).send(files.map(function(file) {
return file.toObject();
res.status(200).send(entries.map(function(entry) {
return {
bucket: entry.bucket,
mimetype: entry.mimetype,
filename: entry.filename,
size: entry.file.size,
hash: entry.file.hash
};
}));
});
});
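With the populate('file') call above, the listing route merges per-bucket entry metadata with the shared file pointer. An example response body for GET on a bucket's file list (all values invented):

// Example response body; ids, names, and hashes are made up
[
  {
    bucket: '56e7a29a6d5b3f6a1c2d3e4f',
    mimetype: 'application/octet-stream',
    filename: 'backup.tar.gz',
    size: 1048576,
    hash: '0b4c61266fbd11ed09c0b1b99b07cab42a4bde32bd45f21f28d96b2e4d38bc32'
  }
]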
3 changes: 2 additions & 1 deletion lib/storage/index.js
@@ -91,7 +91,8 @@ Storage.prototype._createBoundModels = function() {
Token: require('./models/token'),
Contact: require('./models/contact'),
Shard: require('./models/shard'),
Hash: require('./models/hash')
Hash: require('./models/hash'),
BucketEntry: require('./models/bucketentry')
};

for (let model in models) {
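Once registered here, BucketEntry is bound to the shared connection like the other models, so consumers only touch storage.models. A minimal usage sketch (storage is assumed to be a constructed Storage instance, bucketId an existing bucket's ObjectId):

const BucketEntry = storage.models.BucketEntry;

BucketEntry.find({ bucket: bucketId })
  .populate('file')
  .exec(function(err, entries) {
    // each entry.file is the populated File pointer document
  });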
51 changes: 51 additions & 0 deletions lib/storage/models/bucketentry.js
@@ -0,0 +1,51 @@
'use strict';

const mongoose = require('mongoose');
const mimetypes = require('mime-db');
const SchemaOptions = require('../options');

/**
* Represents a bucket entry that points to a file
* @constructor
*/
var BucketEntry = new mongoose.Schema({
file: {
type: String,
ref: 'File'
},
bucket: {
type: mongoose.Schema.Types.ObjectId,
ref: 'Bucket'
},
mimetype: {
type: String,
enum: Object.keys(mimetypes),
default: 'application/octet-stream',
required: true
},
name: {
type: String
}
});

BucketEntry.virtual('filename').get(function() {
return this.name || this.file;
});

BucketEntry.index({ file: 1, bucket: 1 }, { unique: true });

BucketEntry.plugin(SchemaOptions, {
read: 'secondaryPreferred'
});

BucketEntry.set('toObject', {
virtuals: true,
transform: function(doc, ret) {
delete ret.__v;
delete ret._id;
}
});

module.exports = function(connection) {
return connection.model('BucketEntry', BucketEntry);
};
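The compound unique index means MongoDB, not application code, enforces one entry per (file, bucket) pair; the router's upsert relies on it. A sketch of the constraint in action, assuming a local test database and invented ids, once the unique index has been built:

const mongoose = require('mongoose');
const connection = mongoose.createConnection('mongodb://localhost:27017/test');
const BucketEntry = require('./lib/storage/models/bucketentry')(connection);

const doc = {
  file: 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
  bucket: new mongoose.Types.ObjectId()
};

BucketEntry.create(doc, function(err) {
  BucketEntry.create(doc, function(err) {
    // second insert fails with a duplicate key error (err.code === 11000)
    // on the { file: 1, bucket: 1 } index
  });
});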
24 changes: 5 additions & 19 deletions lib/storage/models/file.js
@@ -1,38 +1,28 @@
'use strict';

const mongoose = require('mongoose');
const mimetypes = require('mime-db');
const SchemaOptions = require('../options');

/**
* Represents a file pointer
* @constructor
*/
var FileSchema = new mongoose.Schema({
_id: { // hash
_id: {
type: String,
required: true
},
bucket: {
type: mongoose.Schema.Types.ObjectId,
ref: 'Bucket'
},
mimetype: {
type: String,
enum: Object.keys(mimetypes),
default: 'application/octet-stream',
required: true
},
filename: {
type: String
},
size: {
type: Number,
min: 0,
default: 0
}
});

FileSchema.virtual('hash').get(function() {
return this._id;
});

FileSchema.plugin(SchemaOptions, {
read: 'secondaryPreferred'
});
@@ -45,10 +35,6 @@ FileSchema.set('toObject', {
}
});

FileSchema.virtual('hash').get(function() {
return this._id;
});

module.exports = function(connection) {
return connection.model('File', FileSchema);
};
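After this change a File is purely a content-addressed pointer: the hash is the _id, and bucket membership, mimetype, and naming live on BucketEntry. A short sketch of the hash virtual, with File assumed to be the bound model from storage.models (the _id here is the sha256 of empty input, consistent with size 0):

const file = new File({
  _id: 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
  size: 0
});

console.log(file.hash === file._id); // true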
