Permalink
Browse files

Large rewrite of the event lifecycle architecture.

  • Loading branch information...
1 parent 0da841d commit 37660f7e2ab28055a8b14b75b9685938c27573a0 @lsegal lsegal committed Jan 23, 2014
@@ -32,11 +32,9 @@ module.exports = function() {
this.Then(/^the bucket should have a location constraint of "([^"]*)"$/, function(loc, next) {
this.s3.getBucketLocation({Bucket:this.bucket}, function(err, data) {
- if (data && data.LocationConstraint == loc)
- next();
- else
- fail();
- });
+ this.assert.equal(data.LocationConstraint, loc);
+ next();
+ }.bind(this));
});
this.Then(/^I delete the bucket$/, function(next) {
View
@@ -70,19 +70,19 @@ AWS.EventListeners = {
AWS.EventListeners = {
Core: new AWS.SequentialExecutor().addNamedListeners(function(add, addAsync) {
addAsync('VALIDATE_CREDENTIALS', 'validate',
- function VALIDATE_CREDENTIALS(req, doneCallback) {
+ function VALIDATE_CREDENTIALS(req, done) {
req.service.config.getCredentials(function(err) {
if (err) {
- err = AWS.util.error(err,
+ req.response.err = AWS.util.error(err,
{code: 'SigningError', message: 'Missing credentials in config'});
}
- doneCallback(err);
+ done();
});
});
add('VALIDATE_REGION', 'validate', function VALIDATE_REGION(req) {
if (!req.service.config.region && !req.service.hasGlobalEndpoint()) {
- throw AWS.util.error(new Error(),
+ req.response.error = AWS.util.error(new Error(),
{code: 'SigningError', message: 'Missing region in config'});
}
});
@@ -103,13 +103,16 @@ AWS.EventListeners = {
req.httpRequest.headers['Host'] = req.httpRequest.endpoint.host;
});
- addAsync('SIGN', 'sign', function SIGN(req, doneCallback) {
- if (!req.service.api.signatureVersion) return doneCallback(); // none
+ addAsync('SIGN', 'sign', function SIGN(req, done) {
+ if (!req.service.api.signatureVersion) return done(); // none
req.service.config.getCredentials(function (err, credentials) {
- try {
- if (err) return doneCallback(err);
+ if (err) {
+ req.response.error = err;
+ return done();
+ }
+ try {
var date = AWS.util.date.getDate();
var SignerClass = req.service.getSignerClass(req);
var signer = new SignerClass(req.httpRequest,
@@ -122,76 +125,74 @@ AWS.EventListeners = {
// add new authorization
signer.addAuthorization(credentials, date);
- doneCallback();
} catch (e) {
- doneCallback(e);
+ req.response.error = e;
}
+ done();
});
});
- add('SETUP_ERROR', 'extractError', function SETUP_ERROR(resp) {
+ add('VALIDATE_RESPONSE', 'validateResponse', function VALIDATE_RESPONSE(resp) {
if (this.service.successfulResponse(resp, this)) {
- // throwing null will stop the error extraction chain
- // but will not set an error for data extraction
- throw null;
+ resp.data = {};
+ resp.error = null;
+ } else {
+ resp.data = null;
+ resp.error = AWS.util.error(new Error(),
+ {code: 'UnknownError', message: 'An unknown error occurred.'});
}
-
- resp.error = AWS.util.error(new Error(),
- {code: 'UnknownError', message: 'An unknown error occurred.'});
- resp.data = null;
- });
-
- add('SETUP_DATA', 'extractData', function SETUP_DATA(resp) {
- resp.data = {};
- resp.error = null;
});
- add('SEND', 'send', function SEND(resp) {
+ addAsync('SEND', 'send', function SEND(resp, done) {
function callback(httpResp) {
resp.httpResponse.stream = httpResp;
+ resp.httpResponse._abortCallback = done;
httpResp.on('headers', function onHeaders(statusCode, headers) {
- resp.request.emitEvent('httpHeaders', [statusCode, headers, resp]);
+ resp.request.emit('httpHeaders', [statusCode, headers, resp]);
if (!resp.request.httpRequest._streaming) {
if (AWS.HttpClient.streamsApiVersion === 2) { // streams2 API check
httpResp.on('readable', function onReadable() {
var data = httpResp.read();
if (data !== null) {
- resp.request.emitEvent('httpData', [data, resp]);
+ resp.request.emit('httpData', [data, resp]);
}
});
} else { // legacy streams API
httpResp.on('data', function onData(data) {
- resp.request.emitEvent('httpData', [data, resp]);
+ resp.request.emit('httpData', [data, resp]);
});
}
}
});
httpResp.on('end', function onEnd() {
- resp.request.emitEvent('httpDone', [resp]);
+ resp.request.emit('httpDone');
+ done();
});
}
function progress(httpResp) {
httpResp.on('sendProgress', function onSendProgress(progress) {
- resp.request.emitEvent('httpUploadProgress', [progress, resp]);
+ resp.request.emit('httpUploadProgress', [progress, resp]);
});
httpResp.on('receiveProgress', function onReceiveProgress(progress) {
- resp.request.emitEvent('httpDownloadProgress', [progress, resp]);
+ resp.request.emit('httpDownloadProgress', [progress, resp]);
});
}
function error(err) {
- err = AWS.util.error(err, {
+ resp.error = AWS.util.error(err, {
code: 'NetworkingError',
region: resp.request.httpRequest.region,
hostname: resp.request.httpRequest.endpoint.hostname,
retryable: true
});
- resp.request.emitEvent('httpError', [err, resp]);
+ resp.request.emit('httpError', [resp.error, resp], function() {
+ done();
+ });
}
resp.error = null;
@@ -219,7 +220,7 @@ AWS.EventListeners = {
var total = resp.httpResponse.headers['content-length'];
var progress = { loaded: resp.httpResponse.numBytes, total: total };
- resp.request.emitEvent('httpDownloadProgress', [progress, resp]);
+ resp.request.emit('httpDownloadProgress', [progress, resp]);
}
resp.httpResponse.buffers.push(new AWS.util.Buffer(chunk));
@@ -234,19 +235,14 @@ AWS.EventListeners = {
}
delete resp.httpResponse.numBytes;
delete resp.httpResponse.buffers;
-
- this.completeRequest(resp);
- });
-
- add('HTTP_ERROR', 'httpError', function HTTP_ERROR(error, resp) {
- resp.error = error;
- this.completeRequest(resp);
});
add('FINALIZE_ERROR', 'retry', function FINALIZE_ERROR(resp) {
- resp.error.statusCode = resp.httpResponse.statusCode;
- if (resp.error.retryable === undefined) {
- resp.error.retryable = this.service.retryableError(resp.error, this);
+ if (resp.httpResponse.statusCode) {
+ resp.error.statusCode = resp.httpResponse.statusCode;
+ if (resp.error.retryable === undefined) {
+ resp.error.retryable = this.service.retryableError(resp.error, this);
+ }
}
});
@@ -277,25 +273,24 @@ AWS.EventListeners = {
} else if (resp.error.retryable && resp.retryCount < this.service.numRetries()) {
resp.retryCount++;
} else {
- throw resp.error;
+ resp._doneRetry = true;
}
}
});
- addAsync('RETRY_SIGN', 'retry', function RETRY_SIGN(resp, doneCallback) {
- this.emitEvent('sign', resp, doneCallback);
- });
+ addAsync('RESET_RETRY_STATE', 'afterRetry', function RESET_RETRY_STATE(resp, done) {
+ if (!resp._doneRetry && resp.error) {
+ var delay = 0;
+ if (!resp.error.redirect) {
+ delay = this.service.retryDelays()[resp.retryCount-1] || 0;
+ }
- addAsync('RETRY_DELAY_SEND', 'retry', function RETRY_DELAY_SEND(resp, doneCallback) {
- var delay = 0;
- if (!resp.error.redirect) {
- delay = this.service.retryDelays()[resp.retryCount-1] || 0;
+ resp.error = null;
+ delete resp._doneRetry;
+ setTimeout(done, delay);
+ } else {
+ done();
}
-
- setTimeout(function() {
- resp.request.emitEvent('send', resp, doneCallback);
- }, delay);
-
});
}),
Oops, something went wrong.

0 comments on commit 37660f7

Please sign in to comment.