Permalink
Browse files

First commit

  • Loading branch information...
dmongeau committed Dec 3, 2012
0 parents commit 0450f108a00d01db254817167fdf438278428d28
Showing with 387 additions and 0 deletions.
  1. +1 −0 .gitattributes
  2. +1 −0 .gitignore
  3. +2 −0 .npmignore
  4. +4 −0 README.md
  5. +272 −0 index.js
  6. +46 −0 lib/processors/http.js
  7. +44 −0 lib/utils.js
  8. +17 −0 package.json
@@ -0,0 +1 @@
+* text=auto
@@ -0,0 +1 @@
+node_modules/
@@ -0,0 +1,2 @@
+intermediate/
+publish/
@@ -0,0 +1,4 @@
+Twitter-tts
+===========
+
+Convert a live twitter stream into sound with Node.js and text-to-speech
272 index.js
@@ -0,0 +1,272 @@
+
+var kue = require('kue');
+var twitter = require('ntwitter');
+var EventEmitter = require('events').EventEmitter;
+
+var utils = require('./lib/utils');
+
+/*
+ *
+ * TwitterProcessing
+ *
+ */
+var TwitterProcessing = function(options) {
+
+ //Call eventemitter constructor
+ EventEmitter.call(this);
+
+ //Options
+ this.options = utils.merge({
+ 'debug' : false,
+ 'processor' : null,
+ 'redis' : {
+ 'host' : '127.0.0.1',
+ 'port' : 6379
+ },
+ 'twitter' : {
+ 'consumer_key' : 'Twitter',
+ 'consumer_secret' : 'API',
+ 'access_token_key' : 'keys',
+ 'access_token_secret' : 'go here'
+ },
+ 'filter' : {
+ 'track' : ['twitter']
+ },
+ 'pauseTimeout' : 5000,
+ 'queueName' : 'twitter_processing'
+ },options);
+
+ //Server
+ this.twitter = this.createTwitter();
+
+ //Twitter Stream
+ this.currentStream = null;
+
+ //Queue
+ //this.redis = this.createRedis();
+ this.jobs = this.createQueue();
+ this.jobsCount = 0;
+
+ //Processor
+ this.processor = this.options.processor || null;
+
+ //State
+ this.isProcessing = false;
+ this.isStarted = false;
+
+ //Initialize
+ this.init();
+
+};
+
+//Extend EventEmitter
+TwitterProcessing.prototype = Object.create(EventEmitter.prototype);
+
+/*
+ *
+ * Create methods
+ *
+ */
+//Create queue
+TwitterProcessing.prototype.createQueue = function() {
+ var self = this;
+ kue.redis.createClient = function() {
+ var redis = require('redis');
+ var client = redis.createClient(self.options.redis.port, self.options.redis.host);
+ if(self.options.redis.password) {
+ client.auth(self.options.redis.password);
+ }
+ client.flushdb(function() {
+
+ });
+ return client;
+ };
+ var jobs = kue.createQueue();
+ return jobs;
+}
+
+//Create Redis
+TwitterProcessing.prototype.createRedis = function() {
+ var redis = require('redis');
+ var client = redis.createClient(this.options.redis.port, this.options.redis.host);
+ if(this.options.redis.password) {
+ client.auth(this.options.redis.password);
+ }
+ return client;
+}
+
+//Create Twitter
+TwitterProcessing.prototype.createTwitter = function() {
+ var twit = new twitter(this.options.twitter);
+ return twit;
+};
+
+/*
+ *
+ * Init methods
+ *
+ */
+TwitterProcessing.prototype.init = function() {
+
+ var self = this;
+
+ //Init kue
+ this.jobs.on('job complete', function(id){
+ if(self.options.debug) {
+ console.log('Kue: Job completed', id);
+ }
+ self.jobsCount--;
+ kue.Job.get(id, function(err, job){
+ if (err || !job) return;
+
+ self.emit('tweet',job.data);
+
+ job.remove(function(err){
+ if(self.options.debug) {
+ console.log('Kue: Removing job', job.data.text);
+ }
+ if(self.jobsCount == 0) {
+ self.isProcessing = false;
+ if(self.options.debug) {
+ console.log('Kue: Queue completed');
+ }
+ }
+ if (err) throw err;
+ });
+ });
+ });
+
+};
+
+TwitterProcessing.prototype._startStream = function() {
+
+ if(!this._retriesCount) {
+ this._retriesCount = 1;
+ }
+
+ var self = this;
+
+ this.twitter.stream('statuses/filter', this.options.filter, function(stream) {
+
+ if(self.options.debug) {
+ console.log('Twitter stream started');
+ }
+
+ self.currentStream = stream;
+
+ //On new tweet
+ stream.on('data', function (tweet) {
+
+ //Filter tweet text to remove symbols
+ tweet.textFiltered = utils.filterTweetText(tweet.text);
+
+ //console.log('Tweet: '+tweet.textFiltered);
+
+ //Create the job
+ self.jobs.create(self.options.queueName, tweet).save();
+ self.jobsCount++;
+
+ //Start the queue if it is paused
+ if(!self.isProcessing) {
+ self.jobs.process(self.options.queueName,function() {
+ self.process.apply(self,arguments);
+ });
+ self.isProcessing = true;
+ }
+
+ });
+
+ //On stream error
+ stream.on('error', function (response) {
+ if(self.options.debug) {
+ console.log('Twitter error',arguments);
+ }
+ try {
+ self._retriesCount = self._retriesCount * 2;
+ stream.destroy();
+ } catch(e){}
+ });
+
+ //On stream end
+ stream.on('end', function (response) {
+ try {
+ self._retriesCount = 1;
+ stream.destroy();
+ } catch(e){}
+ });
+
+ //On stream destroy, reconnect
+ stream.on('destroy', function (response) {
+
+ //Reconnect if the destroy was not intented
+ if(self.isStarted) {
+ if(self.options.debug) {
+ console.log('Twitter stream reconnect in '+self._retriesCount+' second(s)');
+ }
+ setTimeout(function() {
+ self._startStream();
+ },self._retriesCount * 1000);
+ }
+ });
+
+ });
+
+};
+
+//Start
+TwitterProcessing.prototype.start = function() {
+
+ if(this.options.debug) {
+ console.log('TwitterProcessing start');
+ }
+
+ this.isStarted = true;
+
+ this._startStream();
+
+};
+
+//Stop the streamer
+TwitterProcessing.prototype.stop = function() {
+
+ if(this.options.debug) {
+ console.log('TwitterProcessing stop');
+ }
+
+ this.isStarted = false;
+
+ if(this.currentStream) {
+ this.currentStream.end();
+ }
+
+};
+
+
+//Process tweet
+TwitterProcessing.prototype.process = function(job,done) {
+
+ if(!this.processor) done();
+
+ this.processor.process(job,function() {
+ job.save(done);
+ });
+
+ return;
+
+};
+
+
+/*
+ *
+ * Factory
+ *
+ */
+function factory(options) {
+
+ var twittertts = new TwitterProcessing(options);
+
+ return twittertts;
+
+}
+
+module.exports = exports = factory;
@@ -0,0 +1,46 @@
+var http = require('http');
+var fs = require('fs');
+var path = require('path');
+var utils = require('../utils');
+
+var HTTPProcessor = function(options) {
+
+ this.options = utils.merge({
+ 'host' : {
+ 'host': 'http://localhost',
+ 'port': 80,
+ 'path': '/?text=?'
+ }
+ },options);
+
+};
+
+HTTPProcessor.prototype.process = function(job,done) {
+
+ var now = new Date();
+
+ var publicFolder = '/audio/'+(now.getMonth()+1)+'-'+now.getDay()+'/'+now.getHours()+'-'+now.getMinutes();
+ var publicPath = publicFolder+'/'+id+'.mp3';
+ var folder = __dirname+'/../web'+publicFolder
+ var filename = __dirname+'/../web'+publicPath;
+
+ if (!path.existsSync(folder+'/')) {
+ fs.mkdirSync(folder+'/',0775,true);
+ }
+
+ http.get(this.options.host, function(response) {
+
+ var file = fs.createWriteStream(filename);
+ response.on('data', function(chunk){
+ file.write(chunk);
+ });
+ response.on('end', function(){
+ file.end();
+ done();
+ });
+
+ }).on('error', function(e) {
+ done(e.message);
+ });
+
+};
@@ -0,0 +1,44 @@
+
+
+/**
+ *
+ * Filter text of tweet to remove symbols
+ *
+ */
+exports.filterTweetText = function(text) {
+ text = text.replace(/RT ?\@[a-zA-Z0-9\_\-]+ ?\:?/gi,'');
+ text = text.replace(/(https?:\/\/([-\w\.]+)+(:\d+)?(\/([\w\/_\.]*(\?\S+)?)?)?)/gi,'');
+ text = text.replace(/ \#[^\ ]+$/gi,'');
+ text = text.replace(/ \#[^\ ]+$/gi,'');
+ text = text.replace(/ \#[^\ ]+$/gi,'');
+ text = text.replace('@','');
+ text = text.replace('#','');
+ text = text.replace('\n','');
+ text = text.replace('"',' ');
+ text = text.replace(' ',' ');
+ text = text.replace(' ',' ');
+ return text;
+}
+
+/**
+ * Merge object b with object a.
+ *
+ * var a = { foo: 'bar' }
+ * , b = { bar: 'baz' };
+ *
+ * utils.merge(a, b);
+ * // => { foo: 'bar', bar: 'baz' }
+ *
+ * @param {Object} a
+ * @param {Object} b
+ * @return {Object}
+ * @api public
+ */
+exports.merge = function(a, b){
+ if (a && b) {
+ for (var key in b) {
+ a[key] = b[key];
+ }
+ }
+ return a;
+};
Oops, something went wrong.

0 comments on commit 0450f10

Please sign in to comment.