Skip to content

Commit

Permalink
Clean up createReadStream logic to use a PassThrough stream.
Browse files Browse the repository at this point in the history
Also fixes incorrect detection of streaming API in Node 0.12.x and
resolves a memory leak when not using pipe.

Fixes #605
  • Loading branch information
lsegal committed May 19, 2015
1 parent 351a544 commit ebd10c8
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 45 deletions.
19 changes: 12 additions & 7 deletions features/s3/step_definitions/objects.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ module.exports = function () {
var params = {Bucket: this.sharedBucket, Key: key};
var world = this;
this.result = '';
this.service.getObject(params).createReadStream().
on('end', function() { callback(); }).
on('data', function(d) { world.result += d.toString(); });
var s = this.service.getObject(params).createReadStream();

setTimeout(function() {
s.on('end', function() { callback(); });
s.on('data', function(d) { world.result += d.toString(); });
}, 2000); // delay streaming to ensure it is buffered
});

this.When(/^I stream2 key "([^"]*)"$/, function(key, callback) {
Expand All @@ -80,10 +83,12 @@ module.exports = function () {
var world = this;
this.result = '';
var stream = this.service.getObject(params).createReadStream();
stream.on('end', function() { callback(); });
stream.on('readable', function() {
var v = stream.read(); if (v) world.result += v;
});
setTimeout(function() {
stream.on('end', function() { callback(); });
stream.on('readable', function() {
var v = stream.read(); if (v) world.result += v;
});
}, 2000); // delay streaming to ensure it is buffered
});

