Skip to content
This repository has been archived by the owner on Feb 5, 2020. It is now read-only.

Commit

Permalink
Refactor HttpClient.
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenVerborgh committed Jun 20, 2016
1 parent 997230e commit e799227
Showing 1 changed file with 58 additions and 48 deletions.
106 changes: 58 additions & 48 deletions lib/util/HttpClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,79 +54,69 @@ HttpClient.prototype.get = function (url, headers, options) {
* @returns {Iterator} An iterator of the representation
*/
HttpClient.prototype.request = function (url, method, headers, options) {
var responseIterator = new Iterator.PassthroughIterator(true), self = this;
var self = this, responseIterator = new Iterator.PassthroughIterator(true);

function performRequest() {
self._activeRequests++;
self._logger.info('Requesting', url);

// Create the request
var requestOptions = {
var request, startTime = new Date(),
requestHeaders = _.assign({}, self._defaultHeaders, headers),
requestOptions = _.assign({
url: url,
method: method || 'GET',
headers: _.assign({}, self._defaultHeaders, headers),
headers: requestHeaders,
timeout: 5000,
followRedirect: true
}, request, startTime = new Date();
self._logger.info('Requesting', url);
try { request = self._request(options ? _.assign(requestOptions, options) : requestOptions); }
}, options);
try { request = self._request(requestOptions); }
catch (error) { return setImmediate(emitError, error), responseIterator; }
activeRequests[request._id = requestId++] = request;
function emitError(error) {
if (!this._aborted || !error || error.code !== 'ETIMEDOUT')
responseIterator._error(error);
delete activeRequests[request && request._id];
}

// React to a possible response
// Reply to its response
request.on('response', function (response) {
var statusCode = response.statusCode, responseHeaders = response.headers,
responseTime = new Date() - startTime, encoding, contentType, nextRequest;
// Start a possible queued request
delete activeRequests[request._id];
self._activeRequests--;
(nextRequest = self._queue.shift()) && nextRequest();

// Decompress the response when necessary
switch (encoding = responseHeaders['content-encoding'] || '') {
case 'gzip':
response.pipe(response = zlib.createGunzip());
break;
case 'deflate':
response.pipe(response = zlib.createInflate());
break;
case '':
break;
default:
return responseIterator._error(new Error('Unsupported encoding: ' + encoding));
}
response.on('error', emitError);

// Did we not get a time-negotiated response, even though we asked for it?
if (requestOptions.headers['accept-datetime'] && !responseHeaders['memento-datetime']) {
// Try to find a timegate in the links
var links = responseHeaders.link && parseLink(responseHeaders.link);
if (delete activeRequests[request._id])
self._activeRequests--;
var nextRequest = self._queue.shift();
nextRequest && nextRequest();

// Did we ask for a time-negotiated response, but haven't received one?
if (requestHeaders['accept-datetime'] && !response.headers['memento-datetime']) {
// The links might have a timegate that can help us
var links = response.headers.link && parseLink(response.headers.link);
if (links && links.timegate) {
// Obtain a response from the timegate instead
// Try to obtain a time-negotiated response from the timegate instead
var timegateResponse = self.request(links.timegate.url, method, headers, options);
return responseIterator.setSource(timegateResponse);
}
}

// Redirect output to the iterator
response.setEncoding && response.setEncoding('utf8');
response.pause && response.pause();
responseIterator.setSource(response);
// Redirect output to the response iterator
var responseStream = self._decodeResponse(response);
responseStream.setEncoding && responseStream.setEncoding('utf8');
responseStream.pause && responseStream.pause();
responseIterator.setSource(responseStream);
// Responses _must_ be entirely consumed,
// or they can lead to out-of-memory errors (http://nodejs.org/api/http.html)
responseIterator._bufferAll();

// Emit the metadata
contentType = (responseHeaders['content-type'] || '').replace(/\s*(?:;.*)?$/, '');
responseIterator.setProperty('statusCode', statusCode);
responseIterator.setProperty('contentType', contentType);
responseIterator.setProperty('responseTime', responseTime);
})
.on('error', emitError);
responseIterator.setProperty('statusCode', response.statusCode);
responseIterator.setProperty('contentType',
(response.headers['content-type'] || '').replace(/\s*(?:;.*)?$/, ''));
responseIterator.setProperty('responseTime', new Date() - startTime);
});

// Return possible errors on the response iterator
request.on('error', emitError);
function emitError(error) {
if (!this._aborted || !error || error.code !== 'ETIMEDOUT')
responseIterator._error(error);
if (request && delete activeRequests[request._id])
self._activeRequests--;
}
}

// Start or enqueue the request
Expand All @@ -138,6 +128,26 @@ HttpClient.prototype.request = function (url, method, headers, options) {
return responseIterator;
};

// Returns a decompressed stream from the HTTP response
HttpClient.prototype._decodeResponse = function (response) {
var encoding = response.headers['content-encoding'] || '', decodedResponse = response;
switch (encoding) {
case '':
break;
case 'gzip':
response.pipe(decodedResponse = zlib.createGunzip());
break;
case 'deflate':
response.pipe(decodedResponse = zlib.createInflate());
break;
default:
setImmediate(function () {
response.emit('error', new Error('Unsupported encoding: ' + encoding));
});
}
return decodedResponse;
};

// Abort all active requests
HttpClient.abortAll = function () {
for (var id in activeRequests) {
Expand Down

0 comments on commit e799227

Please sign in to comment.