Skip to content

Commit

Permalink
More experimental support for the streaming API. Now it really works …
Browse files Browse the repository at this point in the history
…after enabling in the options page.
  • Loading branch information
cezarsa committed Mar 20, 2011
1 parent c1b9b79 commit e6e5516
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 114 deletions.
9 changes: 5 additions & 4 deletions background.html
Expand Up @@ -13,6 +13,8 @@

<script type="text/javascript" src="lib/options_backend.js"></script>

<script type="text/javascript" src="lib/stream_listener.js"></script>

<script type="text/javascript" src="lib/timeline_factory.js"></script>
<script type="text/javascript" src="lib/timeline_template.js"></script>

Expand Down Expand Up @@ -83,10 +85,7 @@

TimelineTemplate.initTemplates(this);
TimelineTemplate.eachTimelineTemplate(function(template) {
var createdTimelines = template.createTimelines();
for(var i in createdTimelines) {
_this.timelines[createdTimelines[i].timelineId] = createdTimelines[i];
}
_this.createTimelineTemplate(template, true);
});

this.orderedEachTimeline(function(timeline) {
Expand Down Expand Up @@ -119,6 +118,7 @@
timeline.init();
}, true);
_this.retrieveFollowingUsers();
StreamListener.start(this);
},
function(remainingHits, nextHitsReset, hourlyLimit) {
_this.onHitsUpdated.call(_this, remainingHits, nextHitsReset, hourlyLimit);
Expand Down Expand Up @@ -770,6 +770,7 @@
timeline.killTimeline();
delete timeline;
}, true);
StreamListener.disconnect();
},

signout: function() {
Expand Down
185 changes: 185 additions & 0 deletions lib/stream_listener.js
@@ -0,0 +1,185 @@
var StreamListener = (function() {

var props = {
streamReconnectCount: 0,
streamReconnectBaseTime: 20000,
streamReconnectWaitTime: this.streamReconnectBaseTime,
streamMaxStaleTime: 90000,
streamMaxReconnectWait: 240000
};

props.subscribers = [];
props.twitterLib = null;

var curry = function(func) {
var slice = Array.prototype.slice, args = slice.call(arguments, 1);
return function() {
func.apply(this, args.concat(slice.call(arguments)));
};
};

var publish = function(data) {
if(props.twitterLib) {
if(data.hasOwnProperty('text')) {
props.twitterLib.normalizeTweets(data);
} else if(data.hasOwnProperty('direct_message')) {
props.twitterLib.normalizeTweets(data.direct_message);
}
}
console.log(data);
for(var i = 0, len = props.subscribers.length; i < len; ++i) {
var sub = props.subscribers[i];
sub.callback.call(sub.context, data);
}
};

var publishDisconnect = curry(publish, {
event: 'disconnected'
});

var xhr, onProgress;

var stopStream = function() {
if(!xhr) {
return;
}
xhr.removeEventListener("progress", onProgress, false);
xhr.abort();
};

var connectStream = function() {
if(!OptionsBackend.get('use_streaming_api')) {
return;
}
var MAX_BUFFER = 1024 * 500;

var url = OptionsBackend.get('user_stream_url');
var params = {
delimited: 'length'
};

props.streamReconnectCount += 1;
console.log('connecting to stream ', props.streamReconnectCount);

xhr = new XMLHttpRequest();
xhr.open('GET', url + '?' + $.param(params), true);
xhr.setRequestHeader('X-User-Agent', 'Chromed Bird ' + JSON.parse(Persistence.version().val()).join('.'));
if(props.twitterLib) {
props.twitterLib.signOauth(xhr, url, params, 'GET');
}

var lastLoaded = 0, lastChunkLen, lastProgressTime = new Date().getTime();
onProgress = function(e) {
console.log('loaded', e.loaded, 'time', new Date().getTime() - lastProgressTime);
lastProgressTime = new Date().getTime();

var totalLen = e.loaded;
if(totalLen > MAX_BUFFER) {
console.log('stream disconnected, buffer too large');
stopStream();
}
var data = xhr.responseText;

while(lastLoaded < totalLen) {
if(!lastChunkLen) {
lastChunkLen = '';
var curChar = data.charAt(lastLoaded);
while(curChar != '\n' || lastChunkLen.length === 0) {
if(curChar.match(/\d/)) {
lastChunkLen += curChar;
}
lastLoaded += 1;
if(lastLoaded >= totalLen) {
return;
}
curChar = data.charAt(lastLoaded);
}
lastLoaded += 1;
lastChunkLen = parseInt(lastChunkLen, 10);
}
if(lastLoaded + lastChunkLen > totalLen) {
// Let's just wait for the rest of our data
return;
}
var jsonChunk = data.substring(lastLoaded, lastLoaded + lastChunkLen);
var parsedChunk = JSON.parse(jsonChunk);
publish(parsedChunk);
lastLoaded += lastChunkLen;
lastChunkLen = null;
}
};

xhr.addEventListener("progress", onProgress, false);
var intervalHandle;
xhr.onreadystatechange = function() {
if(xhr.readyState == 2 && xhr.status == 200) {
console.log('stream connected ok');

var checkStaleConnection = function() {
var time = new Date().getTime();
if(time - lastProgressTime > props.streamMaxStaleTime) {
console.log('stream stale connection');
stopStream();
}
};
intervalHandle = setInterval(checkStaleConnection, props.streamMaxStaleTime / 2);

props.streamReconnectWaitTime = props.streamReconnectBaseTime;
} else if(xhr.readyState == 4) {
console.log('stream disconnected', xhr.status, xhr.statusText);
publishDisconnect();
if(intervalHandle) {
clearInterval(intervalHandle);
intervalHandle = null;
}
if(props.twitterLib) {
setTimeout(function() {
connectStream();
}, props.streamReconnectWaitTime);
if(props.streamReconnectWaitTime < props.streamMaxReconnectWait) {
props.streamReconnectWaitTime *= 2;
}
}
}
};

xhr.send();
};

return {
events: {
DISCONNECTED: 'disconnected'
},

start: function(twitterLib) {
props.twitterLib = twitterLib;
connectStream();
},

disconnect: function() {
props.twitterLib = null;
props.subscribers = [];
stopStream();
},

unsubscribe: function(context) {
console.log('unsubscribe: ' + context.template.id);
var newSubscribers = [];
for(var i = 0, len = props.subscribers.length; i < len; ++i) {
var sub = props.subscribers[i];
if(sub.context != context) {
newSubscribers.push(sub);
}
}
props.subscribers = newSubscribers;
},

subscribe: function(callbackOrOptions, context) {
var options = (typeof callbackOrOptions === 'function') ? {
callback: callbackOrOptions,
context: context
} : callbackOrOptions;
props.subscribers.push(options);
}
};
}).call({});
1 change: 1 addition & 0 deletions lib/timeline_template.js
Expand Up @@ -96,6 +96,7 @@ function TimelineTemplate(timelineTemplateId, tweetManager) {
break;
default:
// bug
throw 'unrecognized timeline template id';
break;
}
};
Expand Down
70 changes: 68 additions & 2 deletions lib/timelines/timeline.js
Expand Up @@ -19,6 +19,8 @@ function TweetsTimeline(timelineId, manager, template) {
this.unifiedRunning = false;
this.currentRequestId = 0;
this.canceledRequests = {};

StreamListener.subscribe(this.onStreamData, this);
}

