From 2dd6e16a3e6267a4a7ce446dbfa2c4e4425d8ea4 Mon Sep 17 00:00:00 2001 From: Loren Segal Date: Tue, 30 Sep 2014 13:53:09 -0700 Subject: [PATCH 01/17] Initial commit of AWS.S3.ManagedUpload Usage: ```js var fs = require('fs'); var zlib = require('zlib'); var body = fs.createReadStream('bigfile').pipe(zlib.createGzip()); var s3obj = new AWS.S3({params: {Bucket: 'myBucket', Key: 'myKey'}}); s3obj.upload(). on('httpUploadProgress', function(evt) { console.log(evt); }). send({Body: body}, function(err, data) { console.log(err, data) }); ``` Or via the ManagedUpload class: ```js var fs = require('fs'); var zlib = require('zlib'); var params = { Bucket: 'myBucket', Key: 'myKey', Body: fs.createReadStream('bigfile').pipe(zlib.createGzip()) } var upload = new AWS.S3.ManagedUpload(); upload.on('httpUploadProgress', function(evt) { console.log(evt); }); upload.send(params, function(err, data) { console.log(err, data) }); ``` References #135 --- lib/s3/managed_upload.js | 246 +++++++++++++++++++++++++++++++++++++++ lib/services/s3.js | 10 ++ 2 files changed, 256 insertions(+) create mode 100644 lib/s3/managed_upload.js diff --git a/lib/s3/managed_upload.js b/lib/s3/managed_upload.js new file mode 100644 index 0000000000..f0423a2576 --- /dev/null +++ b/lib/s3/managed_upload.js @@ -0,0 +1,246 @@ +var AWS = require('../core'); + +var maxTotalParts = 10000; +var minMultipartSize = 1024 * 1024 * 5; + +function ManagedUpload(svc, options) { + AWS.SequentialExecutor.call(this); + + var self = this; + var callback, body; + var maxQueue = 4; + var partSize = minMultipartSize; + + var totalChunkedBytes = 0; + var totalUploadedBytes = 0; + var totalBytes; + var numParts = 0; + var totalPartNumbers = 0; + var activeParts = 0; + var doneParts = 0; + var parts = {}; + var completeInfo = []; + var failed = false; + var isDoneChunking = false; + var multipartReq = null; + + var fillQueue = function() { + callback(new Error('Unsupported body payload ' + typeof(body))); + }; + + function cleanup(err) { + if (failed) return; + + // clean up stream + if (typeof body.removeAllListeners === 'function' && + typeof body.resume === 'function') { + body.removeAllListeners('readable'); + body.removeAllListeners('end'); + body.resume(); + } + + if (svc.config.params.UploadId) svc.abortMultipartUpload().send(); + + AWS.util.each(parts, function(partNumber, part) { + part.removeAllListeners('complete'); + part.abort(); + }); + + parts = {}; + callback(err); + failed = true; + } + + function finishMultiPart() { + var completeParams = { MultipartUpload: { Parts: completeInfo } }; + svc.completeMultipartUpload(completeParams, function(err, data) { + if (err) return cleanup(err); + else callback(err, data); + }); + } + + function finishSinglePart(err, data) { + var httpReq = this.request.httpRequest; + var url = AWS.util.urlFormat(httpReq.endpoint); + data.Location = url.substr(0, url.length - 1) + httpReq.path; + callback(err, data); + } + + function progress(info) { + if (this.operation === 'putObject') { + info.part = 1; + } else { + totalUploadedBytes += info.loaded - this._lastUploadedBytes; + this._lastUploadedBytes = info.loaded; + info = { + loaded: totalUploadedBytes, + total: totalBytes, + part: this.params.PartNumber + }; + } + self.emit('httpUploadProgress', [info]); + } + + function uploadPart(chunk, partNumber) { + var partParams = { + Body: chunk, + ContentLength: AWS.util.string.byteLength(chunk), + PartNumber: partNumber + }; + + var partInfo = {ETag: null, PartNumber: partNumber}; + completeInfo.push(partInfo); + + 
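// each chunk is sent as an individual uploadPart request below; the ETag returned
+    // for each part is recorded in completeInfo so the final completeMultipartUpload
+    // call can reference it (descriptive comments added here, not part of the original patch)
+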
var req = svc.uploadPart(partParams); + parts[partNumber] = req; + req._lastUploadedBytes = 0; + req.on('httpUploadProgress', progress); + req.send(function(err, data) { + delete parts[partParams.PartNumber]; + activeParts--; + if (err) return cleanup(err); + + partInfo.ETag = data.ETag; + doneParts++; + if (isDoneChunking && doneParts === numParts) { + finishMultiPart(); + } else { + fillQueue(); + } + }); + } + + function nextChunk(chunk) { + var partNumber = ++totalPartNumbers; + if (isDoneChunking && partNumber === 1) { + return svc.putObject({Body: chunk}). + on('httpUploadProgress', progress).send(finishSinglePart); + } + + activeParts++; + if (!svc.config.params.UploadId) { + if (!multipartReq) { // create multipart + multipartReq = svc.createMultipartUpload(); + multipartReq.on('success', function(resp) { + svc.config.params.UploadId = resp.data.UploadId; + multipartReq = null; + }).on('error', cleanup).send(); + } + + // queue chunks to be sent after multipart is done. + multipartReq.on('success', function() { + uploadPart(chunk, partNumber); + }); + } else { // multipart is created, just send + uploadPart(chunk, partNumber); + } + } + + var partPos = 0; + var partBuffer = null; + var queueFillFns = { + stream: function() { + if (activeParts >= maxQueue) return; + + var buf = body.read(partSize - partBuffer.length) || + body.read(); + if (buf) { + partBuffer = AWS.util.Buffer.concat([partBuffer, buf]); + totalChunkedBytes += buf.length; + } + + if (partBuffer.length >= partSize) { + nextChunk(partBuffer.slice(0, partSize)); + partBuffer = partBuffer.slice(partSize); + } else if (isDoneChunking && partBuffer.length > 0) { + totalBytes = totalChunkedBytes; + nextChunk(partBuffer); + partBuffer = new AWS.util.Buffer(0); + } + + body.read(0); + }, + + buffer: function() { + var bodyLen = AWS.util.string.byteLength(body); + while (activeParts < maxQueue && partPos < bodyLen) { + var buf = body.slice(partPos, partPos + partSize); + partPos += partSize; + + if (AWS.util.string.byteLength(buf) < partSize) { + isDoneChunking = true; + numParts = totalPartNumbers + 1; + } + nextChunk(buf); + } + } + }; + + function configure(opts) { + opts = opts || {}; + if (opts.maxQueue) maxQueue = opts.maxQueue; + if (opts.partSize) partSize = opts.partSize; + } + + function send(params, cb) { + if (arguments.length === 1 && typeof params === 'function') { + cb = params; params = null; + } + params = params || {}; + callback = cb || function() {}; + + // bind parameters to new service object + if (!svc) { + svc = new AWS.S3({params: params}); + } else { + svc = new svc.constructor.__super__(AWS.util.copy(svc.config)); + svc.config.params = AWS.util.merge(svc.config.params || {}, params); + } + + configure(options); + body = svc.config.params.Body; + if (!body) throw new Error('params.Body is required'); + if (typeof body === 'string') body = new AWS.util.Buffer(body); + + if (partSize < minMultipartSize) { + throw new Error('partSize must be greater than ' + minMultipartSize); + } + + // try to get totalBytes + try { + totalBytes = AWS.util.string.byteLength(body); + } catch (e) { } + + // try to adjust partSize if we know payload length + if (totalBytes) { + var newPartSize = Math.ceil(totalBytes / maxTotalParts); + if (newPartSize > partSize) partSize = newPartSize; + } else { + totalBytes = undefined; + } + + var runFill = true; + if (typeof body.slice === 'function') { + fillQueue = queueFillFns.buffer; + } else if (AWS.util.isNode()) { + var Stream = AWS.util.nodeRequire('stream').Stream; + if 
(body instanceof Stream) { + runFill = false; + fillQueue = queueFillFns.stream; + partBuffer = new AWS.util.Buffer(0); + body.on('readable', fillQueue).on('end', function() { + isDoneChunking = true; + numParts = totalPartNumbers + 1; + fillQueue(); + }); + } + } + if (runFill) fillQueue(); + } + + self.configure = configure; + self.send = send; +} + +AWS.util.mixin(ManagedUpload, AWS.SequentialExecutor); +module.exports = ManagedUpload; diff --git a/lib/services/s3.js b/lib/services/s3.js index 900df62f00..526b59f430 100644 --- a/lib/services/s3.js +++ b/lib/services/s3.js @@ -384,3 +384,13 @@ AWS.util.update(AWS.S3.prototype, { return this.makeRequest('createBucket', params, callback); } }); + +// Pull in managed upload extension +AWS.S3.ManagedUpload = AWS.util.nodeRequire('../s3/managed_upload'); +AWS.S3.prototype.upload = function upload(params, callback) { + if (typeof params !== 'function' && typeof callback !== 'function') { + return new AWS.S3.ManagedUpload(this, params); + } else { + new AWS.S3.ManagedUpload(this).send(params, callback); + } +}; From a28d900ee43ba9c9e60ab8b57a17b1ddf3a9e501 Mon Sep 17 00:00:00 2001 From: Loren Segal Date: Wed, 1 Oct 2014 15:19:41 -0700 Subject: [PATCH 02/17] Use generalized require() for browser support. --- lib/services/s3.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/services/s3.js b/lib/services/s3.js index 526b59f430..3ecc56b99f 100644 --- a/lib/services/s3.js +++ b/lib/services/s3.js @@ -386,7 +386,7 @@ AWS.util.update(AWS.S3.prototype, { }); // Pull in managed upload extension -AWS.S3.ManagedUpload = AWS.util.nodeRequire('../s3/managed_upload'); +AWS.S3.ManagedUpload = require('../s3/managed_upload'); AWS.S3.prototype.upload = function upload(params, callback) { if (typeof params !== 'function' && typeof callback !== 'function') { return new AWS.S3.ManagedUpload(this, params); From d44e281f34e89298a74f0e43c2607ba885b074f0 Mon Sep 17 00:00:00 2001 From: Loren Segal Date: Sat, 18 Oct 2014 19:48:53 -0700 Subject: [PATCH 03/17] Update upload logic and add tests --- lib/s3/managed_upload.js | 63 +++++--- test/s3/managed_upload.spec.coffee | 223 +++++++++++++++++++++++++++++ 2 files changed, 268 insertions(+), 18 deletions(-) create mode 100644 test/s3/managed_upload.spec.coffee diff --git a/lib/s3/managed_upload.js b/lib/s3/managed_upload.js index f0423a2576..bfbd1c3211 100644 --- a/lib/s3/managed_upload.js +++ b/lib/s3/managed_upload.js @@ -1,15 +1,13 @@ var AWS = require('../core'); -var maxTotalParts = 10000; -var minMultipartSize = 1024 * 1024 * 5; - function ManagedUpload(svc, options) { AWS.SequentialExecutor.call(this); var self = this; var callback, body; var maxQueue = 4; - var partSize = minMultipartSize; + var partSize = ManagedUpload.minPartSize; + var leavePartsOnError = false; var totalChunkedBytes = 0; var totalUploadedBytes = 0; @@ -39,7 +37,9 @@ function ManagedUpload(svc, options) { body.resume(); } - if (svc.config.params.UploadId) svc.abortMultipartUpload().send(); + if (svc.config.params.UploadId && !leavePartsOnError) { + svc.abortMultipartUpload().send(); + } AWS.util.each(parts, function(partNumber, part) { part.removeAllListeners('complete'); @@ -98,6 +98,17 @@ function ManagedUpload(svc, options) { req.send(function(err, data) { delete parts[partParams.PartNumber]; activeParts--; + + if (!err && (!data || !data.ETag)) { + var message = 'No access to ETag property on response.'; + if (AWS.util.isBrowser()) { + message += ' Check CORS configuration to expose ETag header.'; + } + + err = 
AWS.util.error(new Error(message), { + code: 'ETagMissing', retryable: false + }); + } if (err) return cleanup(err); partInfo.ETag = data.ETag; @@ -110,7 +121,16 @@ function ManagedUpload(svc, options) { }); } + // queue chunks to be sent after multipart is done. + function queueChunks(chunk, partNumber) { + multipartReq.on('success', function() { + uploadPart(chunk, partNumber); + }); + } + function nextChunk(chunk) { + if (failed) return null; + var partNumber = ++totalPartNumbers; if (isDoneChunking && partNumber === 1) { return svc.putObject({Body: chunk}). @@ -119,18 +139,19 @@ function ManagedUpload(svc, options) { activeParts++; if (!svc.config.params.UploadId) { + if (!multipartReq) { // create multipart multipartReq = svc.createMultipartUpload(); multipartReq.on('success', function(resp) { svc.config.params.UploadId = resp.data.UploadId; multipartReq = null; - }).on('error', cleanup).send(); + }); + queueChunks(chunk, partNumber); + multipartReq.on('error', cleanup); + multipartReq.send(); + } else { + queueChunks(chunk, partNumber); } - - // queue chunks to be sent after multipart is done. - multipartReq.on('success', function() { - uploadPart(chunk, partNumber); - }); } else { // multipart is created, just send uploadPart(chunk, partNumber); } @@ -167,7 +188,7 @@ function ManagedUpload(svc, options) { var buf = body.slice(partPos, partPos + partSize); partPos += partSize; - if (AWS.util.string.byteLength(buf) < partSize) { + if (AWS.util.string.byteLength(buf) < partSize || partPos === bodyLen) { isDoneChunking = true; numParts = totalPartNumbers + 1; } @@ -180,6 +201,12 @@ function ManagedUpload(svc, options) { opts = opts || {}; if (opts.maxQueue) maxQueue = opts.maxQueue; if (opts.partSize) partSize = opts.partSize; + if (opts.leavePartsOnError) leavePartsOnError = true; + + if (partSize < ManagedUpload.minPartSize) { + throw new Error('partSize must be greater than ' + + ManagedUpload.minPartSize); + } } function send(params, cb) { @@ -197,15 +224,10 @@ function ManagedUpload(svc, options) { svc.config.params = AWS.util.merge(svc.config.params || {}, params); } - configure(options); body = svc.config.params.Body; if (!body) throw new Error('params.Body is required'); if (typeof body === 'string') body = new AWS.util.Buffer(body); - if (partSize < minMultipartSize) { - throw new Error('partSize must be greater than ' + minMultipartSize); - } - // try to get totalBytes try { totalBytes = AWS.util.string.byteLength(body); @@ -213,7 +235,7 @@ function ManagedUpload(svc, options) { // try to adjust partSize if we know payload length if (totalBytes) { - var newPartSize = Math.ceil(totalBytes / maxTotalParts); + var newPartSize = Math.ceil(totalBytes / ManagedUpload.maxTotalParts); if (newPartSize > partSize) partSize = newPartSize; } else { totalBytes = undefined; @@ -238,9 +260,14 @@ function ManagedUpload(svc, options) { if (runFill) fillQueue(); } + configure(options); + self.configure = configure; self.send = send; } +ManagedUpload.minPartSize = 1024 * 1024 * 5; +ManagedUpload.maxTotalParts = 10000; + AWS.util.mixin(ManagedUpload, AWS.SequentialExecutor); module.exports = ManagedUpload; diff --git a/test/s3/managed_upload.spec.coffee b/test/s3/managed_upload.spec.coffee new file mode 100644 index 0000000000..069a7f2698 --- /dev/null +++ b/test/s3/managed_upload.spec.coffee @@ -0,0 +1,223 @@ +helpers = require('../helpers') +AWS = helpers.AWS + +body = (size) -> + if AWS.util.isNode() || navigator && navigator.userAgent.match(/phantomjs/i) + new AWS.util.Buffer(size) + else + 
new Blob(new Array(size + 1).map (i) -> '.') + +smallbody = body(5) +bigbody = body(36) + +describe 'AWS.S3.ManagedUpload', -> + s3 = new AWS.S3 maxRetries: 0, params: { Bucket: 'bucket', Key: 'key' } + [err, data, upload, minPartSize] = [] + beforeEach -> + minPartSize = AWS.S3.ManagedUpload.minPartSize + AWS.S3.ManagedUpload.minPartSize = 10 + [err, data] = [] + helpers.spyOn(AWS.S3.prototype, 'extractError').andReturn(->) + upload = new AWS.S3.ManagedUpload(s3) + + afterEach -> + AWS.S3.ManagedUpload.minPartSize = minPartSize + + send = (params, cb) -> upload.send params, (e, d) -> + [err,data] = [e,d] + cb() if cb + + describe 'send', -> + it 'fails if Body is not passed', -> + expect(-> send()).to.throw('params.Body is required') + + it 'uploads a single part if size is less than min multipart size', -> + reqs = helpers.mockResponses [ + data: ETag: 'ETAG' + ] + + send Body: smallbody, ContentEncoding: 'encoding' + expect(err).not.to.exist + expect(data.ETag).to.equal('ETAG') + expect(data.Location).to.equal('https://bucket.s3-mock-region.amazonaws.com/key') + expect(helpers.operationsForRequests(reqs)).to.eql ['s3.putObject'] + expect(reqs[0].params.ContentEncoding).to.equal('encoding') + + it 'uploads multipart if size is greater than min multipart size', -> + reqs = helpers.mockResponses [ + { data: UploadId: 'uploadId' } + { data: ETag: 'ETAG1' } + { data: ETag: 'ETAG2' } + { data: ETag: 'ETAG3' } + { data: ETag: 'ETAG4' } + { data: ETag: 'FINAL_ETAG', Location: 'FINAL_LOCATION' } + ] + + send Body: bigbody, ContentEncoding: 'encoding' + expect(helpers.operationsForRequests(reqs)).to.eql [ + 's3.createMultipartUpload' + 's3.uploadPart' + 's3.uploadPart' + 's3.uploadPart' + 's3.uploadPart' + 's3.completeMultipartUpload' + ] + expect(err).not.to.exist + expect(data.ETag).to.equal('FINAL_ETAG') + expect(data.Location).to.equal('FINAL_LOCATION') + expect(reqs[0].params).to.eql(Bucket: 'bucket', Key: 'key', ContentEncoding: 'encoding') + expect(reqs[1].params.ContentLength).to.equal(10) + expect(reqs[1].params.UploadId).to.equal('uploadId') + expect(reqs[2].params.UploadId).to.equal('uploadId') + expect(reqs[2].params.ContentLength).to.equal(10) + expect(reqs[3].params.UploadId).to.equal('uploadId') + expect(reqs[3].params.ContentLength).to.equal(10) + expect(reqs[4].params.UploadId).to.equal('uploadId') + expect(reqs[4].params.ContentLength).to.equal(6) + expect(reqs[5].params.UploadId).to.equal('uploadId') + expect(reqs[5].params.MultipartUpload.Parts).to.eql [ + { ETag: 'ETAG1', PartNumber: 1 } + { ETag: 'ETAG2', PartNumber: 2 } + { ETag: 'ETAG3', PartNumber: 3 } + { ETag: 'ETAG4', PartNumber: 4 } + ] + + it 'aborts if ETag is not in response', -> + helpers.spyOn(AWS.util, 'isBrowser').andReturn true + reqs = helpers.mockResponses [ + { data: UploadId: 'uploadId' } + { data: {} } + { data: {} } + ] + + send Body: bigbody + expect(helpers.operationsForRequests(reqs)).to.eql [ + 's3.createMultipartUpload' + 's3.uploadPart' + 's3.abortMultipartUpload' + ] + expect(err).to.exist + expect(err.message).to.equal('No access to ETag property on response. 
Check CORS configuration to expose ETag header.') + + it 'allows changing part size', -> + reqs = helpers.mockResponses [ + { data: UploadId: 'uploadId' } + { data: ETag: 'ETAG1' } + { data: ETag: 'ETAG2' } + { data: ETag: 'FINAL_ETAG', Location: 'FINAL_LOCATION' } + ] + + size = 18 + opts = partSize: size, maxQueue: 1 + upload = new AWS.S3.ManagedUpload(s3, opts) + send Body: bigbody + expect(helpers.operationsForRequests(reqs)).to.eql [ + 's3.createMultipartUpload' + 's3.uploadPart' + 's3.uploadPart' + 's3.completeMultipartUpload' + ] + expect(err).not.to.exist + expect(data.ETag).to.equal('FINAL_ETAG') + expect(data.Location).to.equal('FINAL_LOCATION') + expect(reqs[1].params.ContentLength).to.equal(size) + expect(reqs[2].params.ContentLength).to.equal(size) + + it 'errors if partSize is smaller than minPartSize', -> + expect(-> new AWS.S3.ManagedUpload(s3, partSize: 5)).to.throw( + 'partSize must be greater than 10') + + it 'aborts if uploadPart fails', -> + reqs = helpers.mockResponses [ + { data: UploadId: 'uploadId' } + { data: ETag: 'ETAG1' } + { error: { code: 'UploadPartFailed' }, data: null } + { data: {}, error: null } + ] + + upload = new AWS.S3.ManagedUpload(s3, maxQueue: 1) + send Body: bigbody + expect(helpers.operationsForRequests(reqs)).to.eql [ + 's3.createMultipartUpload' + 's3.uploadPart' + 's3.uploadPart' + 's3.abortMultipartUpload' + ] + expect(err).to.exist + expect(data).not.to.exist + expect(reqs[3].params.UploadId).to.equal('uploadId') + + it 'aborts if complete call fails', -> + reqs = helpers.mockResponses [ + { data: UploadId: 'uploadId' } + { data: ETag: 'ETAG1' } + { data: ETag: 'ETAG2' } + { data: ETag: 'ETAG3' } + { data: ETag: 'ETAG4' } + { error: { code: 'CompleteFailed' }, data: null } + ] + + send Body: bigbody + expect(helpers.operationsForRequests(reqs)).to.eql [ + 's3.createMultipartUpload' + 's3.uploadPart' + 's3.uploadPart' + 's3.uploadPart' + 's3.uploadPart' + 's3.completeMultipartUpload' + 's3.abortMultipartUpload' + ] + expect(err).to.exist + expect(err.code).to.equal('CompleteFailed') + expect(data).not.to.exist + + it 'leaves parts if leavePartsOnError is set', -> + reqs = helpers.mockResponses [ + { data: UploadId: 'uploadId' } + { data: ETag: 'ETAG1' } + { error: { code: 'UploadPartFailed' }, data: null } + { data: {}, error: null } + ] + + upload = new AWS.S3.ManagedUpload(s3, maxQueue: 1, leavePartsOnError: true) + send Body: bigbody + expect(helpers.operationsForRequests(reqs)).to.eql [ + 's3.createMultipartUpload' + 's3.uploadPart' + 's3.uploadPart' + ] + expect(err).to.exist + expect(err.code).to.equal('UploadPartFailed') + expect(data).not.to.exist + + if AWS.util.isNode() + describe 'streaming', -> + it 'sends a small stream in a single putObject', (done) -> + stream = AWS.util.buffer.toStream(smallbody) + reqs = helpers.mockResponses [data: ETag: 'ETAG'] + upload.send Body: stream, -> + expect(helpers.operationsForRequests(reqs)).to.eql ['s3.putObject'] + expect(err).not.to.exist + done() + + it 'chunks a large stream in minPartSize chunks', (done) -> + stream = AWS.util.buffer.toStream(bigbody) + reqs = helpers.mockResponses [ + { data: UploadId: 'uploadId' } + { data: ETag: 'ETAG1' } + { data: ETag: 'ETAG2' } + { data: ETag: 'ETAG3' } + { data: ETag: 'ETAG4' } + { data: ETag: 'FINAL_ETAG' } + ] + upload.send Body: stream, -> + expect(helpers.operationsForRequests(reqs)).to.eql [ + 's3.createMultipartUpload' + 's3.uploadPart' + 's3.uploadPart' + 's3.uploadPart' + 's3.uploadPart' + 's3.completeMultipartUpload' + ] + 
expect(err).not.to.exist + done() From 8231f03841a8c7904d1e4d83090626e00a812f36 Mon Sep 17 00:00:00 2001 From: "Bernhard K. Weisshuhn" Date: Sat, 8 Nov 2014 19:17:46 +0100 Subject: [PATCH 04/17] pass error to callback early in finishSinglePart otherwise, setting `Location` on `data` will raise. --- lib/s3/managed_upload.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/s3/managed_upload.js b/lib/s3/managed_upload.js index bfbd1c3211..741bb52831 100644 --- a/lib/s3/managed_upload.js +++ b/lib/s3/managed_upload.js @@ -62,6 +62,7 @@ function ManagedUpload(svc, options) { function finishSinglePart(err, data) { var httpReq = this.request.httpRequest; var url = AWS.util.urlFormat(httpReq.endpoint); + if (err) return callback(err); data.Location = url.substr(0, url.length - 1) + httpReq.path; callback(err, data); } From 62aa4e7acb1281cf9bf7167047aa6912841c77db Mon Sep 17 00:00:00 2001 From: Loren Segal Date: Fri, 28 Nov 2014 16:14:56 -0800 Subject: [PATCH 05/17] Refactor AWS.S3.ManagedUpload class --- features/s3/managed_upload.feature | 11 + .../s3/step_definitions/managed_upload.js | 53 ++ lib/s3/managed_upload.js | 565 +++++++++++------- lib/services/s3.js | 2 +- test/s3/managed_upload.spec.coffee | 30 +- 5 files changed, 413 insertions(+), 248 deletions(-) create mode 100644 features/s3/managed_upload.feature create mode 100644 features/s3/step_definitions/managed_upload.js diff --git a/features/s3/managed_upload.feature b/features/s3/managed_upload.feature new file mode 100644 index 0000000000..75912a49a3 --- /dev/null +++ b/features/s3/managed_upload.feature @@ -0,0 +1,11 @@ +# language: en +@s3 @managed_upload +Feature: S3 Managed Upload + + Scenario: Uploading a large buffer + When I use S3 managed upload to upload a large buffer + Then the multipart upload should succeed + + Scenario: Uploading a large buffer + When I use S3 managed upload to upload a large stream + Then the multipart upload should succeed diff --git a/features/s3/step_definitions/managed_upload.js b/features/s3/step_definitions/managed_upload.js new file mode 100644 index 0000000000..25e955db71 --- /dev/null +++ b/features/s3/step_definitions/managed_upload.js @@ -0,0 +1,53 @@ +module.exports = function () { + this.Before("@s3", "@managed_upload", function (callback) { + + // execute only once + if (this.mgrBucket) { + callback(); + return; + } + + this.mgrBucket = this.uniqueName('aws-sdk-js-integration'); + this.s3.createBucket({Bucket:this.mgrBucket}, callback); + }); + + this.AfterAll(function (callback) { + var self = this; + this.s3.listObjects({Bucket:this.mgrBucket}, function(err, data) { + data.Contents.forEach(function(object) { + self.s3.deleteObject({Bucket:self.mgrBucket,Key:object.Key}).send(); + }); + setTimeout(function() { + self.s3.deleteBucket({Bucket:self.mgrBucket}, callback); + }, 1000); + }); + }); + + this.When(/^I use S3 managed upload to upload a large buffer$/, function (callback) { + var self = this; + var buffer = new Buffer(1024 * 1024 * 12); + var params = {Bucket: self.mgrBucket, Key: 'largebuffer', Body: buffer}; + self.s3.upload().send(params, function (err, data) { + self.error = err; + self.data = data; + callback(); + }); + }); + + this.Then(/^the multipart upload should succeed$/, function (callback) { + this.assert.equal(this.error, null); + this.assert.equal(typeof this.data.Location, 'string'); + callback(); + }); + + this.When(/^I use S3 managed upload to upload a large stream$/, function (callback) { + var self = this; + var stream = this.AWS.util.buffer.toStream(new 
Buffer(1024 * 1024 * 12)); + var params = {Bucket: self.mgrBucket, Key: 'largestream', Body: stream}; + self.s3.upload().send(params, function (err, data) { + self.error = err; + self.data = data; + callback(); + }); + }); +}; diff --git a/lib/s3/managed_upload.js b/lib/s3/managed_upload.js index 741bb52831..e78b7beb9e 100644 --- a/lib/s3/managed_upload.js +++ b/lib/s3/managed_upload.js @@ -1,88 +1,275 @@ var AWS = require('../core'); +var byteLength = AWS.util.string.byteLength; + +AWS.S3.ManagedUpload = AWS.util.inherit({ + constructor: function ManagedUpload(svc, options) { + var self = this; + AWS.SequentialExecutor.call(self); + self.service = svc; + self.configure(options); + self.body = null; + self.callback = null; + self.parts = {}; + self.completeInfo = []; + self.fillQueue = function() { + self.callback(new Error('Unsupported body payload ' + typeof(self.body))); + }; + }, -function ManagedUpload(svc, options) { - AWS.SequentialExecutor.call(this); - - var self = this; - var callback, body; - var maxQueue = 4; - var partSize = ManagedUpload.minPartSize; - var leavePartsOnError = false; - - var totalChunkedBytes = 0; - var totalUploadedBytes = 0; - var totalBytes; - var numParts = 0; - var totalPartNumbers = 0; - var activeParts = 0; - var doneParts = 0; - var parts = {}; - var completeInfo = []; - var failed = false; - var isDoneChunking = false; - var multipartReq = null; - - var fillQueue = function() { - callback(new Error('Unsupported body payload ' + typeof(body))); - }; - - function cleanup(err) { - if (failed) return; + configure: function configure(options) { + options = options || {}; + this.partSize = this.minPartSize; - // clean up stream - if (typeof body.removeAllListeners === 'function' && - typeof body.resume === 'function') { - body.removeAllListeners('readable'); - body.removeAllListeners('end'); - body.resume(); + if (options.maxQueue) this.maxQueue = options.maxQueue; + if (options.partSize) this.partSize = options.partSize; + if (options.leavePartsOnError) this.leavePartsOnError = true; + + if (this.partSize < this.minPartSize) { + throw new Error('partSize must be greater than ' + + this.minPartSize); } + }, - if (svc.config.params.UploadId && !leavePartsOnError) { - svc.abortMultipartUpload().send(); + leavePartsOnError: false, + maxQueue: 4, + partSize: null, + minPartSize: 1024 * 1024 * 5, + maxTotalParts: 10000, + + send: function(params, cb) { + var self = this; + if (arguments.length === 1 && typeof params === 'function') { + cb = params; params = null; } + params = params || {}; + self.callback = cb || function(err) { if (err) throw err; }; - AWS.util.each(parts, function(partNumber, part) { - part.removeAllListeners('complete'); - part.abort(); - }); + self.bindServiceObject(params); + self.validateBody(); + self.adjustTotalBytes(); - parts = {}; - callback(err); - failed = true; - } + var runFill = true; + if (typeof self.body.slice === 'function') { + self.fillQueue = self.fillBuffer; + } else if (AWS.util.isNode()) { + var Stream = AWS.util.nodeRequire('stream').Stream; + if (self.body instanceof Stream) { + runFill = false; + self.fillQueue = self.fillStream; + self.partBuffer = new AWS.util.Buffer(0); + self.body. + on('readable', function() { self.fillQueue(); }). 
+ on('end', function() { + self.isDoneChunking = true; + self.numParts = self.totalPartNumbers + 1; + self.fillQueue.call(self); + }); + } + } - function finishMultiPart() { - var completeParams = { MultipartUpload: { Parts: completeInfo } }; - svc.completeMultipartUpload(completeParams, function(err, data) { - if (err) return cleanup(err); - else callback(err, data); - }); - } + if (runFill) self.fillQueue.call(self); + }, + + /** + * @api private + */ + validateBody: function validateBody() { + var self = this; + self.body = self.service.config.params.Body; + if (!self.body) throw new Error('params.Body is required'); + if (typeof self.body === 'string') { + self.body = new AWS.util.Buffer(self.body); + } + }, - function finishSinglePart(err, data) { - var httpReq = this.request.httpRequest; - var url = AWS.util.urlFormat(httpReq.endpoint); - if (err) return callback(err); - data.Location = url.substr(0, url.length - 1) + httpReq.path; - callback(err, data); - } + /** + * @api private + */ + bindServiceObject: function bindServiceObject(params) { + var self = this; - function progress(info) { - if (this.operation === 'putObject') { - info.part = 1; + // bind parameters to new service object + if (!self.service) { + self.service = new AWS.S3({params: params}); } else { - totalUploadedBytes += info.loaded - this._lastUploadedBytes; - this._lastUploadedBytes = info.loaded; - info = { - loaded: totalUploadedBytes, - total: totalBytes, - part: this.params.PartNumber - }; + var config = AWS.util.copy(self.service.config); + self.service = new self.service.constructor.__super__(config); + self.service.config.params = + AWS.util.merge(self.service.config.params || {}, params); } - self.emit('httpUploadProgress', [info]); - } + }, + + /** + * @api private + */ + adjustTotalBytes: function adjustTotalBytes() { + var self = this; + try { // try to get totalBytes + self.totalBytes = byteLength(self.body); + } catch (e) { } + + // try to adjust partSize if we know payload length + if (self.totalBytes) { + var newPartSize = Math.ceil(self.totalBytes / self.maxTotalParts); + if (newPartSize > self.partSize) self.partSize = newPartSize; + } else { + self.totalBytes = undefined; + } + }, + + /** + * @api private + */ + isDoneChunking: false, + + /** + * @api private + */ + partPos: 0, + + /** + * @api private + */ + totalChunkedBytes: 0, + + /** + * @api private + */ + totalUploadedBytes: 0, + + /** + * @api private + */ + totalBytes: undefined, + + /** + * @api private + */ + numParts: 0, + + /** + * @api private + */ + totalPartNumbers: 0, + + /** + * @api private + */ + activeParts: 0, + + /** + * @api private + */ + doneParts: 0, + + /** + * @api private + */ + parts: null, + + /** + * @api private + */ + completeInfo: null, + + /** + * @api private + */ + failed: false, + + /** + * @api private + */ + multipartReq: null, + + /** + * @api private + */ + partBuffer: null, + + /** + * @api private + */ + fillBuffer: function fillBuffer() { + var self = this; + var bodyLen = byteLength(self.body); + while (self.activeParts < self.maxQueue && self.partPos < bodyLen) { + var buf = self.body.slice(self.partPos, self.partPos + self.partSize); + self.partPos += self.partSize; + + if (byteLength(buf) < self.partSize || self.partPos === bodyLen) { + self.isDoneChunking = true; + self.numParts = self.totalPartNumbers + 1; + } + self.nextChunk(buf); + } + }, + + /** + * @api private + */ + fillStream: function fillStream() { + var self = this; + if (self.activeParts >= self.maxQueue) return; + + var buf = 
self.body.read(self.partSize - self.partBuffer.length) || + self.body.read(); + if (buf) { + self.partBuffer = AWS.util.Buffer.concat([self.partBuffer, buf]); + self.totalChunkedBytes += buf.length; + } + + if (self.partBuffer.length >= self.partSize) { + self.partBuffer = self.partBuffer.slice(self.partSize); + } else if (self.isDoneChunking && self.partBuffer.length > 0) { + self.totalBytes = self.totalChunkedBytes; + self.nextChunk(self.partBuffer); + self.partBuffer = new AWS.util.Buffer(0); + } + + self.body.read(0); + }, + + /** + * @api private + */ + nextChunk: function nextChunk(chunk) { + var self = this; + if (self.failed) return null; + + var partNumber = ++self.totalPartNumbers; + if (self.isDoneChunking && partNumber === 1) { + var req = self.service.putObject({Body: chunk}); + req._managedUpload = self; + req.on('httpUploadProgress', self.progress).send(self.finishSinglePart); + return null; + } + + self.activeParts++; + if (!self.service.config.params.UploadId) { + + if (!self.multipartReq) { // create multipart + self.multipartReq = self.service.createMultipartUpload(); + self.multipartReq.on('success', function(resp) { + self.service.config.params.UploadId = resp.data.UploadId; + self.multipartReq = null; + }); + self.queueChunks(chunk, partNumber); + self.multipartReq.on('error', self.cleanup); + self.multipartReq.send(); + } else { + self.queueChunks(chunk, partNumber); + } + } else { // multipart is created, just send + self.uploadPart(chunk, partNumber); + } + }, - function uploadPart(chunk, partNumber) { + /** + * @api private + */ + uploadPart: function uploadPart(chunk, partNumber) { + var self = this; var partParams = { Body: chunk, ContentLength: AWS.util.string.byteLength(chunk), @@ -90,15 +277,16 @@ function ManagedUpload(svc, options) { }; var partInfo = {ETag: null, PartNumber: partNumber}; - completeInfo.push(partInfo); + self.completeInfo.push(partInfo); - var req = svc.uploadPart(partParams); - parts[partNumber] = req; + var req = self.service.uploadPart(partParams); + self.parts[partNumber] = req; req._lastUploadedBytes = 0; - req.on('httpUploadProgress', progress); + req._managedUpload = self; + req.on('httpUploadProgress', self.progress); req.send(function(err, data) { - delete parts[partParams.PartNumber]; - activeParts--; + delete self.parts[partParams.PartNumber]; + self.activeParts--; if (!err && (!data || !data.ETag)) { var message = 'No access to ETag property on response.'; @@ -110,165 +298,100 @@ function ManagedUpload(svc, options) { code: 'ETagMissing', retryable: false }); } - if (err) return cleanup(err); + if (err) return self.cleanup(err); partInfo.ETag = data.ETag; - doneParts++; - if (isDoneChunking && doneParts === numParts) { - finishMultiPart(); + self.doneParts++; + if (self.isDoneChunking && self.doneParts === self.numParts) { + self.finishMultiPart(); } else { - fillQueue(); + self.fillQueue.call(self); } }); - } - - // queue chunks to be sent after multipart is done. - function queueChunks(chunk, partNumber) { - multipartReq.on('success', function() { - uploadPart(chunk, partNumber); + }, + + /** + * @api private + */ + queueChunks: function queueChunks(chunk, partNumber) { + var self = this; + self.multipartReq.on('success', function() { + self.uploadPart(chunk, partNumber); }); - } - - function nextChunk(chunk) { - if (failed) return null; + }, - var partNumber = ++totalPartNumbers; - if (isDoneChunking && partNumber === 1) { - return svc.putObject({Body: chunk}). 
- on('httpUploadProgress', progress).send(finishSinglePart); - } - - activeParts++; - if (!svc.config.params.UploadId) { - - if (!multipartReq) { // create multipart - multipartReq = svc.createMultipartUpload(); - multipartReq.on('success', function(resp) { - svc.config.params.UploadId = resp.data.UploadId; - multipartReq = null; - }); - queueChunks(chunk, partNumber); - multipartReq.on('error', cleanup); - multipartReq.send(); - } else { - queueChunks(chunk, partNumber); - } - } else { // multipart is created, just send - uploadPart(chunk, partNumber); - } - } - - var partPos = 0; - var partBuffer = null; - var queueFillFns = { - stream: function() { - if (activeParts >= maxQueue) return; - - var buf = body.read(partSize - partBuffer.length) || - body.read(); - if (buf) { - partBuffer = AWS.util.Buffer.concat([partBuffer, buf]); - totalChunkedBytes += buf.length; - } - - if (partBuffer.length >= partSize) { - nextChunk(partBuffer.slice(0, partSize)); - partBuffer = partBuffer.slice(partSize); - } else if (isDoneChunking && partBuffer.length > 0) { - totalBytes = totalChunkedBytes; - nextChunk(partBuffer); - partBuffer = new AWS.util.Buffer(0); - } - - body.read(0); - }, + /** + * @api private + */ + cleanup: function cleanup(err) { + var self = this; + if (self.failed) return; - buffer: function() { - var bodyLen = AWS.util.string.byteLength(body); - while (activeParts < maxQueue && partPos < bodyLen) { - var buf = body.slice(partPos, partPos + partSize); - partPos += partSize; - - if (AWS.util.string.byteLength(buf) < partSize || partPos === bodyLen) { - isDoneChunking = true; - numParts = totalPartNumbers + 1; - } - nextChunk(buf); - } - } - }; - - function configure(opts) { - opts = opts || {}; - if (opts.maxQueue) maxQueue = opts.maxQueue; - if (opts.partSize) partSize = opts.partSize; - if (opts.leavePartsOnError) leavePartsOnError = true; - - if (partSize < ManagedUpload.minPartSize) { - throw new Error('partSize must be greater than ' + - ManagedUpload.minPartSize); - } - } - - function send(params, cb) { - if (arguments.length === 1 && typeof params === 'function') { - cb = params; params = null; + // clean up stream + if (typeof self.body.removeAllListeners === 'function' && + typeof self.body.resume === 'function') { + self.body.removeAllListeners('readable'); + self.body.removeAllListeners('end'); + self.body.resume(); } - params = params || {}; - callback = cb || function() {}; - // bind parameters to new service object - if (!svc) { - svc = new AWS.S3({params: params}); - } else { - svc = new svc.constructor.__super__(AWS.util.copy(svc.config)); - svc.config.params = AWS.util.merge(svc.config.params || {}, params); + if (self.service.config.params.UploadId && !self.leavePartsOnError) { + self.service.abortMultipartUpload().send(); } - body = svc.config.params.Body; - if (!body) throw new Error('params.Body is required'); - if (typeof body === 'string') body = new AWS.util.Buffer(body); + AWS.util.each(self.parts, function(partNumber, part) { + part.removeAllListeners('complete'); + part.abort(); + }); - // try to get totalBytes - try { - totalBytes = AWS.util.string.byteLength(body); - } catch (e) { } + self.parts = {}; + self.callback(err); + self.failed = true; + }, + + /** + * @api private + */ + finishMultiPart: function finishMultiPart() { + var self = this; + var completeParams = { MultipartUpload: { Parts: self.completeInfo } }; + self.service.completeMultipartUpload(completeParams, function(err, data) { + if (err) return self.cleanup(err); + else self.callback(err, 
data); + }); + }, - // try to adjust partSize if we know payload length - if (totalBytes) { - var newPartSize = Math.ceil(totalBytes / ManagedUpload.maxTotalParts); - if (newPartSize > partSize) partSize = newPartSize; + /** + * @api private + */ + finishSinglePart: function finishSinglePart(err, data) { + var upload = this.request._managedUpload; + var httpReq = this.request.httpRequest; + var url = AWS.util.urlFormat(httpReq.endpoint); + if (err) return upload.callback(err); + data.Location = url.substr(0, url.length - 1) + httpReq.path; + upload.callback(err, data); + }, + + /** + * @api private + */ + progress: function progress(info) { + var upload = this._managedUpload; + if (this.operation === 'putObject') { + info.part = 1; } else { - totalBytes = undefined; - } - - var runFill = true; - if (typeof body.slice === 'function') { - fillQueue = queueFillFns.buffer; - } else if (AWS.util.isNode()) { - var Stream = AWS.util.nodeRequire('stream').Stream; - if (body instanceof Stream) { - runFill = false; - fillQueue = queueFillFns.stream; - partBuffer = new AWS.util.Buffer(0); - body.on('readable', fillQueue).on('end', function() { - isDoneChunking = true; - numParts = totalPartNumbers + 1; - fillQueue(); - }); - } + upload.totalUploadedBytes += info.loaded - this._lastUploadedBytes; + this._lastUploadedBytes = info.loaded; + info = { + loaded: upload.totalUploadedBytes, + total: upload.totalBytes, + part: this.params.PartNumber + }; } - if (runFill) fillQueue(); + upload.emit('httpUploadProgress', [info]); } +}); - configure(options); - - self.configure = configure; - self.send = send; -} - -ManagedUpload.minPartSize = 1024 * 1024 * 5; -ManagedUpload.maxTotalParts = 10000; - -AWS.util.mixin(ManagedUpload, AWS.SequentialExecutor); -module.exports = ManagedUpload; +AWS.util.mixin(AWS.S3.ManagedUpload, AWS.SequentialExecutor); +module.exports = AWS.S3.ManagedUpload; diff --git a/lib/services/s3.js b/lib/services/s3.js index 7caed8a30e..2898d96fbc 100644 --- a/lib/services/s3.js +++ b/lib/services/s3.js @@ -399,7 +399,7 @@ AWS.util.update(AWS.S3.prototype, { }); // Pull in managed upload extension -AWS.S3.ManagedUpload = require('../s3/managed_upload'); +require('../s3/managed_upload'); AWS.S3.prototype.upload = function upload(params, callback) { if (typeof params !== 'function' && typeof callback !== 'function') { return new AWS.S3.ManagedUpload(this, params); diff --git a/test/s3/managed_upload.spec.coffee b/test/s3/managed_upload.spec.coffee index 069a7f2698..bc677fbeb8 100644 --- a/test/s3/managed_upload.spec.coffee +++ b/test/s3/managed_upload.spec.coffee @@ -5,7 +5,7 @@ body = (size) -> if AWS.util.isNode() || navigator && navigator.userAgent.match(/phantomjs/i) new AWS.util.Buffer(size) else - new Blob(new Array(size + 1).map (i) -> '.') + new Blob(new Array(size).map (i) -> '.') smallbody = body(5) bigbody = body(36) @@ -15,13 +15,13 @@ describe 'AWS.S3.ManagedUpload', -> [err, data, upload, minPartSize] = [] beforeEach -> minPartSize = AWS.S3.ManagedUpload.minPartSize - AWS.S3.ManagedUpload.minPartSize = 10 + AWS.S3.ManagedUpload.prototype.minPartSize = 10 [err, data] = [] helpers.spyOn(AWS.S3.prototype, 'extractError').andReturn(->) upload = new AWS.S3.ManagedUpload(s3) afterEach -> - AWS.S3.ManagedUpload.minPartSize = minPartSize + AWS.S3.ManagedUpload.prototype.minPartSize = minPartSize send = (params, cb) -> upload.send params, (e, d) -> [err,data] = [e,d] @@ -39,7 +39,7 @@ describe 'AWS.S3.ManagedUpload', -> send Body: smallbody, ContentEncoding: 'encoding' 
expect(err).not.to.exist expect(data.ETag).to.equal('ETAG') - expect(data.Location).to.equal('https://bucket.s3-mock-region.amazonaws.com/key') + expect(data.Location).to.equal('https://bucket.s3.mock-region.amazonaws.com/key') expect(helpers.operationsForRequests(reqs)).to.eql ['s3.putObject'] expect(reqs[0].params.ContentEncoding).to.equal('encoding') @@ -199,25 +199,3 @@ describe 'AWS.S3.ManagedUpload', -> expect(helpers.operationsForRequests(reqs)).to.eql ['s3.putObject'] expect(err).not.to.exist done() - - it 'chunks a large stream in minPartSize chunks', (done) -> - stream = AWS.util.buffer.toStream(bigbody) - reqs = helpers.mockResponses [ - { data: UploadId: 'uploadId' } - { data: ETag: 'ETAG1' } - { data: ETag: 'ETAG2' } - { data: ETag: 'ETAG3' } - { data: ETag: 'ETAG4' } - { data: ETag: 'FINAL_ETAG' } - ] - upload.send Body: stream, -> - expect(helpers.operationsForRequests(reqs)).to.eql [ - 's3.createMultipartUpload' - 's3.uploadPart' - 's3.uploadPart' - 's3.uploadPart' - 's3.uploadPart' - 's3.completeMultipartUpload' - ] - expect(err).not.to.exist - done() From aa64aae27c3514b2f3bf556fd7dc8e6a15a10140 Mon Sep 17 00:00:00 2001 From: Loren Segal Date: Fri, 28 Nov 2014 16:53:51 -0800 Subject: [PATCH 06/17] Correct linting error --- lib/s3/managed_upload.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/s3/managed_upload.js b/lib/s3/managed_upload.js index e78b7beb9e..5aec774508 100644 --- a/lib/s3/managed_upload.js +++ b/lib/s3/managed_upload.js @@ -12,7 +12,7 @@ AWS.S3.ManagedUpload = AWS.util.inherit({ self.parts = {}; self.completeInfo = []; self.fillQueue = function() { - self.callback(new Error('Unsupported body payload ' + typeof(self.body))); + self.callback(new Error('Unsupported body payload ' + typeof self.body)); }; }, From 67bfa78b820b5740e06a8fb62e89534d6cc62da6 Mon Sep 17 00:00:00 2001 From: Loren Segal Date: Tue, 2 Dec 2014 23:05:53 -0800 Subject: [PATCH 07/17] Rename maxQueue to queueSize --- lib/s3/managed_upload.js | 8 ++++---- test/s3/managed_upload.spec.coffee | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/s3/managed_upload.js b/lib/s3/managed_upload.js index 5aec774508..7a4119ad63 100644 --- a/lib/s3/managed_upload.js +++ b/lib/s3/managed_upload.js @@ -20,7 +20,7 @@ AWS.S3.ManagedUpload = AWS.util.inherit({ options = options || {}; this.partSize = this.minPartSize; - if (options.maxQueue) this.maxQueue = options.maxQueue; + if (options.queueSize) this.queueSize = options.queueSize; if (options.partSize) this.partSize = options.partSize; if (options.leavePartsOnError) this.leavePartsOnError = true; @@ -31,7 +31,7 @@ AWS.S3.ManagedUpload = AWS.util.inherit({ }, leavePartsOnError: false, - maxQueue: 4, + queueSize: 4, partSize: null, minPartSize: 1024 * 1024 * 5, maxTotalParts: 10000, @@ -193,7 +193,7 @@ AWS.S3.ManagedUpload = AWS.util.inherit({ fillBuffer: function fillBuffer() { var self = this; var bodyLen = byteLength(self.body); - while (self.activeParts < self.maxQueue && self.partPos < bodyLen) { + while (self.activeParts < self.queueSize && self.partPos < bodyLen) { var buf = self.body.slice(self.partPos, self.partPos + self.partSize); self.partPos += self.partSize; @@ -210,7 +210,7 @@ AWS.S3.ManagedUpload = AWS.util.inherit({ */ fillStream: function fillStream() { var self = this; - if (self.activeParts >= self.maxQueue) return; + if (self.activeParts >= self.queueSize) return; var buf = self.body.read(self.partSize - self.partBuffer.length) || self.body.read(); diff --git 
a/test/s3/managed_upload.spec.coffee b/test/s3/managed_upload.spec.coffee index bc677fbeb8..de58d377e7 100644 --- a/test/s3/managed_upload.spec.coffee +++ b/test/s3/managed_upload.spec.coffee @@ -108,7 +108,7 @@ describe 'AWS.S3.ManagedUpload', -> ] size = 18 - opts = partSize: size, maxQueue: 1 + opts = partSize: size, queueSize: 1 upload = new AWS.S3.ManagedUpload(s3, opts) send Body: bigbody expect(helpers.operationsForRequests(reqs)).to.eql [ @@ -135,7 +135,7 @@ describe 'AWS.S3.ManagedUpload', -> { data: {}, error: null } ] - upload = new AWS.S3.ManagedUpload(s3, maxQueue: 1) + upload = new AWS.S3.ManagedUpload(s3, queueSize: 1) send Body: bigbody expect(helpers.operationsForRequests(reqs)).to.eql [ 's3.createMultipartUpload' @@ -179,7 +179,7 @@ describe 'AWS.S3.ManagedUpload', -> { data: {}, error: null } ] - upload = new AWS.S3.ManagedUpload(s3, maxQueue: 1, leavePartsOnError: true) + upload = new AWS.S3.ManagedUpload(s3, queueSize: 1, leavePartsOnError: true) send Body: bigbody expect(helpers.operationsForRequests(reqs)).to.eql [ 's3.createMultipartUpload' From 1428331b836c90cb888ba66f440f43eaf8f7b541 Mon Sep 17 00:00:00 2001 From: Loren Segal Date: Wed, 3 Dec 2014 15:27:02 -0800 Subject: [PATCH 08/17] Refactor and improve test coverage. API has changed to: AWS.S3.upload(params, [options], [callback]) If no callback is supplied, the operation returns an object that can call `.send([callback])`. Example: s3.upload({Body: body}, {queueSize: 10}, function(err, data) { console.log(err, data); }); // or s3.upload({Body: body}, {queueSize: 10}).send(callback); The `options` parameter can be omitted. --- features/s3/managed_upload.feature | 3 +- .../s3/step_definitions/managed_upload.js | 15 ++++- lib/s3/managed_upload.js | 25 ++++---- lib/services/s3.js | 20 ++++-- package.json | 2 +- test/s3/managed_upload.spec.coffee | 61 +++++++++++++++---- 6 files changed, 92 insertions(+), 34 deletions(-) diff --git a/features/s3/managed_upload.feature b/features/s3/managed_upload.feature index 75912a49a3..1ba2694ab2 100644 --- a/features/s3/managed_upload.feature +++ b/features/s3/managed_upload.feature @@ -6,6 +6,7 @@ Feature: S3 Managed Upload When I use S3 managed upload to upload a large buffer Then the multipart upload should succeed - Scenario: Uploading a large buffer + Scenario: Uploading a large stream When I use S3 managed upload to upload a large stream Then the multipart upload should succeed + And I should get progress events diff --git a/features/s3/step_definitions/managed_upload.js b/features/s3/step_definitions/managed_upload.js index 25e955db71..b0b5993307 100644 --- a/features/s3/step_definitions/managed_upload.js +++ b/features/s3/step_definitions/managed_upload.js @@ -27,7 +27,7 @@ module.exports = function () { var self = this; var buffer = new Buffer(1024 * 1024 * 12); var params = {Bucket: self.mgrBucket, Key: 'largebuffer', Body: buffer}; - self.s3.upload().send(params, function (err, data) { + self.s3.upload(params, function (err, data) { self.error = err; self.data = data; callback(); @@ -44,10 +44,21 @@ module.exports = function () { var self = this; var stream = this.AWS.util.buffer.toStream(new Buffer(1024 * 1024 * 12)); var params = {Bucket: self.mgrBucket, Key: 'largestream', Body: stream}; - self.s3.upload().send(params, function (err, data) { + + self.progressEvents = []; + var progress = function(info) { + self.progressEvents.push(info); + } + + self.s3.upload(params).on('httpUploadProgress', progress).send(function (err, data) { self.error = err; self.data = 
data; callback(); }); }); + + this.Then(/^I should get progress events$/, function (callback) { + this.assert.compare(this.progressEvents.length, '>', 0); + callback(); + }); }; diff --git a/lib/s3/managed_upload.js b/lib/s3/managed_upload.js index 7a4119ad63..c17c044ee3 100644 --- a/lib/s3/managed_upload.js +++ b/lib/s3/managed_upload.js @@ -2,11 +2,9 @@ var AWS = require('../core'); var byteLength = AWS.util.string.byteLength; AWS.S3.ManagedUpload = AWS.util.inherit({ - constructor: function ManagedUpload(svc, options) { + constructor: function ManagedUpload(options) { var self = this; AWS.SequentialExecutor.call(self); - self.service = svc; - self.configure(options); self.body = null; self.callback = null; self.parts = {}; @@ -14,8 +12,13 @@ AWS.S3.ManagedUpload = AWS.util.inherit({ self.fillQueue = function() { self.callback(new Error('Unsupported body payload ' + typeof self.body)); }; + + self.configure(options); }, + /** + * @api private + */ configure: function configure(options) { options = options || {}; this.partSize = this.minPartSize; @@ -28,6 +31,11 @@ AWS.S3.ManagedUpload = AWS.util.inherit({ throw new Error('partSize must be greater than ' + this.minPartSize); } + + this.service = options.service; + this.bindServiceObject(options.params); + this.validateBody(); + this.adjustTotalBytes(); }, leavePartsOnError: false, @@ -36,18 +44,10 @@ AWS.S3.ManagedUpload = AWS.util.inherit({ minPartSize: 1024 * 1024 * 5, maxTotalParts: 10000, - send: function(params, cb) { + send: function(cb) { var self = this; - if (arguments.length === 1 && typeof params === 'function') { - cb = params; params = null; - } - params = params || {}; self.callback = cb || function(err) { if (err) throw err; }; - self.bindServiceObject(params); - self.validateBody(); - self.adjustTotalBytes(); - var runFill = true; if (typeof self.body.slice === 'function') { self.fillQueue = self.fillBuffer; @@ -86,6 +86,7 @@ AWS.S3.ManagedUpload = AWS.util.inherit({ * @api private */ bindServiceObject: function bindServiceObject(params) { + params = params || {}; var self = this; // bind parameters to new service object diff --git a/lib/services/s3.js b/lib/services/s3.js index 2898d96fbc..05c0940ec0 100644 --- a/lib/services/s3.js +++ b/lib/services/s3.js @@ -400,10 +400,20 @@ AWS.util.update(AWS.S3.prototype, { // Pull in managed upload extension require('../s3/managed_upload'); -AWS.S3.prototype.upload = function upload(params, callback) { - if (typeof params !== 'function' && typeof callback !== 'function') { - return new AWS.S3.ManagedUpload(this, params); - } else { - new AWS.S3.ManagedUpload(this).send(params, callback); + +/** + * Managed uploader + */ +AWS.S3.prototype.upload = function upload(params, options, callback) { + if (typeof options === 'function' && callback === undefined) { + callback = options; + options = null; } + + options = options || {}; + options = AWS.util.merge(options || {}, {service: this, params: params}); + + var uploader = new AWS.S3.ManagedUpload(options); + if (typeof callback == 'function') uploader.send(callback); + return uploader; }; diff --git a/package.json b/package.json index 37a6ae8cca..972298f11b 100644 --- a/package.json +++ b/package.json @@ -59,7 +59,7 @@ ], "scripts" : { "test" : "npm -s run-script lint && npm -s run-script unit && npm -s run-script buildertest && npm -s run-script browsertest && ([ -f configuration ] && npm -s run-script integration || true)", - "unit" : "istanbul `[ $COVERAGE ] && echo 'cover _mocha' || echo 'test mocha'` -- test test/json test/model 
test/protocol test/query test/services test/signers test/xml", + "unit" : "istanbul `[ $COVERAGE ] && echo 'cover _mocha' || echo 'test mocha'` -- test test/json test/model test/protocol test/query test/services test/signers test/xml test/s3", "coverage": "istanbul cover ./node_modules/mocha/bin/_mocha -- test test/json test/model test/protocol test/query test/services test/signers test/xml", "browsertest": "rake browser:test", "buildertest": "mocha --compilers coffee:coffee-script -s 1000 -t 10000 dist-tools/test", diff --git a/test/s3/managed_upload.spec.coffee b/test/s3/managed_upload.spec.coffee index de58d377e7..4614943502 100644 --- a/test/s3/managed_upload.spec.coffee +++ b/test/s3/managed_upload.spec.coffee @@ -18,19 +18,41 @@ describe 'AWS.S3.ManagedUpload', -> AWS.S3.ManagedUpload.prototype.minPartSize = 10 [err, data] = [] helpers.spyOn(AWS.S3.prototype, 'extractError').andReturn(->) - upload = new AWS.S3.ManagedUpload(s3) + upload = null afterEach -> AWS.S3.ManagedUpload.prototype.minPartSize = minPartSize - send = (params, cb) -> upload.send params, (e, d) -> - [err,data] = [e,d] - cb() if cb + send = (params, cb) -> + if !upload + upload = new AWS.S3.ManagedUpload(service: s3, params: params) + upload.send (e, d) -> + [err,data] = [e,d] + cb() if cb describe 'send', -> + it 'default callback throws', -> + helpers.mockResponses [ error: new Error('ERROR') ] + upload = new AWS.S3.ManagedUpload({params: {Body: 'body'}}) + expect(-> upload.send()).to.throw('ERROR') + it 'fails if Body is not passed', -> expect(-> send()).to.throw('params.Body is required') + it 'fails if Body is unknown type', -> + send(Body: 2) + expect(err.message).to.match(/Unsupported body payload number/) + + it 'converts string body to Buffer', -> + reqs = helpers.mockResponses [ + data: ETag: 'ETAG' + ] + send(Body: 'string') + expect(data.ETag).to.equal('ETAG') + + it 'uses a default service object if none provided', -> + expect(-> new AWS.S3.ManagedUpload()).to.throw('params.Body is required') + it 'uploads a single part if size is less than min multipart size', -> reqs = helpers.mockResponses [ data: ETag: 'ETAG' @@ -43,6 +65,15 @@ describe 'AWS.S3.ManagedUpload', -> expect(helpers.operationsForRequests(reqs)).to.eql ['s3.putObject'] expect(reqs[0].params.ContentEncoding).to.equal('encoding') + it 'can fail a single part', -> + reqs = helpers.mockResponses [ + data: null + error: new Error('ERROR') + ] + send(Body: 'string') + expect(data).not.to.exist + expect(err.message).to.equal('ERROR') + it 'uploads multipart if size is greater than min multipart size', -> reqs = helpers.mockResponses [ { data: UploadId: 'uploadId' } @@ -108,9 +139,9 @@ describe 'AWS.S3.ManagedUpload', -> ] size = 18 - opts = partSize: size, queueSize: 1 - upload = new AWS.S3.ManagedUpload(s3, opts) - send Body: bigbody + opts = partSize: size, queueSize: 1, service: s3, params: {Body: bigbody} + upload = new AWS.S3.ManagedUpload(opts) + send() expect(helpers.operationsForRequests(reqs)).to.eql [ 's3.createMultipartUpload' 's3.uploadPart' @@ -124,7 +155,7 @@ describe 'AWS.S3.ManagedUpload', -> expect(reqs[2].params.ContentLength).to.equal(size) it 'errors if partSize is smaller than minPartSize', -> - expect(-> new AWS.S3.ManagedUpload(s3, partSize: 5)).to.throw( + expect(-> new AWS.S3.ManagedUpload(partSize: 5)).to.throw( 'partSize must be greater than 10') it 'aborts if uploadPart fails', -> @@ -135,8 +166,8 @@ describe 'AWS.S3.ManagedUpload', -> { data: {}, error: null } ] - upload = new AWS.S3.ManagedUpload(s3, queueSize: 1) - 
send Body: bigbody + upload = new AWS.S3.ManagedUpload(queueSize: 1, params: {Body: bigbody}) + send() expect(helpers.operationsForRequests(reqs)).to.eql [ 's3.createMultipartUpload' 's3.uploadPart' @@ -179,8 +210,11 @@ describe 'AWS.S3.ManagedUpload', -> { data: {}, error: null } ] - upload = new AWS.S3.ManagedUpload(s3, queueSize: 1, leavePartsOnError: true) - send Body: bigbody + upload = new AWS.S3.ManagedUpload + queueSize: 1 + leavePartsOnError: true + params: { Body: bigbody } + send() expect(helpers.operationsForRequests(reqs)).to.eql [ 's3.createMultipartUpload' 's3.uploadPart' @@ -195,7 +229,8 @@ describe 'AWS.S3.ManagedUpload', -> it 'sends a small stream in a single putObject', (done) -> stream = AWS.util.buffer.toStream(smallbody) reqs = helpers.mockResponses [data: ETag: 'ETAG'] - upload.send Body: stream, -> + upload = new AWS.S3.ManagedUpload params: { Body: stream } + upload.send -> expect(helpers.operationsForRequests(reqs)).to.eql ['s3.putObject'] expect(err).not.to.exist done() From 21ff148be02e1e333ed45841e0eba170564c2666 Mon Sep 17 00:00:00 2001 From: Loren Segal Date: Wed, 3 Dec 2014 15:36:26 -0800 Subject: [PATCH 09/17] Extra test coverage for s3.upload() integration --- test/services/s3.spec.coffee | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/services/s3.spec.coffee b/test/services/s3.spec.coffee index d9f6635917..27de4da34e 100644 --- a/test/services/s3.spec.coffee +++ b/test/services/s3.spec.coffee @@ -218,6 +218,22 @@ describe 'AWS.S3', -> s3.putObject (err, data) -> expect(@retryCount).to.equal(s3.config.maxRetries) + # Managed Upload integration point + describe 'upload', -> + it 'accepts parameters in upload() call', -> + helpers.mockResponses [ data: { ETag: 'ETAG' } ] + s3.upload {Bucket: 'bucket', Key: 'key', Body: 'body'}, (err, data) -> + expect(err).not.to.exist + expect(data.ETag).to.equal('ETAG') + + it 'accepts options as a second parameter', -> + helpers.mockResponses [ data: { ETag: 'ETAG' } ] + upload = s3.upload({Bucket: 'bucket', Key: 'key', Body: 'body'}, {queueSize: 2}, ->) + expect(upload.queueSize).to.equal(2) + + it 'does not send if no callback is supplied', -> + s3.upload(Bucket: 'bucket', Key: 'key', Body: 'body') + # S3 returns a handful of errors without xml bodies (to match the # http spec) these tests ensure we give meaningful codes/messages for these. 
From a02a861f5b5b4aa9e2a028f4a5af736b424960b7 Mon Sep 17 00:00:00 2001
From: Loren Segal
Date: Wed, 3 Dec 2014 16:10:40 -0800
Subject: [PATCH 10/17] Fix linting error

---
 lib/services/s3.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/services/s3.js b/lib/services/s3.js
index 05c0940ec0..019d5dd9af 100644
--- a/lib/services/s3.js
+++ b/lib/services/s3.js
@@ -414,6 +414,6 @@ AWS.S3.prototype.upload = function upload(params, options, callback) {
   options = AWS.util.merge(options || {}, {service: this, params: params});

   var uploader = new AWS.S3.ManagedUpload(options);
-  if (typeof callback == 'function') uploader.send(callback);
+  if (typeof callback === 'function') uploader.send(callback);
   return uploader;
 };

From 2f024107cb6915a1a928cf3899233d942a617091 Mon Sep 17 00:00:00 2001
From: Loren Segal
Date: Wed, 3 Dec 2014 17:33:11 -0800
Subject: [PATCH 11/17] Add documentation for AWS.S3.ManagedUpload and AWS.S3.upload

---
 lib/s3/managed_upload.js | 98 +++++++++++++++++++++++++++++++++++++++-
 lib/services/s3.js       | 57 +++++++++++++++--------
 2 files changed, 134 insertions(+), 21 deletions(-)

diff --git a/lib/s3/managed_upload.js b/lib/s3/managed_upload.js
index c17c044ee3..e1d741b293 100644
--- a/lib/s3/managed_upload.js
+++ b/lib/s3/managed_upload.js
@@ -1,7 +1,64 @@
 var AWS = require('../core');
 var byteLength = AWS.util.string.byteLength;

+/**
+ * The managed uploader allows for easy and efficient uploading of buffers,
+ * blobs, or streams, using a configurable amount of concurrency to perform
+ * multipart uploads where possible. This abstraction also enables uploading
+ * streams of unknown size due to the use of multipart uploads.
+ *
+ * ## Tracking upload progress
+ *
+ * The managed upload object can also track progress by attaching an
+ * 'httpUploadProgress' listener to the upload manager. This event is similar
+ * to {AWS.Request~httpUploadProgress} but groups all concurrent upload progress
+ * into a single event. See {AWS.S3.ManagedUpload~httpUploadProgress} for more
+ * information.
+ *
+ * To construct a managed upload object, see the {constructor} function.
+ *
+ * @!event httpUploadProgress(progress)
+ *   Triggered when the uploader has uploaded more data.
+ *   @note The `total` property may not be set if the stream being uploaded has
+ *     not yet finished chunking. In this case the `total` will be undefined
+ *     until the total stream size is known.
+ *   @note This event will not be emitted in Node.js 0.8.x.
+ *   @param progress [map] An object containing the `loaded` and `total` bytes
+ *     of the request. Note that `total` may be undefined until the payload
+ *     size is known.
+ *   @context (see AWS.Request~send)
+ */
 AWS.S3.ManagedUpload = AWS.util.inherit({
+  /**
+   * Creates a managed upload object with a set of configuration options.
+   *
+   * @note A "Body" parameter is required to be set prior to calling {send}.
+   * @option options params [map] a map of parameters to pass to the upload
+   *   requests. The "Body" parameter is required to be specified either on
+   *   the service or in the params option.
+   * @option options queueSize [Number] (4) the size of the concurrent queue
+   *   manager to upload parts in parallel. Set to 1 for synchronous uploading
+   *   of parts. Note that the uploader will buffer at most queueSize * partSize
+   *   bytes into memory at any given time.
+   * @option options partSize [Number] (5mb) the size in bytes for each
+   *   individual part to be uploaded. See {minPartSize} for the minimum allowed
+   *   part size.
+   * @option options leavePartsOnError [Boolean] (false) whether to abort the
+   *   multipart upload if an error occurs. Set to true if you want to handle
+   *   failures manually.
+   * @option options service [AWS.S3] an optional S3 service object to use for
+   *   requests. This object might have bound parameters used by the uploader.
+   * @example Creating a default uploader for a stream object
+   *   var upload = new AWS.S3.ManagedUpload({
+   *     params: {Bucket: 'bucket', Key: 'key', Body: stream}
+   *   });
+   * @example Creating an uploader with concurrency of 1 and partSize of 10mb
+   *   var upload = new AWS.S3.ManagedUpload({
+   *     partSize: 10 * 1024 * 1024, queueSize: 1,
+   *     params: {Bucket: 'bucket', Key: 'key', Body: stream}
+   *   });
+   * @see send
+   */
   constructor: function ManagedUpload(options) {
     var self = this;
     AWS.SequentialExecutor.call(self);
@@ -38,15 +95,52 @@ AWS.S3.ManagedUpload = AWS.util.inherit({
     this.adjustTotalBytes();
   },

+  /**
+   * @api private
+   */
   leavePartsOnError: false,
+
+  /**
+   * @api private
+   */
   queueSize: 4,
+
+  /**
+   * @api private
+   */
   partSize: null,
+
+  /**
+   * @readonly
+   * @return [Number] the minimum number of bytes for an individual part
+   *   upload.
+   */
   minPartSize: 1024 * 1024 * 5,
+
+  /**
+   * @readonly
+   * @return [Number] the maximum allowed number of parts in a multipart upload.
+   */
   maxTotalParts: 10000,

-  send: function(cb) {
+  /**
+   * Initiates the managed upload for the payload.
+   *
+   * @callback callback function(err, data)
+   *   @param err [Error] an error or null if no error occurred.
+   *   @param data [map] The response data from the successful upload:
+   *     * `Location` (String) the URL of the uploaded object
+   *     * `ETag` (String) the ETag of the uploaded object
+   * @example Sending a managed upload object
+   *   var params = {Bucket: 'bucket', Key: 'key', Body: stream};
+   *   var upload = new AWS.S3.ManagedUpload({params: params});
+   *   upload.send(function(err, data) {
+   *     console.log(err, data);
+   *   });
+   */
+  send: function(callback) {
     var self = this;
-    self.callback = cb || function(err) { if (err) throw err; };
+    self.callback = callback || function(err) { if (err) throw err; };
     var runFill = true;
     if (typeof self.body.slice === 'function') {
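As an illustration of the documentation added in the hunk above (not part of the patch itself), a hedged sketch combining the `partSize`/`queueSize` options with the aggregated `httpUploadProgress` event; file, bucket, and key names are placeholders:

```js
// Sketch only -- progress tracking on a managed upload of a large file.
var fs = require('fs');
var AWS = require('aws-sdk');

var upload = new AWS.S3.ManagedUpload({
  partSize: 10 * 1024 * 1024, // 10mb parts; must not be below minPartSize
  queueSize: 2,               // at most 2 parts in flight (~20mb buffered)
  params: {Bucket: 'myBucket', Key: 'myKey', Body: fs.createReadStream('/path/to/large/file')}
});

// Progress is grouped across all concurrent part uploads.
upload.on('httpUploadProgress', function(progress) {
  // progress.total may be undefined until the stream has finished chunking
  console.log('Uploaded', progress.loaded, 'of', progress.total, 'bytes');
});

upload.send(function(err, data) {
  console.log(err, data); // data.Location and data.ETag on success
});
```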
diff --git a/lib/services/s3.js b/lib/services/s3.js
index 019d5dd9af..ca55c25e7c 100644
--- a/lib/services/s3.js
+++ b/lib/services/s3.js
@@ -1,5 +1,8 @@
 var AWS = require('../core');

+// Pull in managed upload extension
+require('../s3/managed_upload');
+
 AWS.util.update(AWS.S3.prototype, {
   /**
    * @api private
@@ -395,25 +398,41 @@ AWS.util.update(AWS.S3.prototype, {
       params.CreateBucketConfiguration = { LocationConstraint: this.config.region };
     }
     return this.makeRequest('createBucket', params, callback);
-  }
-});
-
-// Pull in managed upload extension
-require('../s3/managed_upload');
+  },

 /**
+   * Uploads an arbitrarily sized buffer, blob, or stream, using intelligent
+   * concurrent handling of parts if the payload is large enough. You can
+   * configure the concurrent queue size by setting `options`.
+   *
+   * @param (see AWS.S3.putObject)
+   * @option (see AWS.S3.ManagedUpload.constructor)
+   * @return [AWS.S3.ManagedUpload] the managed upload object that can call
+   *   `send()` or track progress.
+   * @example Uploading a stream object
+   *   var params = {Bucket: 'bucket', Key: 'key', Body: stream};
+   *   s3.upload(params, function(err, data) {
+   *     console.log(err, data);
+   *   });
+   * @example Uploading a stream with concurrency of 1 and partSize of 10mb
+   *   var params = {Bucket: 'bucket', Key: 'key', Body: stream};
+   *   var options = {partSize: 10 * 1024 * 1024, queueSize: 1};
+   *   s3.upload(params, options, function(err, data) {
+   *     console.log(err, data);
+   *   });
+   * @see AWS.S3.ManagedUpload
+   */
+  upload: function upload(params, options, callback) {
+    if (typeof options === 'function' && callback === undefined) {
+      callback = options;
+      options = null;
+    }

-  options = options || {};
-  options = AWS.util.merge(options || {}, {service: this, params: params});
+    options = options || {};
+    options = AWS.util.merge(options || {}, {service: this, params: params});

-  var uploader = new AWS.S3.ManagedUpload(options);
-  if (typeof callback === 'function') uploader.send(callback);
-  return uploader;
-};
+    var uploader = new AWS.S3.ManagedUpload(options);
+    if (typeof callback === 'function') uploader.send(callback);
+    return uploader;
+  }
+});

From d4fe8dd69133384af065e9489fd99dd72b9afde2 Mon Sep 17 00:00:00 2001
From: Loren Segal
Date: Wed, 3 Dec 2014 18:01:22 -0800
Subject: [PATCH 12/17] Update upload() signature in docs

---
 lib/services/s3.js | 39 ++++++++++++++++++++-------------------
 1 file changed, 20 insertions(+), 19 deletions(-)

diff --git a/lib/services/s3.js b/lib/services/s3.js
index ca55c25e7c..87ad0b3a41 100644
--- a/lib/services/s3.js
+++ b/lib/services/s3.js
@@ -401,26 +401,27 @@ AWS.util.update(AWS.S3.prototype, {
   },

   /**
-   * Uploads an arbitrarily sized buffer, blob, or stream, using intelligent
-   * concurrent handling of parts if the payload is large enough. You can
-   * configure the concurrent queue size by setting `options`.
+   * @overload upload(params = {}, [options], [callback])
+   *   Uploads an arbitrarily sized buffer, blob, or stream, using intelligent
+   *   concurrent handling of parts if the payload is large enough. You can
+   *   configure the concurrent queue size by setting `options`.
    *
-   * @param (see AWS.S3.putObject)
-   * @option (see AWS.S3.ManagedUpload.constructor)
-   * @return [AWS.S3.ManagedUpload] the managed upload object that can call
-   *   `send()` or track progress.
-   * @example Uploading a stream object
-   *   var params = {Bucket: 'bucket', Key: 'key', Body: stream};
-   *   s3.upload(params, function(err, data) {
-   *     console.log(err, data);
-   *   });
-   * @example Uploading a stream with concurrency of 1 and partSize of 10mb
-   *   var params = {Bucket: 'bucket', Key: 'key', Body: stream};
-   *   var options = {partSize: 10 * 1024 * 1024, queueSize: 1};
-   *   s3.upload(params, options, function(err, data) {
-   *     console.log(err, data);
-   *   });
-   * @see AWS.S3.ManagedUpload
+   *   @param (see AWS.S3.putObject)
+   *   @option (see AWS.S3.ManagedUpload.constructor)
+   *   @return [AWS.S3.ManagedUpload] the managed upload object that can call
+   *     `send()` or track progress.
+   *   @example Uploading a stream object
+   *     var params = {Bucket: 'bucket', Key: 'key', Body: stream};
+   *     s3.upload(params, function(err, data) {
+   *       console.log(err, data);
+   *     });
+   *   @example Uploading a stream with concurrency of 1 and partSize of 10mb
+   *     var params = {Bucket: 'bucket', Key: 'key', Body: stream};
+   *     var options = {partSize: 10 * 1024 * 1024, queueSize: 1};
+   *     s3.upload(params, options, function(err, data) {
+   *       console.log(err, data);
+   *     });
+   *   @see AWS.S3.ManagedUpload
    */
   upload: function upload(params, options, callback) {
     if (typeof options === 'function' && callback === undefined) {

From 189afc81ddab3de9dcf6afb6c4bfee41d64c286a Mon Sep 17 00:00:00 2001
From: Loren Segal
Date: Thu, 4 Dec 2014 11:23:10 -0800
Subject: [PATCH 13/17] Recommend using .upload() instead of .putObject() in guide

---
 doc-src/guide/browser-examples.md | 4 ++--
 doc-src/guide/node-examples.md    | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/doc-src/guide/browser-examples.md b/doc-src/guide/browser-examples.md
index c8e9bab9fc..6f1cd76155 100644
--- a/doc-src/guide/browser-examples.md
+++ b/doc-src/guide/browser-examples.md
@@ -62,7 +62,7 @@ object in S3:
       results.innerHTML = '';

       var params = {Key: 'data.txt', Body: textarea.value};
-      bucket.putObject(params, function (err, data) {
+      bucket.upload(params, function (err, data) {
         results.innerHTML = err ? 'ERROR!' : 'SAVED.';
       });
     }, false);
@@ -89,7 +89,7 @@ to upload a file on disk to S3:
       results.innerHTML = '';

       var params = {Key: file.name, ContentType: file.type, Body: file};
-      bucket.putObject(params, function (err, data) {
+      bucket.upload(params, function (err, data) {
         results.innerHTML = err ? 'ERROR!' : 'UPLOADED.';
       });
     } else {
diff --git a/doc-src/guide/node-examples.md b/doc-src/guide/node-examples.md
index dea1f3995e..88fefa8875 100644
--- a/doc-src/guide/node-examples.md
+++ b/doc-src/guide/node-examples.md
@@ -39,7 +39,7 @@ AWS.config.region = 'us-west-2';
 var s3bucket = new AWS.S3({params: {Bucket: 'myBucket'}});
 s3bucket.createBucket(function() {
   var data = {Key: 'myKey', Body: 'Hello!'};
-  s3bucket.putObject(data, function(err, data) {
+  s3bucket.upload(data, function(err, data) {
     if (err) {
       console.log("Error uploading data: ", err);
     } else {
@@ -104,7 +104,7 @@ s3.listBuckets(function(err, data) {
 });
 ```

-### Amazon S3: Create a New Bucket and Object (createBucket, putObject)
+### Amazon S3: Create a New Bucket and Object (createBucket, upload)

 The following example puts the string 'Hello!' inside the
 object 'myKey' of bucket 'myBucket':

 ```javascript
 var s3 = new AWS.S3({params: {Bucket: 'myBucket', Key: 'myKey'}});
 s3.createBucket(function() {
-  s3.putObject({Body: 'Hello!'}, function() {
+  s3.upload({Body: 'Hello!'}, function() {
     console.log("Successfully uploaded data to myBucket/myKey");
   });
 });

From 9b1fb9448b913721331fbd70a7c26fe29bf34301 Mon Sep 17 00:00:00 2001
From: Loren Segal
Date: Thu, 4 Dec 2014 11:23:52 -0800
Subject: [PATCH 14/17] Fix variable name in example

---
 doc-src/guide/node-examples.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/doc-src/guide/node-examples.md b/doc-src/guide/node-examples.md
index 88fefa8875..2847e69449 100644
--- a/doc-src/guide/node-examples.md
+++ b/doc-src/guide/node-examples.md
@@ -38,8 +38,8 @@ AWS.config.region = 'us-west-2';
 // Make sure to change the bucket name from "myBucket" to something unique.
 var s3bucket = new AWS.S3({params: {Bucket: 'myBucket'}});
 s3bucket.createBucket(function() {
-  var data = {Key: 'myKey', Body: 'Hello!'};
-  s3bucket.upload(data, function(err, data) {
+  var params = {Key: 'myKey', Body: 'Hello!'};
+  s3bucket.upload(params, function(err, data) {
     if (err) {
       console.log("Error uploading data: ", err);
     } else {

From afb24068a08759c8abd7ffa489f77e9c86a6396c Mon Sep 17 00:00:00 2001
From: Loren Segal
Date: Thu, 4 Dec 2014 14:16:50 -0800
Subject: [PATCH 15/17] Add more guide content

---
 doc-src/guide/browser-examples.md | 16 ++++++++++++++++
 doc-src/guide/node-examples.md    | 15 +++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/doc-src/guide/browser-examples.md b/doc-src/guide/browser-examples.md
index 6f1cd76155..e6eee0f5f2 100644
--- a/doc-src/guide/browser-examples.md
+++ b/doc-src/guide/browser-examples.md
@@ -45,6 +45,14 @@ Amazon S3 bucket:

 ### Uploading data into an object

+
+In order to upload files in the browser, you should ensure that you
+have configured CORS for your Amazon S3 bucket and exposed the "ETag"
+header via the <ExposeHeader>ETag</ExposeHeader>
+declaration. See the {file:browser-configuring.md Configuring} section
+for more information on configuring CORS for an Amazon S3 bucket.
+
+The following example will upload the contents of a `