Skip to content

Commit

Permalink
files
Browse files Browse the repository at this point in the history
  • Loading branch information
aivis committed Oct 26, 2012
0 parents commit 80743d3
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node_modules/
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require('./lib/user-stream');
195 changes: 195 additions & 0 deletions lib/user-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
var oauth = require('oauth');
events = require('events'),
util = require("util");

var user_stream_url = 'https://userstream.twitter.com/1.1/user.json',
request_token_url = 'https://api.twitter.com/oauth/request_token',
access_token_url = 'https://api.twitter.com/oauth/access_token';

module.exports = Stream;

function Stream(params) {

if (!(this instanceof Stream)) {
return new Stream(params);
}

events.EventEmitter.call(this);

this.params = params;

this.oauth = new oauth.OAuth(
request_token_url,
access_token_url,
this.params.consumer_key,
this.params.consumer_secret,
'1.0',
null,
'HMAC-SHA1',
null,
{
'Accept': '*/*',
'Connection'
: 'close',
'User-Agent': 'teeter.js'
}
);

}

//inherit
util.inherits(Stream, events.EventEmitter);

/**
* Create twitter use stream
*
* Events:
* - data
* - garbage
* - close
* - error
* - connected
* - heartbeat
*
*/
Stream.prototype.stream = function() {

var stream = this;

var request = this.oauth.post(
user_stream_url,
this.params.access_token_key,
this.params.access_token_secret,
{
delimited: 'length',
stall_warnings: 'true'
},
null
);

/**
* Destroy socket
*/
this.destroy = function() {
request.socket.destroy();
//remove heartbeat checking interval
clearInterval(stream.heart_int_id);
}

//last heartbeat/onData time
this.last_heartbeat = new Date().getTime();
//heartbeat interval id
this.heart_int_id = null;

//check heartbeats
this.check_heartbeat = function() {

stream.heart_int_id = setInterval(function(){

//if last heartbeat is smaller than one minute - destroy socket (each 30 sec. twiiter emit heartbeat)
if (stream.last_heartbeat < new Date().getTime()-30000) {
stream.destroy();
}

}, 60000); //check every 1 min.

}();

request.on('response', function(response) {

// Any response code greater then 200 from steam API is an error
if(response.statusCode > 200) {

stream.emit('error', {type: 'response', data: {code:response.statusCode}});

} else {

var buffer = '',
next_data_length = 0,
end = '\r\n';

//emit connected event
stream.emit('connected');

//set chunk encoding
response.setEncoding('utf8');

response.on('data', function(chunk) {

stream.last_heartbeat = new Date().getTime();

//is heartbeat?
if (chunk == end) {
stream.emit('heartbeat');
return;
}

//check whether new incomming data set
if (!buffer.length) {
//get length of incomming data
var line_end_pos = chunk.indexOf(end);
next_data_length = parseInt(chunk.slice(0, line_end_pos));
//slice data length string from chunk
chunk = chunk.slice(line_end_pos+end.length);
}

if (buffer.length != next_data_length) {
//data set recieved
//first remove end and append to buffer
buffer+= chunk.slice(0, chunk.indexOf(end));
//parse json
var parsed = false;
try {
//try parse & emit
buffer = JSON.parse(buffer);
parsed = true;
} catch(e) {
stream.emit('garbage', buffer);
}

//don't emit into "try" and emit only if data formatted
if (parsed) {
stream.emit('data', buffer);
}

//clean buffer
buffer = '';

} else {
//append to buffer
buffer+=chunk;
}

});

response.on('error', function(error) {

stream.emit('close', error);

});

response.on('end', function() {

stream.emit('close', 'socket end');

});

response.on('close', function() {

stream.destroy();

});
}

});

request.on('error', function(error) {

stream.emit('error', {type: 'request', data: error});

});

request.end();

}

11 changes: 11 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"name": "user-stream",
"version": "0.0.1",
"description": "Simple twitter user stream client. Only one dependency (oauth).",
"keywords": ["twitter","streaming"],
"preferGlobal": "true",
"author": "Aivis Silins",
"dependencies": {
"oatuh": "0.9.8"
}
}
51 changes: 51 additions & 0 deletions test/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
var Stream = require('./../index');
var stream = new Stream({
consumer_key: 'xxx',
consumer_secret: 'xxx',
access_token_key: 'xxx',
access_token_secret: 'xxx'
});

// * - data
// * - garbage
// * - close
// * - error
// * - connected
// * - heartbeat

//create stream
stream.stream();

//stream JSON data
stream.on('data', function(data){
console.log('Data:');
console.log(data);
});

//incorrect json strings (can't parse to json)
stream.on('garbage', function(data){
console.log('Can\'t be formatted:');
console.log(data);
});

//heartbeats
stream.on('heartbeat', function(){
console.log('Heartbeat');
});

//connected
stream.on('connected', function(){
console.log('Stream created');
});

//connection errors (request || response)
stream.on('error', function(error){
console.log('Connection error:');
console.log(error);
});

//stream close event
stream.on('close', function(error){
console.log('Stream closed');
console.log(error);
});

0 comments on commit 80743d3

Please sign in to comment.