Skip to content

Commit

Permalink
Merge pull request #601 from aws/fix/zero-byte-and-perf-streaming-upload
Browse files Browse the repository at this point in the history
AWS.S3.ManagedUpload fixes for zero-byte streams and buffering performance
  • Loading branch information
lsegal committed May 8, 2015
2 parents 1027de1 + b496595 commit a53b30f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 11 deletions.
53 changes: 42 additions & 11 deletions lib/s3/managed_upload.js
Expand Up @@ -158,7 +158,7 @@ AWS.S3.ManagedUpload = AWS.util.inherit({
if (self.body instanceof Stream) {
runFill = false;
self.fillQueue = self.fillStream;
self.partBuffer = new AWS.util.Buffer(0);
self.partBuffers = [];
self.body.
on('readable', function() { self.fillQueue(); }).
on('end', function() {
Expand Down Expand Up @@ -315,14 +315,27 @@ AWS.S3.ManagedUpload = AWS.util.inherit({
/**
* @api private
*/
partBuffer: null,
partBuffers: null,

/**
* @api private
*/
partBufferLength: 0,

/**
* @api private
*/
fillBuffer: function fillBuffer() {
var self = this;
var bodyLen = byteLength(self.body);

if (bodyLen === 0) {
self.isDoneChunking = true;
self.numParts = 1;
self.nextChunk(self.body);
return;
}

while (self.activeParts < self.queueSize && self.partPos < bodyLen) {
var endPos = Math.min(self.partPos + self.partSize, bodyLen);
var buf = self.sliceFn.call(self.body, self.partPos, endPos);
Expand All @@ -343,23 +356,41 @@ AWS.S3.ManagedUpload = AWS.util.inherit({
var self = this;
if (self.activeParts >= self.queueSize) return;

var buf = self.body.read(self.partSize - self.partBuffer.length) ||
var buf = self.body.read(self.partSize - self.partBufferLength) ||
self.body.read();
if (buf) {
self.partBuffer = AWS.util.Buffer.concat([self.partBuffer, buf]);
self.partBuffers.push(buf);
self.partBufferLength += buf.length;
self.totalChunkedBytes += buf.length;
}

if (self.partBuffer.length >= self.partSize) {
self.nextChunk(self.partBuffer.slice(0, self.partSize));
self.partBuffer = self.partBuffer.slice(self.partSize);
} else if (self.isDoneChunking) {
if (self.partBufferLength >= self.partSize) {
var pbuf = Buffer.concat(self.partBuffers);
self.partBuffers = [];
self.partBufferLength = 0;

// if we have more than partSize, push the rest back on the queue
if (pbuf.length > self.partSize) {
var rest = pbuf.slice(self.partSize);
self.partBuffers.push(rest);
self.partBufferLength += rest.length;
pbuf = pbuf.slice(0, self.partSize);
}

self.nextChunk(pbuf);
}

if (self.isDoneChunking && !self.isDoneSending) {
pbuf = Buffer.concat(self.partBuffers);
self.partBuffers = [];
self.partBufferLength = 0;
self.totalBytes = self.totalChunkedBytes;
if (self.partBuffer.length > 0) {
self.isDoneSending = true;

if (self.numParts === 0 || pbuf.length > 0) {
self.numParts++;
self.nextChunk(self.partBuffer);
self.nextChunk(pbuf);
}
self.partBuffer = new AWS.util.Buffer(0);
}

self.body.read(0);
Expand Down
17 changes: 17 additions & 0 deletions test/s3/managed_upload.spec.coffee
Expand Up @@ -9,6 +9,7 @@ body = (size) ->

smallbody = body(5)
bigbody = body(36)
zerobody = body(0)

describe 'AWS.S3.ManagedUpload', ->
s3 = new AWS.S3 maxRetries: 0, params: { Bucket: 'bucket', Key: 'key' }
Expand Down Expand Up @@ -154,6 +155,13 @@ describe 'AWS.S3.ManagedUpload', ->
expect(reqs[1].params.ContentLength).to.equal(size)
expect(reqs[2].params.ContentLength).to.equal(size)

it 'supports zero-byte body buffers', ->
reqs = helpers.mockResponses [data: ETag: 'ETAG']
upload = new AWS.S3.ManagedUpload params: { Body: zerobody }
upload.send ->
expect(helpers.operationsForRequests(reqs)).to.eql ['s3.putObject']
expect(err).not.to.exist

it 'errors if partSize is smaller than minPartSize', ->
expect(-> new AWS.S3.ManagedUpload(partSize: 5)).to.throw(
'partSize must be greater than 10')
Expand Down Expand Up @@ -234,3 +242,12 @@ describe 'AWS.S3.ManagedUpload', ->
expect(helpers.operationsForRequests(reqs)).to.eql ['s3.putObject']
expect(err).not.to.exist
done()

it 'sends a zero byte stream', (done) ->
stream = AWS.util.buffer.toStream(zerobody)
reqs = helpers.mockResponses [data: ETag: 'ETAG']
upload = new AWS.S3.ManagedUpload params: { Body: stream }
upload.send ->
expect(helpers.operationsForRequests(reqs)).to.eql ['s3.putObject']
expect(err).not.to.exist
done()

0 comments on commit a53b30f

Please sign in to comment.