TweetsTimeline.prototype = {
Expand All @@ -40,6 +42,7 @@ TweetsTimeline.prototype = {
},

killTimeline: function() {
StreamListener.unsubscribe(this);
this.timelineStopped = true;
this._stopTimer();
},
Expand Down Expand Up @@ -188,8 +191,64 @@ TweetsTimeline.prototype = {
this._stopTimer();
},

onStreamData: function(data) {
if(data.event && data.event == StreamListener.events.DISCONNECTED) {
this.shouldListenStream = false;
if(this.listeningStream) {
this.listeningStream = false;
this._fetchNewTweets();
}
}
if(this.shouldListenStream) {
if(!this.listeningStream) {
return;
}
this._handleStreamData(data);
} else {
this.shouldListenStream = this._shouldListenStream();
console.log(this.template.id + ' - shouldListen: ' + this.shouldListenStream);
}
},

/* Private Methods */

_shouldListenStream: function() {
return this.template.id == TimelineTemplate.HOME ||
this.template.id == TimelineTemplate.MENTIONS ||
this.template.id == TimelineTemplate.SENT_DMS ||
this.template.id == TimelineTemplate.RECEIVED_DMS;
},

_handleStreamData: function(data) {
var tweets;
if(data.text) {
var mentionStr = '@' + this.manager.twitterBackend.username();
if(data.text.match(mentionStr)) {
if(this.template.id == TimelineTemplate.MENTIONS) {
tweets = [data];
}
} else if(this.template.id == TimelineTemplate.HOME) {
tweets = [data];
}
} else if(data.direct_message) {
if(data.direct_message.user.screen_name == this.manager.twitterBackend.username()) {
if(this.template.id == TimelineTemplate.SENT_DMS) {
tweets = [data.direct_message];
}
} else {
if(this.template.id == TimelineTemplate.RECEIVED_DMS) {
tweets = [data.direct_message];
}
}
}

if(tweets) {
console.log('notifying tweets: ' + this.template.id);
this._syncNewTweets(tweets, {});
this.manager.notifyNewTweets();
}
},

_setError: function(status) {
this.currentError = status;
},
Expand Down Expand Up @@ -228,9 +287,16 @@ TweetsTimeline.prototype = {
if(tweets.length > 0) {
this.manager.notifyNewTweets();
}
if(context.onFinish)
if(context.onFinish) {
context.onFinish(tweets.length);
this.timerId = setTimeout(function() { _this._fetchNewTweets(); }, this.template.refreshInterval);
}
if(this.shouldListenStream) {
console.log(this.template.id + ' listening true');
this.timerId = null;
this.listeningStream = true;
} else {
this.timerId = setTimeout(function() { _this._fetchNewTweets(); }, this.template.refreshInterval);
}
},

_doBackendRequest: function(path, callback, context, params) {
Expand Down

0 comments on commit e6e5516

Please sign in to comment.