Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Added destroy method for stream #12

Merged
merged 1 commit into from

2 participants

@koenpunt

Implemented missing method destroy on stream and added simple test for streaming API

@koenpunt koenpunt Added destroy method for stream
Simple test for streaming API
Replaced hard-tabs with with soft-tabs
ede5f80
@koenpunt koenpunt closed this
@impronunciable impronunciable merged commit 75a6477 into impronunciable:master
@koenpunt

Ok, but maybe initiating a Stream object is better than creating a custom destroy... Trying that right now

@impronunciable

I thought about that. First I merged this to have things working and then need to implement the stream as a real stream...

@koenpunt

I guess the custom Stream object isn't even necessary, because req is already an instance of Stream.

@impronunciable

I don't want to return back to the user the whole request to handle. I like this approach with a custom stream object containing only the methods and data that is relevant to the user.

@koenpunt

Ok fair enough, maybe initiating Stream like:

var stream = new Stream(req);

and setting the request in Stream as instance variable

function Stream(req){
    this.request = req;
}
Stream.prototype.destroy = function(){
    this.request.destroy();
}

so the original request is also available?

I've submitted a PR for this: #13

@impronunciable

There is no need to do that. The only thing I want to implement is a function that destroy the stream, so I can start a new one for a possible reconnection.

Also if I'm starting using a stream from the Stream module I need to use another name to the 'error' event, that means I need to change the API of the module and I don't want that.

I think that adding the new destroy function it's good, at least for now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 16, 2012
  1. @koenpunt

    Added destroy method for stream

    koenpunt authored
    Simple test for streaming API
    Replaced hard-tabs with with soft-tabs
This page is out of date. Refresh to see the latest.
View
2  Makefile
@@ -1,4 +1,4 @@
-ALL_TESTS = $(shell find test/ -name '*.test.js')
+ALL_TESTS = $(shell find test/ -name '*.test.js' ! -name '._*')
run-tests:
@./node_modules/.bin/mocha \
View
28 lib/request.js
@@ -6,8 +6,8 @@
var OAuth = require('oauth').OAuth
, Tuiter = module.parent.exports
, qs = require('querystring')
- , events = require('events')
- , util = require('util')
+ , events = require('events')
+ , util = require('util')
, config = require('./config.json');
/*
@@ -15,7 +15,7 @@ var OAuth = require('oauth').OAuth
*/
function Stream(){
- events.EventEmitter.call(this);
+ events.EventEmitter.call(this);
}
util.inherits(Stream, events.EventEmitter);
@@ -93,11 +93,11 @@ Tuiter.prototype.post = function(url, params, callback){
*/
Tuiter.prototype.getStream = function(url, params, callback){
- var stream = new Stream();
+ var stream = new Stream();
var req = this.oa.get(url + '?' + qs.stringify(params), this.oa.access_token_key, this.oa.access_token_secret);
handleStream(req, stream, callback);
-
+
reconnect.call(this, 'getStream', url, params, callback, stream);
return this;
@@ -112,11 +112,12 @@ Tuiter.prototype.getStream = function(url, params, callback){
*/
Tuiter.prototype.postStream = function(url, params, callback){
- var stream = new Stream();
-
+ var stream = new Stream();
+
var req = this.oa.post(url, this.oa.access_token_key, this.oa.access_token_secret, params);
+
handleStream(req, stream, callback);
-
+
reconnect.call(this, 'postStream', url, params, callback, stream);
return this;
@@ -124,11 +125,15 @@ Tuiter.prototype.postStream = function(url, params, callback){
var handleStream = function(req, stream, callback){
var buf = '';
+
+ stream.destroy = function(){
+ req.destroy();
+ }
req.on('response', function(res){
res.setEncoding('utf-8');
- callback(stream);
+ callback(stream);
checkResStatus(res.statusCode, stream);
@@ -165,19 +170,18 @@ var handleStream = function(req, stream, callback){
});
stream.on('end', function(){
- res.end();
stream.destroy();
});
});
req.end();
- req.on('error', function(err){
+ req.on('error', function(err){
//todo tcp error reconnection
callback(stream);
stream.emit('error', err);
stream.emit('tcp error', err);
- });
+ });
};
View
4 lib/rest.js
@@ -28,7 +28,7 @@ Object.keys(config.rest.get).forEach(function(key){
params = params || {};
- method = utils.preprocess(method, params);
+ method = utils.preprocess(method, params);
this.get(method, params, callback);
this.last_request = {
@@ -58,7 +58,7 @@ Object.keys(config.rest.post).forEach(function(key){
params = params || {};
- method = utils.preprocess(method, params);
+ method = utils.preprocess(method, params);
this.post(method, params, callback);
this.last_request = {
View
4 lib/streaming.js
@@ -22,7 +22,7 @@ Object.keys(config.streaming.get).forEach(function(key){
params = params || {};
- utils.preprocess(method, params);
+ utils.preprocess(method, params);
this.getStream(method, params, callback);
return this;
};
@@ -44,7 +44,7 @@ Object.keys(config.streaming.post).forEach(function(key){
params = params || {};
- utils.preprocess(method, params);
+ utils.preprocess(method, params);
this.postStream(method, params, callback);
return this;
};
View
2  lib/utils.js
@@ -21,7 +21,7 @@ exports.preprocess = function(method, params){
}
for(var i in params)
- if(util.isArray(params[i]))
+ if(util.isArray(params[i]))
params[i] = params[i].join(',');
return replaceUrl(method, params);
View
45 test/streaming.test.js
@@ -0,0 +1,45 @@
+
+/*
+ * Module dependencies
+ */
+
+var Tuiter = require('../')
+ , keys = require('./keys.json')
+ , should = require('should');
+
+var t;
+
+describe('streaming', function(){
+
+ before(function(done){
+ t = new Tuiter(keys);
+ done();
+ });
+
+ describe('#filter()', function(){
+ it('should stream tweets about twitter', function(done){
+ var tweetsReceived = 0
+ , timeout = 5000
+ , stream;
+ setTimeout(function(){
+ stream.destroy();
+ }, timeout);
+ t.filter({ track: 'twitter' }, function(s) {
+ stream = s;
+ stream.on('tweet', function(status) {
+ ++tweetsReceived;
+ });
+ stream.on('delete', function(status, error) {
+
+ });
+ stream.on('error', function(status) {
+ throw new Error('[Stream] Error ' + status.code + ': ' + status.description);
+ });
+ stream.on('end', function(response) {
+ //console.log('[Stream] Disconnected from Twitter, received ' + tweetsReceived + ' tweets in ' + timeout / 1000 + ' seconds.');
+ done();
+ });
+ });
+ });
+ });
+});
Something went wrong with that request. Please try again.