Skip to content

Commit

Permalink
storage: close sockets on error
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephen Sawchuk committed Jun 4, 2015
1 parent 8e98bdf commit 3f2e960
Show file tree
Hide file tree
Showing 2 changed files with 347 additions and 134 deletions.
52 changes: 33 additions & 19 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,8 @@ File.prototype.createReadStream = function(options) {
util.is(options.start, 'number') || util.is(options.end, 'number');
var throughStream = streamEvents(through());

var requestStream;

var validations = ['crc32c', 'md5'];
var validation;

Expand Down Expand Up @@ -430,6 +432,22 @@ File.prototype.createReadStream = function(options) {
o: encodeURIComponent(this.name)
});

// End the stream, first emitting an error or complete event.
var endThroughStream = once(function(err, resp) {
if (err) {
throughStream.emit('error', err, resp);
} else {
throughStream.emit('complete', resp);
}

throughStream.destroy();
});

var endRequestStream = once(function() {
requestStream.abort();
requestStream.destroy();
});

createAuthorizedReq(remoteFilePath);

return throughStream;
Expand Down Expand Up @@ -458,7 +476,7 @@ File.prototype.createReadStream = function(options) {
that.bucket.storage.makeAuthorizedRequest_(reqOpts, {
onAuthorized: function(err, authorizedReqOpts) {
if (err) {
done(err, null);
endThroughStream(err);
return;
}

Expand All @@ -467,8 +485,13 @@ File.prototype.createReadStream = function(options) {
var localCrcHash;
var localMd5Hash = crypto.createHash('md5');

request(authorizedReqOpts)
.on('error', done)
requestStream = request(authorizedReqOpts);

requestStream
.on('error', function(err) {
endRequestStream();
endThroughStream(err);
})

.on('response', throughStream.emit.bind(throughStream, 'response'))

Expand All @@ -485,13 +508,13 @@ File.prototype.createReadStream = function(options) {
.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, resp) {
if (err) {
done(err, resp);
endThroughStream(err, resp);
return;
}

if (rangeRequest) {
// Range requests can't receive data integrity checks.
done(null, resp);
endThroughStream(null, resp);
return;
}

Expand Down Expand Up @@ -531,28 +554,19 @@ File.prototype.createReadStream = function(options) {
].join(' '));
mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';

done(mismatchError, resp);
endThroughStream(mismatchError, resp);
} else {
done(null, resp);
endThroughStream(null, resp);
}
});
})

.pipe(throughStream);
.pipe(throughStream)

.on('error', endRequestStream);
}
});
}

// End the stream, first emitting an error or complete event.
function done(err) {
if (err) {
throughStream.emit('error', err);
} else {
throughStream.emit('complete');
}

throughStream.end();
}
};

/**
Expand Down
Loading

0 comments on commit 3f2e960

Please sign in to comment.