Skip to content
Browse files

Updated to work with new websockets and node API implementations

  • Loading branch information...
1 parent a475f00 commit b92be75a637b5e685fc4f6e0c8fe7051bf87b141 @andregoncalves committed
Showing with 128 additions and 37 deletions.
  1. +2 −2 README.textile
  2. +7 −4 server.js
  3. +119 −31 vendor/ws.js
View
4 README.textile
@@ -1,4 +1,4 @@
-h1. Twitter Node.js WebSocket Example
+h1. Twitter Node.js WebSockets Example
Quick experiment of direct streaming from twitter to the browser, with no polling, using Node.js.
(Follow up to Ruben Fonseca "version":http://github.com/rubenfonseca/twitter-amqp-websocket-example with Ruby and AMQP)
@@ -8,7 +8,7 @@ More info "here":http://bit.ly/71uZNv.
h2. Requirements:
* Node.js.
-* HTML5 WebSocket capable browser.
+* HTML5 WebSockets capable browser.
h2. How To:
View
11 server.js
@@ -13,10 +13,11 @@ if (!USERNAME || !PASSWORD)
return sys.puts("Usage: node server.js <twitter_username> <twitter_password> <keyword>");
// Authentication Headers for Twitter
-var headers = [];
var auth = base64.encode(USERNAME + ':' + PASSWORD);
-headers['Authorization'] = "Basic " + auth;
-headers['Host'] = "stream.twitter.com";
+var headers = {
+ 'Authorization' : "Basic " + auth,
+ 'Host' : "stream.twitter.com"
+};
var clients = [];
@@ -25,9 +26,11 @@ var twitter = http.createClient(80, "stream.twitter.com");
var request = twitter.request("GET", "/1/statuses/filter.json?track=" + KEYWORD, headers);
request.addListener('response', function (response) {
- response.setBodyEncoding("utf8");
+ response.setEncoding("utf8");
+
response.addListener("data", function (chunk) {
// Send response to all connected clients
+
clients.each(function(c) {
c.write(chunk);
});
View
150 vendor/ws.js
@@ -1,6 +1,12 @@
// Github: http://github.com/ncr/node.ws.js
// Compatible with node v0.1.91
// Author: Jacek Becela
+// Contributors:
+// Michael Stillwell http://github.com/ithinkihaveacat
+// Nick Chapman http://github.com/nchapman
+// Dmitriy Shalashov http://github.com/skaurus
+// Johan Dahlberg
+// Andreas Kompanez
// License: MIT
// Based on: http://github.com/Guille/node.websocket.js
@@ -12,16 +18,26 @@ function nano(template, data) {
});
}
-var sys = require("sys"),
- net = require("net"),
- headerExpressions = [
- /^GET (\/[^\s]*) HTTP\/1\.1$/,
- /^Upgrade: WebSocket$/,
- /^Connection: Upgrade$/,
- /^Host: (.+)$/,
- /^Origin: (.+)$/
- ],
- handshakeTemplate = [
+function pack(num) {
+ var result = '';
+ result += String.fromCharCode(num >> 24 & 0xFF);
+ result += String.fromCharCode(num >> 16 & 0xFF);
+ result += String.fromCharCode(num >> 8 & 0xFF);
+ result += String.fromCharCode(num & 0xFF);
+ return result;
+}
+
+var sys = require("sys"),
+ net = require("net"),
+ crypto = require("crypto"),
+ requiredHeaders = {
+ 'get': /^GET (\/[^\s]*)/,
+ 'upgrade': /^WebSocket$/,
+ 'connection': /^Upgrade$/,
+ 'host': /^(.+)$/,
+ 'origin': /^(.+)$/,
+ },
+ handshakeTemplate75 = [
'HTTP/1.1 101 Web Socket Protocol Handshake',
'Upgrade: WebSocket',
'Connection: Upgrade',
@@ -30,13 +46,26 @@ var sys = require("sys"),
'',
''
].join("\r\n"),
- policy_file = '<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>';
+ handshakeTemplate76 = [
+ 'HTTP/1.1 101 WebSocket Protocol Handshake', // note a diff here
+ 'Upgrade: WebSocket',
+ 'Connection: Upgrade',
+ 'Sec-WebSocket-Origin: {origin}',
+ 'Sec-WebSocket-Location: ws://{host}{resource}',
+ '',
+ '{data}'
+ ].join("\r\n"),
+ flashPolicy = '<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>';
+
+
+exports.createServer = function (websocketListener, options) {
+ if (!options) options = {};
+ if (!options.flashPolicy) options.flashPolicy = flashPolicy;
-exports.createServer = function (websocketListener) {
return net.createServer(function (socket) {
socket.setTimeout(0);
socket.setNoDelay(true);
- socket.setEncoding("utf8");
+ socket.setKeepAlive(true, 0);
var emitter = new process.EventEmitter(),
handshaked = false,
@@ -62,43 +91,95 @@ exports.createServer = function (websocketListener) {
}
function handshake(data) {
- var headers = data.split("\r\n");
+ var _headers = data.split("\r\n");
- if(/<policy-file-request.*>/.exec(headers[0])) {
- socket.write(policy_file);
+ if ( /<policy-file-request.*>/.exec(_headers[0]) ) {
+ socket.write( options.flashPolicy );
socket.end();
return;
}
- var matches = [], match;
- for (var i = 0, l = headerExpressions.length; i < l; i++) {
- match = headerExpressions[i].exec(headers[i]);
+ // go to more convenient hash form
+ var headers = {}, upgradeHead, len = _headers.length;
+ if ( _headers[0].match(/^GET /) ) {
+ headers["get"] = _headers[0];
+ } else {
+ socket.end();
+ return;
+ }
+ if ( _headers[ _headers.length - 1 ] ) {
+ upgradeHead = _headers[ _headers.length - 1 ];
+ len--;
+ }
+ while (--len) { // _headers[0] will be skipped
+ var header = _headers[len];
+ if (!header) continue;
- if (match) {
- if(match.length > 1) {
- matches.push(match[1]);
- }
+ var split = header.split(": ", 2); // second parameter actually seems to not work in node
+ headers[ split[0].toLowerCase() ] = split[1];
+ }
+
+ // check if we have all needed headers and fetch data from them
+ var data = {}, match;
+ for (var header in requiredHeaders) {
+ // regexp actual header value
+ if ( match = requiredHeaders[ header ].exec( headers[header] ) ) {
+ data[header] = match;
} else {
socket.end();
return;
}
}
- socket.write(nano(handshakeTemplate, {
- resource: matches[0],
- host: matches[1],
- origin: matches[2],
- }));
+ // draft auto-sensing
+ if ( headers["sec-websocket-key1"] && headers["sec-websocket-key2"] && upgradeHead ) { // 76
+ var strkey1 = headers["sec-websocket-key1"]
+ , strkey2 = headers["sec-websocket-key2"]
+
+ , numkey1 = parseInt(strkey1.replace(/[^\d]/g, ""), 10)
+ , numkey2 = parseInt(strkey2.replace(/[^\d]/g, ""), 10)
+
+ , spaces1 = strkey1.replace(/[^\ ]/g, "").length
+ , spaces2 = strkey2.replace(/[^\ ]/g, "").length;
+
+ if (spaces1 == 0 || spaces2 == 0 || numkey1 % spaces1 != 0 || numkey2 % spaces2 != 0) {
+ socket.end();
+ return;
+ }
+
+ var hash = crypto.createHash("md5")
+ , key1 = pack(parseInt(numkey1/spaces1))
+ , key2 = pack(parseInt(numkey2/spaces2));
+
+ hash.update(key1);
+ hash.update(key2);
+ hash.update(upgradeHead);
+
+ socket.write(nano(handshakeTemplate76, {
+ resource: data.get[1],
+ host: data.host[1],
+ origin: data.origin[1],
+ data: hash.digest("binary"),
+ }), "binary");
+
+ } else { // 75
+ socket.write(nano(handshakeTemplate75, {
+ resource: data.get[1],
+ host: data.host[1],
+ origin: data.origin[1],
+ }));
+
+ }
handshaked = true;
- emitter.emit("connect", matches[0]);
+ emitter.emit("connect", data.get[1]);
}
socket.addListener("data", function (data) {
if(handshaked) {
- handle(data);
+ handle(data.toString("utf8"));
} else {
- handshake(data);
+ handshake(data.toString("binary")); // because of draft76 handshakes
}
}).addListener("end", function () {
socket.end();
@@ -106,6 +187,12 @@ exports.createServer = function (websocketListener) {
if (handshaked) { // don't emit close from policy-requests
emitter.emit("close");
}
+ }).addListener("error", function (exception) {
+ if (emitter.listeners("error").length > 0) {
+ emitter.emit("error", exception);
+ } else {
+ throw exception;
+ }
});
emitter.remoteAddress = socket.remoteAddress;
@@ -129,3 +216,4 @@ exports.createServer = function (websocketListener) {
websocketListener(emitter); // emits: "connect", "data", "close", provides: write(data), end()
});
}
+

0 comments on commit b92be75

Please sign in to comment.
Something went wrong with that request. Please try again.