this.Then(/^the streamed data should contain "([^"]*)"$/, function(data, callback) {
Expand Down
38 changes: 13 additions & 25 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -527,24 +527,22 @@ AWS.Request = inherit({
var streams = AWS.util.nodeRequire('stream');
var req = this;
var stream = null;
var legacyStreams = false;

if (AWS.HttpClient.streamsApiVersion === 2) {
stream = new streams.Readable();
stream._read = function() {};
stream = new streams.PassThrough();
req.send();
} else {
stream = new streams.Stream();
stream.readable = true;
}

stream.sent = false;
stream.on('newListener', function(event) {
if (!stream.sent && (event === 'data' || event === 'readable')) {
if (event === 'data') legacyStreams = true;
stream.sent = true;
process.nextTick(function() { req.send(function() { }); });
}
});
stream.sent = false;
stream.on('newListener', function(event) {
if (!stream.sent && event === 'data') {
stream.sent = true;
process.nextTick(function() { req.send(); });
}
});
}

this.on('httpHeaders', function streamHeaders(statusCode, headers, resp) {
if (statusCode < 300) {
Expand All @@ -556,25 +554,15 @@ AWS.Request = inherit({
});

var httpStream = resp.httpResponse.createUnbufferedStream();
if (legacyStreams) {
if (AWS.HttpClient.streamsApiVersion === 2) {
httpStream.pipe(stream);
} else {
httpStream.on('data', function(arg) {
stream.emit('data', arg);
});
httpStream.on('end', function() {
stream.emit('end');
});
} else {
httpStream.on('readable', function() {
var chunk;
do {
chunk = httpStream.read();
if (chunk !== null) stream.push(chunk);
} while (chunk !== null);
stream.read(0);
});
httpStream.on('end', function() {
stream.push(null);
});
}

httpStream.on('error', function(err) {
Expand Down
74 changes: 61 additions & 13 deletions test/request.spec.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,36 @@ describe 'AWS.Request', ->

if AWS.util.isNode()
describe 'createReadStream', ->
nstream = require('stream')

app = (req, resp) ->
resp.writeHead(200, {})
resp.write('FOOBARBAZQUX')
resp.end()

getport = (cb, startport) ->
port = startport or 45678
srv = require('net').createServer()
srv.on 'error', -> getport(cb, port + 1)
srv.listen port, ->
srv.once 'close', -> cb(port)
srv.close()

server = require('http').createServer (req, resp) ->
app(req, resp)

beforeEach (done) ->
getport (port) ->
server.listen(port)
service = new MockService(endpoint: 'http://localhost:' + port)
done()

afterEach ->
server.close()

it 'streams data', (done) ->
data = ''
helpers.mockHttpResponse 200, {}, ['FOO', 'BAR', 'BAZ', 'QUX']
helpers.spyOn(AWS.HttpClient, 'streamsApiVersion').andReturn 1

request = service.makeRequest('mockMethod')
s = request.createReadStream()
Expand All @@ -176,7 +203,6 @@ describe 'AWS.Request', ->
return done()

data = ''
helpers.mockHttpResponse 200, {}, ['FOO', 'BAR', 'BAZ', 'QUX']
request = service.makeRequest('mockMethod')
s = request.createReadStream()
s.on 'end', ->
Expand All @@ -195,7 +221,6 @@ describe 'AWS.Request', ->
return done()

data = ''
helpers.mockHttpResponse 200, {}, ['FOO', 'BAR', null, null, 'BAZ', 'QUX']
request = service.makeRequest('mockMethod')
s = request.createReadStream()
s.on 'end', ->
Expand All @@ -211,7 +236,10 @@ describe 'AWS.Request', ->

it 'does not stream data on failures', (done) ->
data = ''; error = null
helpers.mockHttpResponse 404, {}, ['No such file']
app = (req, resp) ->
resp.writeHead(404, {})
resp.end()

request = service.makeRequest('mockMethod')
s = request.createReadStream()
s.on 'error', (error) ->
Expand All @@ -222,7 +250,14 @@ describe 'AWS.Request', ->

it 'retries temporal errors and streams resulting successful response', (done) ->
data = ''; error = null
helpers.mockIntermittentFailureResponse 2, 200, {}, ['FOO', 'BAR', 'BAZ', 'QUX']
errs = 0
app = (req, resp) ->
status = if errs < 2 then 500 else 200
errs += 1
resp.writeHead(status, {})
if status == 200
resp.write('FOOBARBAZQUX')
resp.end()

request = service.makeRequest('mockMethod')
s = request.createReadStream()
Expand All @@ -238,13 +273,20 @@ describe 'AWS.Request', ->
helpers.spyOn(AWS.HttpClient, 'getInstance')
AWS.HttpClient.getInstance.andReturn handleRequest: (req, opts, cb, errCb) ->
req = new EventEmitter()
if AWS.HttpClient.streamsApiVersion == 2
req = new nstream.PassThrough()
req.statusCode = 200
req.headers = {}
cb(req)
req.emit('headers', 200, {})
AWS.util.arrayEach ['FOO', 'BAR', 'BAZ'], (str) ->
req.emit 'data', new Buffer(str)
errCb new Error('fail')
process.nextTick ->
cb(req)
req.emit('headers', 200, {})
AWS.util.arrayEach ['FOO', 'BAR', 'BAZ'], (str) ->
if AWS.HttpClient.streamsApiVersion < 2
process.nextTick -> req.emit 'data', new Buffer(str)
else
req.push(new Buffer(str))
process.nextTick -> req.emit 'readable'
process.nextTick -> errCb new Error('fail')
req

request = service.makeRequest('mockMethod')
Expand All @@ -259,12 +301,14 @@ describe 'AWS.Request', ->
s.on 'error', (e) -> error = e
s.on 'data', (c) -> data += c.toString()

it 'fails if retry occurs in the middle of a failing stream', (done) ->
it 'fails if retry occurs in the middle of a successful stream', (done) ->
data = ''; error = null; reqError = null; resp = null
retryCount = 0
helpers.spyOn(AWS.HttpClient, 'getInstance')
AWS.HttpClient.getInstance.andReturn handleRequest: (req, opts, cb, errCb) ->
req = new EventEmitter()
if AWS.HttpClient.streamsApiVersion == 2
req = new nstream.PassThrough()
req.statusCode = 200
req.headers = {}
process.nextTick ->
Expand All @@ -277,7 +321,11 @@ describe 'AWS.Request', ->
errCb code: 'NetworkingError', message: 'FAIL!', retryable: true
return AWS.util.abort
else
process.nextTick -> req.emit 'data', new Buffer(str)
if AWS.HttpClient.streamsApiVersion == 2
req.push(new Buffer(str))
process.nextTick -> req.emit 'readable'
else
process.nextTick -> req.emit 'data', new Buffer(str)
if retryCount >= 1
process.nextTick -> req.emit('end')
req
Expand All @@ -292,7 +340,7 @@ describe 'AWS.Request', ->
expect(data).to.equal('FOOBAR')
expect(error.code).to.equal('NetworkingError')
expect(reqError.code).to.equal('NetworkingError')
expect(reqError.hostname).to.equal('mockservice.mock-region.amazonaws.com')
expect(reqError.hostname).to.equal('localhost')
expect(reqError.region).to.equal('mock-region')
expect(resp.retryCount).to.equal(0)
done()
Expand Down

0 comments on commit ebd10c8

Please sign in to comment.