Permalink
Browse files

added rate limit handler for streaming API Calls for #28

  • Loading branch information...
1 parent 0dbe45f commit f380942f45596c8d214a0bc53726a8332187d4aa @impronunciable committed May 9, 2013
Showing with 29 additions and 18 deletions.
  1. +28 −16 lib/tuiter.js
  2. +1 −2 test/streaming.test.js
View
@@ -65,7 +65,7 @@ Object.keys(endpoints).forEach(function(key){
}
debug('processing query parameters');
- preProcess(params);
+ params = preProcess(params);
debug('Starting API call for %s', endpoints[key].resource);
APIRequest.call(this, endpoints[key], params, callback);
return this;
@@ -114,18 +114,7 @@ var APIRequest = function(endpoint, params, callback) {
Tuiter.prototype.handleSimpleRequest = function(req, endpoint, params, callback) {
var self = this;
req.end(function(res){
- debug('data received for %s', endpoint.resource);
- if (res.statusCode == 429) {
- self.sleep_duration = (res.header['x-rate-limit-reset']*1000 - new Date()) + 7000;
- debug('rate limit reached, sleeping during', Math.ceil(sleep_duration/60000),
- 'minutes');
- setTimeout(function() {
- debug('rate limit reset, resuming extraction...');
- APIRequest.call(self, endpoint, params, callback);
- }, sleep_duration);
- } else {
- callback(res.error, res.body);
- }
+ self.handleRateLimit.call(self, res, endpoint, params, callback, [res.error, res.body]);
});
};
@@ -144,8 +133,13 @@ Tuiter.prototype.handleStream = function(req, endpoint, params, callback) {
stream.emitAPIEvents();
stream.autoReconnect();
- req.pipe(stream);
- callback(stream);
+ req.pipe(stream);
+
+ req.req.on('response',function(res){
+ self.handleRateLimit.call(self, res, endpoint, params, callback, [stream]);
+ if (res.statusCode == 429)
+ stream.emit('end');
+ });
stream.on('end', function(){
stream.end();
@@ -155,7 +149,7 @@ Tuiter.prototype.handleStream = function(req, endpoint, params, callback) {
stream.on('reconnect', function(new_args){
params = new_args || params;
stream.emit('end');
- preProcess(params);
+ params = preProcess(params);
APIRequest.call(self, endpoint, params, callback);
});
};
@@ -199,6 +193,22 @@ Tuiter.prototype.resetTimers = function() {
this.rate_timeout = this.rate_timeout || 60000;
};
+Tuiter.prototype.handleRateLimit = function(res, endpoint, params, callback, cb_params) {
+ var self = this;
+ if (res.statusCode == 429) {
+ self.sleep_duration = (res.header['x-rate-limit-reset']*1000 - new Date()) + 7000;
+ debug('rate limit reached, sleeping during', Math.ceil(sleep_duration/60000),
+ 'minutes');
+ setTimeout(function() {
+ debug('rate limit reset, resuming extraction...');
+ params = preProcess(params);
+ APIRequest.call(self, endpoint, params, callback);
+ }, sleep_duration);
+ } else {
+ callback.apply(self, cb_params);
+ }
+};
+
/*
* Handle TCP error
*/
@@ -256,6 +266,8 @@ var preProcess = function(obj) {
// join arrays
for(var i in obj)
if(util.isArray(obj[i])) obj[i] = obj[i].join(',');
+
+ return obj;
};
/*
View
@@ -47,8 +47,7 @@ describe('streaming', function(){
if(tweet.text.indexOf('love') != -1) is_love = true;
});
});
-
- setTimeout(st.emit.bind(st, 'reconnect', {track: 'love'}), 2500);
+ setTimeout(function(){st.emit.bind(st, 'reconnect', {track: 'love'})}, 2500);
setTimeout(function(){
st.emit('end');
is_love.should.be.ok;

0 comments on commit f380942

Please sign in to comment.