Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Added destroy method for stream #12

Merged
merged 1 commit into from

2 participants

Koen Punt Dan Zajdband
Koen Punt

Implemented missing method destroy on stream and added simple test for streaming API

Koen Punt koenpunt Added destroy method for stream
Simple test for streaming API
Replaced hard-tabs with with soft-tabs
ede5f80
Koen Punt koenpunt closed this
Dan Zajdband danzajdband reopened this
Dan Zajdband danzajdband merged commit 75a6477 into from
Koen Punt

Ok, but maybe initiating a Stream object is better than creating a custom destroy... Trying that right now

Dan Zajdband
Owner

I thought about that. First I merged this to have things working and then need to implement the stream as a real stream...

Koen Punt

I guess the custom Stream object isn't even necessary, because req is already an instance of Stream.

Dan Zajdband
Owner

I don't want to return back to the user the whole request to handle. I like this approach with a custom stream object containing only the methods and data that is relevant to the user.

Koen Punt

Ok fair enough, maybe initiating Stream like:

var stream = new Stream(req);

and setting the request in Stream as instance variable

function Stream(req){
    this.request = req;
}
Stream.prototype.destroy = function(){
    this.request.destroy();
}

so the original request is also available?

I've submitted a PR for this: #13

Dan Zajdband
Owner

There is no need to do that. The only thing I want to implement is a function that destroy the stream, so I can start a new one for a possible reconnection.

Also if I'm starting using a stream from the Stream module I need to use another name to the 'error' event, that means I need to change the API of the module and I don't want that.

I think that adding the new destroy function it's good, at least for now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 16, 2012
  1. Koen Punt

    Added destroy method for stream

    koenpunt authored
    Simple test for streaming API
    Replaced hard-tabs with with soft-tabs
This page is out of date. Refresh to see the latest.
2  Makefile
View
@@ -1,4 +1,4 @@
-ALL_TESTS = $(shell find test/ -name '*.test.js')
+ALL_TESTS = $(shell find test/ -name '*.test.js' ! -name '._*')
run-tests:
@./node_modules/.bin/mocha \
28 lib/request.js
View
@@ -6,8 +6,8 @@
var OAuth = require('oauth').OAuth
, Tuiter = module.parent.exports
, qs = require('querystring')
- , events = require('events')
- , util = require('util')
+ , events = require('events')
+ , util = require('util')
, config = require('./config.json');
/*
@@ -15,7 +15,7 @@ var OAuth = require('oauth').OAuth
*/
function Stream(){
- events.EventEmitter.call(this);
+ events.EventEmitter.call(this);
}
util.inherits(Stream, events.EventEmitter);
@@ -93,11 +93,11 @@ Tuiter.prototype.post = function(url, params, callback){
*/
Tuiter.prototype.getStream = function(url, params, callback){
- var stream = new Stream();
+ var stream = new Stream();
var req = this.oa.get(url + '?' + qs.stringify(params), this.oa.access_token_key, this.oa.access_token_secret);
handleStream(req, stream, callback);
-
+
reconnect.call(this, 'getStream', url, params, callback, stream);
return this;
@@ -112,11 +112,12 @@ Tuiter.prototype.getStream = function(url, params, callback){
*/
Tuiter.prototype.postStream = function(url, params, callback){
- var stream = new Stream();
-
+ var stream = new Stream();
+
var req = this.oa.post(url, this.oa.access_token_key, this.oa.access_token_secret, params);
+
handleStream(req, stream, callback);
-
+
reconnect.call(this, 'postStream', url, params, callback, stream);
return this;
@@ -124,11 +125,15 @@ Tuiter.prototype.postStream = function(url, params, callback){
var handleStream = function(req, stream, callback){
var buf = '';
+
+ stream.destroy = function(){
+ req.destroy();
+ }
req.on('response', function(res){
res.setEncoding('utf-8');
- callback(stream);
+ callback(stream);
checkResStatus(res.statusCode, stream);
@@ -165,19 +170,18 @@ var handleStream = function(req, stream, callback){
});
stream.on('end', function(){
- res.end();
stream.destroy();
});
});
req.end();
- req.on('error', function(err){
+ req.on('error', function(err){
//todo tcp error reconnection
callback(stream);
stream.emit('error', err);
stream.emit('tcp error', err);
- });
+ });
};
4 lib/rest.js
View
@@ -28,7 +28,7 @@ Object.keys(config.rest.get).forEach(function(key){
params = params || {};
- method = utils.preprocess(method, params);
+ method = utils.preprocess(method, params);
this.get(method, params, callback);
this.last_request = {
@@ -58,7 +58,7 @@ Object.keys(config.rest.post).forEach(function(key){
params = params || {};
- method = utils.preprocess(method, params);
+ method = utils.preprocess(method, params);
this.post(method, params, callback);
this.last_request = {
4 lib/streaming.js
View
@@ -22,7 +22,7 @@ Object.keys(config.streaming.get).forEach(function(key){
params = params || {};
- utils.preprocess(method, params);
+ utils.preprocess(method, params);
this.getStream(method, params, callback);
return this;
};
@@ -44,7 +44,7 @@ Object.keys(config.streaming.post).forEach(function(key){
params = params || {};
- utils.preprocess(method, params);
+ utils.preprocess(method, params);
this.postStream(method, params, callback);
return this;
};
2  lib/utils.js
View
@@ -21,7 +21,7 @@ exports.preprocess = function(method, params){
}
for(var i in params)
- if(util.isArray(params[i]))
+ if(util.isArray(params[i]))
params[i] = params[i].join(',');
return replaceUrl(method, params);
45 test/streaming.test.js
View
@@ -0,0 +1,45 @@
+
+/*
+ * Module dependencies
+ */
+
+var Tuiter = require('../')
+ , keys = require('./keys.json')
+ , should = require('should');
+
+var t;
+
+describe('streaming', function(){
+
+ before(function(done){
+ t = new Tuiter(keys);
+ done();
+ });
+
+ describe('#filter()', function(){
+ it('should stream tweets about twitter', function(done){
+ var tweetsReceived = 0
+ , timeout = 5000
+ , stream;
+ setTimeout(function(){
+ stream.destroy();
+ }, timeout);
+ t.filter({ track: 'twitter' }, function(s) {
+ stream = s;
+ stream.on('tweet', function(status) {
+ ++tweetsReceived;
+ });
+ stream.on('delete', function(status, error) {
+
+ });
+ stream.on('error', function(status) {
+ throw new Error('[Stream] Error ' + status.code + ': ' + status.description);
+ });
+ stream.on('end', function(response) {
+ //console.log('[Stream] Disconnected from Twitter, received ' + tweetsReceived + ' tweets in ' + timeout / 1000 + ' seconds.');
+ done();
+ });
+ });
+ });
+ });
+});
Something went wrong with that request. Please try again.