From 4403eb3e62453fd9cb1f5e78cf25a8a074f1fe69 Mon Sep 17 00:00:00 2001 From: Laurence Liss Date: Fri, 18 Mar 2016 17:51:37 -0400 Subject: [PATCH] Delete asset versions when deleting asset. We had been leaving historical asset versions on disk and in the LevelDB database after deleting an asset. This was not the desired functionality as it would eventually result in us having to run migrations to remove old asset versions. Currently we do not give users a way to manage these so it can be assumed that a user will expect all data related to an asset to be removed upon deletion and this includes historical versions. While testing, I found an issue in which the AWS uploader could fail and incorrectly report to the uploader that it had succeeded. To deal with this condition I added an error handler on the stream and ensured the the AWS upload function emits an error event when relevant. --- lib/Server.js | 47 ++++++++++++++++--------- lib/plugins/database/LevelDB.js | 37 +++++++++++++++---- lib/plugins/fileStorage/AwsS3Storage.js | 10 ++++-- 3 files changed, 68 insertions(+), 26 deletions(-) diff --git a/lib/Server.js b/lib/Server.js index aec1df2..3487e3e 100644 --- a/lib/Server.js +++ b/lib/Server.js @@ -251,23 +251,33 @@ Server.prototype.routes.deleteBucketToken = function(req, res, done) { Server.prototype.routes.deleteAssetFromBucket = function(req, res, done) { var self = this; - var bucket = req.params.bucket; + var bucketId = req.params.bucket; var assetName = req.params.assetName; - - this.database.deleteBucketAsset(bucket, assetName, function(error, data) { - if (error) { - return res.status(500).send('Asset could not be deleted.'); - } - self.fileStorage.deleteFile(data, function(err) { - if (err) { - return res.status(500).send('Asset could not be deleted.'); - } - else { - return res.status(202).send('Asset removed.'); - } - }); + var flush = function(cb) { + res.writeHead(202); + res.end('Asset deleted'); + cb(); + done(); + }; + this.database.deleteBucketAsset(bucketId, assetName, function(err) { + if (err) { return self.handleError(req, res, err); } + self.database.getAssetVersionsByAssetId(bucketId, assetName) + .pipe(through2.obj(function(assetId, enc, cb) { + this.push(assetId); + self.database.deleteBucketAssetVersion(bucketId, assetName, assetId, function(err) { + if (err) { return self.handleError(req, res, err); } + self.fileStorage.deleteFile(assetId, function(err) { + if (err) { self.handleError(req, res, err); } + self.logger.info(`Deleted asset file: ${assetId}`, { + bucketId: bucketId, + assetId: assetId, + assetName: assetName, + }); + cb(); + }); + }); + }, flush)); }); - }; Server.prototype.routes.receiveFileAsset = function(req, res, done) { @@ -298,13 +308,17 @@ Server.prototype.routes.receiveFileAsset = function(req, res, done) { .send('Invalid token'); } self.logger.info('Asset ' + assetName + ' uploading to bucket bucket ' + bucket + ' using token ' + token, {bucket: bucket, token: token, assetName: assetName}); - res.writeHead(201, {'Content-Type': 'text/plain'}); req .pipe(rawByteCounter) .pipe(zlib.createGzip()) .pipe(zippedByteCounter) .pipe(crypto.createCipher(self.options.encryptionCipher, self.options.encryptionPassword + assetId + bucket)) .pipe(self.fileStorage.createWriteStream(assetId)) + .on('error', function(err) { + self.logger.error(`Write stream failed to create file asset: ${assetId}`, err); + self.handleError(req, res, err); + done(err); + }) .on('close', function() { metadata.rawSize = rawByteCounter.bytes; metadata.zippedSize = zippedByteCounter.bytes; @@ -314,6 +328,7 @@ Server.prototype.routes.receiveFileAsset = function(req, res, done) { return console.error(error); } self.logger.info('Asset ' + assetName + ' uploaded to bucket bucket ' + bucket + ' using token ' + token, {bucket: bucket, token: token, assetName: assetName}); + res.writeHead(201, {'Content-Type': 'text/plain'}); res.end(assetId); }); }); diff --git a/lib/plugins/database/LevelDB.js b/lib/plugins/database/LevelDB.js index d3eaf07..6fff8bc 100644 --- a/lib/plugins/database/LevelDB.js +++ b/lib/plugins/database/LevelDB.js @@ -175,14 +175,37 @@ class LevelStore { } deleteBucketAsset(bucketId, assetName, done) { - var self = this; - this.getAssetId(bucketId, assetName, function(err, assetId) { - self.db.batch() - .del(`bucket-asset!!${bucketId}!!${assetName}`) - .del(`bucket-asset-version!!${bucketId}!!${assetName}!!${assetId}`) - .del(`asset!!${assetId}`) - .write(done(err, assetId)); + var keysForDeletion = [ + `bucket-asset!!${bucketId}!!${assetName}`, + ]; + this.deleteBatch(keysForDeletion, function(err) { + done(err); + }); + } + + deleteBucketAssetVersion(bucketId, assetName, assetId, done) { + var keys = [ + `asset!!${assetId}`, + `bucket-asset-version!!${bucketId}!!${assetName}!!${assetId}`, + ]; + this.deleteBatch(keys, function(err) { + done(err); + }); + } + + getAssetVersionsByAssetId(bucketId, assetName) { + var streamOptions = { + gt: `bucket-asset-version!!${bucketId}!!${assetName}!!!`, + lt: `bucket-asset-version!!${bucketId}!!${assetName}!!~`, + }; + var stream = this.db.createKeyStream(streamOptions); + var transform = through2.obj(function(data, enc, cb) { + var assetId = data.split('!!')[3]; + this.push(assetId); + cb(); }); + stream.pipe(transform); + return transform; } /** diff --git a/lib/plugins/fileStorage/AwsS3Storage.js b/lib/plugins/fileStorage/AwsS3Storage.js index c63bca2..c8bb5ce 100644 --- a/lib/plugins/fileStorage/AwsS3Storage.js +++ b/lib/plugins/fileStorage/AwsS3Storage.js @@ -36,12 +36,16 @@ class AwsS3Storage { request.send(function(error, data) { // We must alert the write stream that we have finished or it will // not close the connection to the client. - writeStream.emit('close'); if (error) { - self.logger.error(error); + self.logger.error(`AWS S3 file upload failure for asset: ${assetName}`, error); + writeStream.emit('error', error); + if (done) { + return done(error); + } } + writeStream.emit('close'); if (done) { - return done(error); + return done(); } }); return writeStream;