Skip to content
Browse files

Uses queueleuleu instead of kue and better organisation

  • Loading branch information...
1 parent ed4f514 commit b879b5660ea4fb6d8459c78b45cfd85d09fce419 @dmongeau dmongeau committed Dec 4, 2012
Showing with 228 additions and 276 deletions.
  1. +2 −1 .gitignore
  2. 0 examples/config.sample.js
  3. +19 −0 examples/filter.js
  4. +7 −267 index.js
  5. +193 −0 lib/processor.js
  6. +7 −8 package.json
View
3 .gitignore
@@ -1 +1,2 @@
-node_modules/
+node_modules/
+**/config.js
View
0 examples/config.sample.js
No changes.
View
19 examples/filter.js
@@ -0,0 +1,19 @@
+
+var twitterProcessor = require('../index');
+var CONFIG = require('./config');
+
+var processor = twitterProcessor.createProcessor({
+ 'debug' : true,
+ 'processor' : function(job,done) {
+ job.data.addedKey = 'value';
+ done();
+ },
+ 'twitter' : CONFIG.twitter,
+ 'filter' : {
+ 'track' : 'Twitter'
+ }
+});
+
+processor.on('tweet', function(tweet) {
+ console.log('Tweet',tweet.text);
+});
View
274 index.js
@@ -1,277 +1,17 @@
-var kue = require('kue');
-var twitter = require('ntwitter');
-var EventEmitter = require('events').EventEmitter;
+var TwitterProcessor = require('./lib/processor');
-var utils = require('./lib/utils');
+module.exports = exports = {
-/*
- *
- * TwitterProcessing
- *
- */
-var TwitterProcessing = function(options) {
+ 'TwitterProcessor' : TwitterProcessor,
- //Call eventemitter constructor
- EventEmitter.call(this);
+ 'createProcessor' : function(opts) {
- //Options
- this.options = utils.merge({
- 'debug' : false,
- 'processor' : function(job,done){
- done();
- },
- 'redis' : {
- 'client' : null,
- 'host' : '127.0.0.1',
- 'port' : 6379
- },
- 'twitter' : {
- 'consumer_key' : 'Twitter',
- 'consumer_secret' : 'API',
- 'access_token_key' : 'keys',
- 'access_token_secret' : 'go here'
- },
- 'filter' : {
- 'track' : ['twitter']
- },
- 'pauseTimeout' : 5000,
- 'queueName' : 'twitter_processing_'
- },options);
+ var processor = new TwitterProcessor(opts);
- //Server
- this.twitter = this.createTwitter();
+ return processor;
- //Twitter Stream
- this.currentStream = null;
-
- //Queue
- //this.redis = this.createRedis();
- this.jobs = this.createQueue();
- this.jobsCount = 0;
-
- //Processor
- this.processor = this.options.processor || null;
-
- //State
- this.isProcessing = false;
- this.isStarted = false;
-
- //Initialize
- this.init();
-
-};
-
-//Extend EventEmitter
-TwitterProcessing.prototype = Object.create(EventEmitter.prototype);
-
-/*
- *
- * Create methods
- *
- */
-//Create queue
-TwitterProcessing.prototype.createQueue = function() {
- var self = this;
- kue.redis.createClient = function() {
- if(typeof(self.options.redis.client) != 'undefined' && self.options.redis.client) {
- return self.options.redis.client;
- }
- var redis = require('redis');
- var client = redis.createClient(self.options.redis.port, self.options.redis.host);
- if(self.options.redis.password) {
- client.auth(self.options.redis.password);
- }
- return client;
- };
- var jobs = kue.createQueue();
- return jobs;
-}
-
-//Create Redis
-TwitterProcessing.prototype.createRedis = function() {
- var redis = require('redis');
- var client = redis.createClient(this.options.redis.port, this.options.redis.host);
- if(this.options.redis.password) {
- client.auth(this.options.redis.password);
- }
- return client;
-}
-
-//Create Twitter
-TwitterProcessing.prototype.createTwitter = function() {
- var twit = new twitter(this.options.twitter);
- return twit;
-};
-
-/*
- *
- * Init methods
- *
- */
-TwitterProcessing.prototype.init = function() {
-
- var self = this;
-
- //Init kue
- this.jobs.on('job complete', function(id){
- if(self.options.debug) {
- console.log('Kue: Job completed', id);
- }
- self.jobsCount--;
- kue.Job.get(id, function(err, job){
- if (err || !job) return;
-
- self.emit('tweet',job.data);
-
- job.remove(function(err){
- if(self.options.debug) {
- console.log('Kue: Removing job', err, job.data.text);
- }
- if(self.jobsCount == 0) {
- self.isProcessing = false;
- if(self.options.debug) {
- console.log('Kue: Queue completed');
- }
- }
- if (err) throw err;
- });
- });
- });
-
-};
-
-TwitterProcessing.prototype._startStream = function() {
-
- if(!this._retriesCount) {
- this._retriesCount = 1;
- }
-
- var self = this;
-
- this.twitter.stream('statuses/filter', this.options.filter, function(stream) {
-
- if(self.options.debug) {
- console.log('Twitter stream started');
- }
-
- self.currentStream = stream;
-
- //On new tweet
- stream.on('data', function (tweet) {
-
- //Filter tweet text to remove symbols
- tweet.textFiltered = utils.filterTweetText(tweet.text);
-
- //console.log('Tweet: '+tweet.textFiltered);
-
- //Create the job
- self.jobs.create(self.options.queueName, tweet).save();
- self.jobsCount++;
-
- //Start the queue if it is paused
- if(!self.isProcessing) {
- self.jobs.process(self.options.queueName,function() {
- self.process.apply(self,arguments);
- });
- self.isProcessing = true;
- }
-
- });
-
- //On stream error
- stream.on('error', function (response) {
- if(self.options.debug) {
- console.log('Twitter error',arguments);
- }
- try {
- self._retriesCount = self._retriesCount * 2;
- stream.destroy();
- } catch(e){}
- });
-
- //On stream end
- stream.on('end', function (response) {
- try {
- self._retriesCount = 1;
- stream.destroy();
- } catch(e){}
- });
-
- //On stream destroy, reconnect
- stream.on('destroy', function (response) {
-
- //Reconnect if the destroy was not intented
- if(self.isStarted) {
- if(self.options.debug) {
- console.log('Twitter stream reconnect in '+self._retriesCount+' second(s)');
- }
- setTimeout(function() {
- self._startStream();
- },self._retriesCount * 1000);
- }
- });
-
- });
-
-};
-
-//Start
-TwitterProcessing.prototype.start = function() {
-
- if(this.options.debug) {
- console.log('TwitterProcessing start');
- }
-
- this.isStarted = true;
-
- this._startStream();
-
-};
-
-//Stop the streamer
-TwitterProcessing.prototype.stop = function() {
-
- if(this.options.debug) {
- console.log('TwitterProcessing stop');
}
- this.isStarted = false;
-
- if(this.currentStream) {
- this.currentStream.end();
- }
-
-};
-
-
-//Process tweet
-TwitterProcessing.prototype.process = function(job,done) {
-
- if(!this.processor) done();
-
- if(this.processor) {
- this.processor(job,function() {
- job.save(done);
- });
- }
-
- return;
-
-};
-
-
-/*
- *
- * Factory
- *
- */
-function factory(options) {
-
- var twittertts = new TwitterProcessing(options);
-
- return twittertts;
-
-}
-module.exports = exports = factory;
+};
View
193 lib/processor.js
@@ -0,0 +1,193 @@
+var twitter = require('ntwitter');
+var queueleuleu = require('queueleuleu');
+var EventEmitter = require('events').EventEmitter;
+var utils = require('./utils');
+
+/*
+ *
+ * TwitterProcessor
+ *
+ */
+var TwitterProcessor = function(options) {
+
+ //Call eventemitter constructor
+ EventEmitter.call(this);
+
+ //Options
+ this.options = utils.merge({
+ 'debug' : false,
+ 'processor' : function(job,done){
+ done();
+ },
+ 'twitter' : {
+ 'consumer_key' : 'Twitter',
+ 'consumer_secret' : 'API',
+ 'access_token_key' : 'keys',
+ 'access_token_secret' : 'go here'
+ },
+ 'filter' : {
+ 'track' : 'twitter'
+ },
+ 'pauseTimeout' : 5000
+ },options);
+
+ //Server
+ this.twitter = null;
+
+ //Twitter Stream
+ this.currentStream = null;
+
+ //Queue
+ this.queue = null;
+
+ //Processor
+ this.processor = this.options.processor || null;
+
+ //State
+ this.isProcessing = false;
+ this.isStarted = false;
+
+ //Initialize
+ this.init();
+
+};
+
+//Extend EventEmitter
+TwitterProcessor.prototype = Object.create(EventEmitter.prototype);
+
+/*
+ *
+ * Init method
+ *
+ */
+TwitterProcessor.prototype.init = function() {
+
+ var self = this;
+
+ this.twit = new twitter(this.options.twitter);
+
+ //Create queue
+ this.queue = queueleuleu.createQueue({
+ 'autostart' : true,
+ 'processor' : function(job,done) {
+ self.process.call(self,job,done);
+ }
+ });
+
+ //When a job is processed
+ this.queue.on('job end', function(job) {
+ self.emit('tweet',job.data);
+ });
+
+};
+
+TwitterProcessor.prototype._startStream = function() {
+
+ if(!this._retriesCount) {
+ this._retriesCount = 1;
+ }
+
+ var self = this;
+
+ this.twitter.stream('statuses/filter', this.options.filter, function(stream) {
+
+ if(self.options.debug) {
+ console.log('Twitter stream started');
+ }
+
+ self.currentStream = stream;
+
+ //On new tweet
+ stream.on('data', function (tweet) {
+
+ //Filter tweet text to remove symbols
+ tweet.textFiltered = utils.filterTweetText(tweet.text);
+
+ //Create the job
+ self.jobs.add(tweet);
+
+ });
+
+ //On stream error
+ stream.on('error', function (response) {
+ if(self.options.debug) {
+ console.log('Twitter error',arguments);
+ }
+ try {
+ self._retriesCount = self._retriesCount * 2;
+ stream.destroy();
+ } catch(e){}
+ });
+
+ //On stream end
+ stream.on('end', function (response) {
+ try {
+ self._retriesCount = 1;
+ stream.destroy();
+ } catch(e){}
+ });
+
+ //On stream destroy, reconnect
+ stream.on('destroy', function (response) {
+
+ //Reconnect if the destroy was not intented
+ if(self.isStarted) {
+ if(self.options.debug) {
+ console.log('Twitter stream reconnect in '+self._retriesCount+' second(s)');
+ }
+ setTimeout(function() {
+ self._startStream();
+ },self._retriesCount * 1000);
+ }
+ });
+
+ });
+
+};
+
+//Start
+TwitterProcessor.prototype.start = function() {
+
+ if(this.options.debug) {
+ console.log('TwitterProcessor start');
+ }
+
+ this.isStarted = true;
+
+ this._startStream();
+
+};
+
+//Stop the streamer
+TwitterProcessor.prototype.stop = function() {
+
+ if(this.options.debug) {
+ console.log('TwitterProcessor stop');
+ }
+
+ this.isStarted = false;
+
+ if(this.currentStream) {
+ this.currentStream.end();
+ }
+
+};
+
+
+//Process tweet
+TwitterProcessor.prototype.process = function(job,done) {
+
+ if(!this.processor) done();
+
+ if(this.processor) {
+ this.processor(job,function() {
+ job.save(done);
+ });
+ }
+
+ return;
+
+};
+
+
+module.exports = exports = TwitterProcessor;
View
15 package.json
@@ -1,24 +1,23 @@
{
- "name": "twitter-processing",
- "description" : "Twitter streaming processing",
- "keywords" : ["twitter", "streaming", "nwtitter"],
+ "name": "twitter-processor",
+ "description" : "Twitter streaming processor",
+ "keywords" : ["twitter", "streaming", "ntwitter", "text processor", "processor"],
"author": "Folklore <info@atelierfolklore.ca>",
"maintainers": [
"David Mongeau-Petitpas <dmp@atelierfolklore.ca>"
],
- "version": "0.0.5",
+ "version": "0.0.7",
"dependencies": {
- "kue" : "0.5.x",
"ntwitter" : "0.5.x",
- "redis" : "0.8.x"
+ "queueleuleu" : "0.x"
},
"main": "index.js",
"repository": {
"type": "git",
- "url": "git://github.com/Folkloreatelier/node-twitter-processing.git"
+ "url": "git://github.com/Folkloreatelier/node-twitter-processor.git"
},
"license": {
"type": "MIT",
- "url": "http://github.com/Folkloreatelier/node-twitter-processing/raw/master/LICENSE"
+ "url": "http://github.com/Folkloreatelier/node-twitter-processor/raw/master/LICENSE"
}
}

0 comments on commit b879b56

Please sign in to comment.
Something went wrong with that request. Please try again.