Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Updated the request process and fixed proxy support

  • Loading branch information...
commit 188bf1a7df7f95a57b67af81dd650ab54106f10b 1 parent 315827c
@chriso authored
View
4 HISTORY.md
@@ -1,3 +1,7 @@
+### v0.4.0
+ * Ported the entire request process to use mikeal/request
+ * Full HTTP proxy support
+
### v0.3.9
* Expose JSDOM window so it can be closed manually to prevent leaks
* Added the expand_single_selected option (see issue #34)
View
14 lib/node.io/process_worker.js
@@ -6,8 +6,7 @@
var Processor = require('./processor').Processor,
crc32 = require('./utils').crc32,
- JobClass = require('./job').JobClass,
- HttpProxy = require('./request').HttpProxy;
+ JobClass = require('./job').JobClass;
/**
* Routes messages received from the master process.
@@ -234,17 +233,6 @@ Processor.prototype.setupWorkerEvents = function (job, master) {
instance = createInstance();
}
- //Bind a proxy if the `proxy` op is set. Also allow the user to specify
- //a callback which returns a Proxy (e.g. to easily cycle proxies)
- if (typeof job.options.proxy.proxify === 'function') {
- job.options.proxy.proxify(instance);
- } else if (typeof job.options.proxy === 'function') {
- job.options.proxy().proxify(instance);
- } else if (typeof job.options.proxy === 'string') {
- job.options.proxy = new HttpProxy(job.options.proxy);
- job.options.proxy.proxify(instance);
- }
-
//Assign some input to the instance
var num = job.options.take;
while (job.input.length > 0 && num--) {
View
308 lib/node.io/request.js
@@ -10,7 +10,8 @@ var http = require('http'),
urlparse = require('url').parse,
query = require('querystring'),
Job = require('./job').JobProto,
- utils = require('./utils');
+ utils = require('./utils')
+ request = require('request');
/**
* The default headers to send when using createClient()
@@ -178,9 +179,8 @@ Job.prototype.encodeBody = function (body, use_json) {
* @param {Function} parse (optional)
* @api public
*/
-Job.prototype.doRequest = function (method, resource, body, headers, callback, parse, redirects) {
- var self = this, host, port, url, path, rid, secure, request, cleanup, h,
- request_response, options, on_complete;
+Job.prototype.doRequest = function (method, resource, body, headers, callback, parse) {
+ var self = this, host, port, url, path, rid, secure, h, options, on_complete;
//Give each request a unique ID for debugging
rid = Math.floor(Math.random() * 100000);
@@ -218,13 +218,6 @@ Job.prototype.doRequest = function (method, resource, body, headers, callback, p
}
};
- //Internally keep track of the # of redirects
- redirects = redirects || 0;
- if (redirects > this.options.redirects) {
- callback('redirects');
- return;
- }
-
//Add a protocol if there isn't one
if (!resource.match(/https?:\/\//)) {
resource = 'http://' + resource;
@@ -242,14 +235,9 @@ Job.prototype.doRequest = function (method, resource, body, headers, callback, p
//Copy `headers` before modifying it
headers = utils.put({}, headers);
- if (typeof headers.host === 'undefined') {
- headers.host = url.hostname;
- }
-
- //Add headers set before the doRequest call if from the same host (e.g. cookie, user-agent, referer, etc.)
- if (typeof this.last.headers === 'object' && this.last.host === url.hostname) {
+ //Add headers from a previous request if this is a nested request
+ if (this.last.headers) {
utils.put(headers, this.last.headers);
- this.last = {};
}
//Add headers added by setHeader, setCookie, etc.
@@ -269,27 +257,6 @@ Job.prototype.doRequest = function (method, resource, body, headers, callback, p
headers['Content-Length'] = Buffer.byteLength(body);
}
- //Determine the port and add it to the host header
- port = url.port;
- if (!port) {
- switch (url.protocol) {
- case 'http:':
- port = 80;
- break;
- case 'https:':
- port = 443;
- secure = true;
- break;
- case 'ftp:':
- port = 21;
- break;
- default:
- port = 80;
- }
- } else {
- headers.host += ':' + port;
- }
-
method = method.toUpperCase();
//Debug request headers
@@ -299,27 +266,51 @@ Job.prototype.doRequest = function (method, resource, body, headers, callback, p
this.debug(' | ' + h[0].toUpperCase() + h.substr(1) + ': ' + headers[h]);
}
- host = url.hostname ? url.hostname : headers.host;
-
options = {
- host: host,
- port: port,
- path: path,
+ url: url,
method: method,
- headers: headers
- };
+ headers: headers,
+ body: body,
+ maxRedirects: this.options.redirects,
+ encoding: this.options.encoding,
+ }
- request = (secure ? https : http).request(options, function (response) {
+ //Use an HTTP proxy?
+ if (this.options.proxy) {
+ options.proxy = typeof this.options.proxy === 'function'
+ ? this.options.proxy()
+ : this.options.proxy;
+ }
- response.url = resource;
+ //Set a request timeout?
+ if (this.options.timeout) {
+ self.cancel_timeout();
+ options.timeout = this.options.timeout;
+ }
- request_reponse = response;
+ //We can parse chunks as they are received if using soupselect/htmlparser
+ if (this.htmlparser) {
+ options.onResponse = function (err, response) {
+ if (!err) {
+ response.on('data', function (chunk) {
+ self.htmlparser.parseChunk(chunk);
+ });
+ }
+ }
+ }
+ request(options, function (err, response, body) {
if (self.is_complete) {
- return cleanup();
+ return;
}
- response.setEncoding(self.options.encoding);
+ if (err) {
+ self.debug('\x1B[31mERR\x1B[0m Request ' + rid + ' failed with (' + (err.errno || '?') + ') ' + err + ' ('+resource+')');
+ if (/maxRedirects/.test(err.message)) {
+ err = 'redirects'; //compat
+ }
+ return callback(err);
+ }
var code = response.statusCode || 200;
@@ -343,111 +334,28 @@ Job.prototype.doRequest = function (method, resource, body, headers, callback, p
//Handle http response codes
if (!self.ignore_code) {
switch (Math.floor(code/100)) {
+ case 3:
case 4:
case 5:
self.debug('\x1B[31mERR\x1B[0m Request ' + rid + ' failed with code ' + code + ' ('+resource+')');
return callback(code);
- case 3:
- if (typeof response.headers.location === 'undefined') {
- self.debug('\x1B[31mERR\x1B[0m Request ' + rid + ' failed with invalid 30x response ('+resource+')');
- callback(code);
- } else {
- //Handle the 30x redirect
- var location = resolve(resource, response.headers.location);
- var redirect = urlparse(location);
- if (redirect.host) {
- headers.host = redirect.host;
- }
- if (code === 302 || code === 303) {
- // morph this post request into a get request
- for (header in headers) {
- h = header.toLowerCase();
- if (h === 'content-length' || h === 'content-type') {
- delete headers[header];
- }
- }
- self.doRequest('GET', location, null, headers, callback, parse, ++redirects);
- } else {
- self.doRequest(method, location, body, headers, callback, parse, ++redirects);
- }
- }
- return;
}
}
- var body = '';
- response.on('data', function (chunk) {
- self.bytes_received += chunk.length;
- if (self.htmlparser) {
- self.htmlparser.parseChunk(chunk);
- }
- if (self.is_complete) {
- return cleanup();
- }
- body = body + chunk;
- });
-
- response.on('end', function () {
- if (self.is_complete) {
- return cleanup();
- }
+ var parse_callback = function (err, data) {
+ callback(err, data, response.headers, response);
+ }
- var parse_callback = function (err, data) {
- callback(null, data, response.headers, response);
- };
-
- if (!parse) {
- //If no parse function was specified, just return the body
- parse_callback(null, body);
- } else {
- //Call the parse function on the response body - handle async and sync cases
- var ret = parse(body, parse_callback);
- if (typeof ret !== 'undefined') {
- parse_callback(null, ret);
- }
+ //Parse the response body with a custom parser?
+ if (parse) {
+ var ret = parse(body, parse_callback);
+ if (typeof ret !== 'undefined') {
+ callback(null, ret, response.headers, response);
}
-
- cleanup();
- });
-
- });
-
- //Write a body if it was specified
- if (body) {
- request.write(body);
- }
-
- //This method is called on each event if the instance is already complete (i.e. timed out)
- cleanup = function () {
- if (request.socket && request.socket.destroy) {
- request.socket.destroy();
- }
- if (request_response) {
- request_response.abort();
+ } else {
+ parse_callback(null, body)
}
- };
-
- //Watch for errors
- request.on('error', function (err) {
- if (self.is_complete) return;
- self.debug('\x1B[31mERR\x1B[0m Request ' + rid + ' failed with (' + err.errno + ') ' + err + ' ('+resource+')');
- cleanup();
- callback(err);
});
-
- //Set a special timeout if the `timeout` option is set. Redirects do not reset the timeout
- if (this.options.timeout && redirects === 0) {
- self.cancel_timeout();
- self.timeout = setTimeout(function() {
- if (self.is_complete) return;
- self.debug('\x1B[31mERR\x1B[0m Request ' + rid + ' timed out ('+resource+')');
- cleanup();
- callback('timeout');
- }, this.options.timeout * 1000);
- }
-
- //We're done
- request.end();
};
/**
@@ -515,113 +423,3 @@ Job.prototype.getBytesReceived = function () {
return this.bytes_received;
};
-/**
- * Creates a new proxy with the specified (optional) callbacks. The callbacks
- * are used to synchronously modify the url, request headers, parse callback,
- * and response headers surrounding a doRequest().
- *
- * @param {Function} url_callback
- * @param {Function} req_header_callback
- * @param {Function} parse_callback
- * @param {Function} res_header_callback
- * @api public
- */
-var Proxy = function (url_callback, req_header_callback, parse_callback, res_header_callback) {
- this.url_callback = url_callback;
- this.req_header_callback = req_header_callback;
- this.res_header_callback = res_header_callback;
- this.parse_callback = parse_callback;
-};
-
-/**
- * Proxifies the doRequest() method in the specified job instance.
- *
- * @param {Object} job
- * @return {Object} job
- * @api public
- */
-Proxy.prototype.proxify = function (job) {
- var self = this, doRequest = job.doRequest;
-
- job.doRequest = function (method, resource, body, headers, callback, parse, redirects) {
-
- if (typeof headers === 'function') {
- parse = callback;
- callback = headers;
- headers = default_headers;
- }
-
- //Modify the request url
- if (self.url_callback) {
- resource = self.url_callback(resource);
- }
-
- //Modify request headers
- if (self.req_header_callback) {
- headers = self.req_header_callback(headers);
- }
-
- //Modify the response body
- if (self.parse_callback) {
- if (!parse) {
- parse = self.parse_callback;
- } else {
- //If a parse function is passed to fetch, chain it together with the proxy parse_callback
- var old_parse = parse;
- parse = function (err, data) {
- var ret = self.parse_callback(data, old_parse);
- if (typeof ret !== 'undefined') {
- old_parse(null, ret);
- }
- };
- }
- }
-
- //Modify response headers
- if (self.res_header_callback) {
- var old_callback = callback;
- callback = function (err, data, headers) {
- headers = self.res_header_callback(headers);
- return old_callback(err, data, headers);
- };
- }
-
- doRequest.apply(job, [method, resource, body, headers, callback, parse, redirects]);
- };
-
- return job;
-};
-
-/**
- * Creates a new proxy that routes requests through the specified host.
- *
- * @param {Function} host
- * @api public
- */
-var HttpProxy = function (host) {
- var proxy = urlparse(host), urlhost;
- var url_callback = function (url) {
- var u = urlparse(url);
- urlhost = u.host;
- url = (proxy.protocol || 'http:')
- + '//' + proxy.host
- + (u.pathname || '/');
-
- if (u.search) {
- url += u.search;
- }
- return url;
- };
- var header_callback = function (headers) {
- headers = headers || {};
- headers.host = urlhost;
- return headers;
- };
- return new Proxy(url_callback, header_callback);
-};
-
-/**
- * Export proxies
- */
-exports.HttpProxy = HttpProxy;
-exports.Proxy = Proxy;
View
5 package.json
@@ -1,6 +1,6 @@
{ "name" : "node.io",
"description" : "A distributed data scraping and processing framework",
- "version" : "0.3.9",
+ "version" : "0.4.0",
"homepage" : "http://github.com/chriso/node.io",
"keywords" : ["data","mapreduce","map","reduce","scraping","html","parsing","parse","scrape","process","processing","data"],
"author" : "Chris O'Hara <cohara87@gmail.com>",
@@ -20,7 +20,8 @@
"htmlparser": ">= 1.7.3",
"coffee-script": ">= 0.9.5",
"jquery": ">= 1.4.4",
- "jsdom": ">= 0.2.0"
+ "jsdom": ">= 0.2.0",
+ "request": ">= 2.1.1"
},
"devDependencies": { "expresso": "*" },
"bin": {
Please sign in to comment.
Something went wrong with that request. Please try again.