Browse files

Initial commit

  • Loading branch information...
0 parents commit 35f12d29ccdb820600a16e6359c608e3fe1175d2 @jfd jfd committed Dec 1, 2011
218 README
@@ -0,0 +1,218 @@
+Hydna for nodejs
+================
+
+The node.js hydna library is a straight-forward implementation of the wink binary protocol. The module has been closely modeled to the native node.js modules and the usage pattern should come naturally to developers using other node.js modules. The module does not have any external dependencies.
+
+Creating a connection:
+
+ var hydna = require("hydna");
+ var channel = hydna.createChannel("localhost", "rw");
+
+ channel.on("connect", function() {
+ // read/write connection is ready to use
+ });
+
+ channel.on("error", function() {
+ // an error occured when connecting
+ });
+
+A read/write channel is opened and event-listeners for connect and error are attached to the channel.
+
+Sending Data:
+
+ var hydna = require("hydna");
+ var channel = hydna.createChannel("localhost", "w");
+
+ channel.on("connect", function() {
+ var message = "Hello World!";
+ channel.write(message, "utf8");
+ });
+
+Opens up a channel for writing and, when the connection has been established and the connect event has been emitted, writes a message.
+
+Receiving data:
+
+ var hydna = require("hydna");
+ var channel = hydna.createChannel("localhost", "r");
+
+ channel.setEncoding("utf8");
+
+ channel.on("data", function(data) {
+ console.log(data);
+ });
+
+Opens up a channel for reading and, when the data arrives and the data event is emitted, writes the received data to the console.
+
+
+## Installation
+
+The easiest way to install this library is via npm:
+
+ $ npm install hydna
+
+
+## Test
+
+The test suite requires a running server instance with the behaviors found `test/behaviors` loaded.
+
+The default server address is `localhost:7010`. This can be overridden via the environment variable `TEST_ADDRESS`.
+
+To run the test suite:
+
+ $ TEST_ADDRESS=<server_address_and_port> npm test
+
+
+## API Specification
+
+The node.js hydna client library consists of a single module — hydna — that
+contains everything that is necessary to communicate over hydna.
+
+### hydna.PAYLOAD_MAX_SIZE
+
+Returns the maximum payload size.
+
+### hydna.followRedirects=true
+
+Indicates if HTTP-redirects should be followed or not.
+
+
+### hydna.origin=<hostname>
+
+Sets the origin identity that should be sent to the server on handshake.
+
+
+### hydna.agent=node-winsock-client/<version>
+
+Sets the agent identity that should be sent to the server on handshake.
+
+
+### hydna.createChannel(url, mode, [callback])
+
+Opens a Channel to the specified ´'url'´.
+
+This function is asynchronous. When the `'connect'` event is emitted once connected. If there is a problem connecting, the `'connect'` event will not be emitted, the 'error' event will be emitted with the exception.
+
+Available modes:
+* read (r) - Open channel in read mode
+* write (w) - Open channel in write mode
+* readwrite (rw) - Open channel in read-write mode.
+* +emit(e) - Open channel with emit-signal support (e.g. "rw+emit").
+
+This example opens a Channel and writes data too it. The same data
+is received :
+
+ var hydna = require("hydna");
+ var channel = hydna.createChannel("localhost", "w");
+ channel.write("Hello World!");
+
+
+### hydna.Channel
+
+This object is an abstraction of of a TCP or UNIX socket. hydna.Channel instance implement a duplex stream interface. They can be created by the user and used as a client (with connect()) or they can be created by Node and passed to the user through the 'connection' event of a server.
+
+hydna.Channel instances are EventEmitters with the following events:
+
+Event: `'connect'`
+`function (message) { }`
+
+Emitted when a channel connection successfully is established. See connect(). The `'message'` argument may or may not contain an initial message from server.
+
+Event: `'data'`
+`function (data) { }`
+
+Emitted when data is received. The argument data will be a Buffer or String. Encoding of data is set by channel.setEncoding().
+
+Event: `'drain'`
+`function () { }`
+
+Emitted when the write buffer becomes empty. Can be used to throttle uploads.
+
+Event: `'error'`
+`function (exception) { }`
+
+Emitted when an error occurs. The `'close'` event will be called directly following this event.
+
+Event: `'close'`
+`function (had_error) { }`
+
+Emitted once the channel is fully closed. The argument had_error is a boolean which says if the channel was closed due to an error.
+
+Event: `'signal'`
+`function (data) { }`
+
+Emitted when remote server send's a signal.
+
+#### Channel.readable
+
+Returns `true` if channel is readable, else false.
+
+#### Channel.writable
+
+Returns `true` if channel is writable, else false.
+
+#### Channel.emitable
+
+Returns `true` if channel is emitable, else false.
+
+
+#### Channel.readyState
+
+Either `'closed'`, `'closing'`, `'open'`, `'opening'`,
+`'read'`, `'write'`, `'readwrite'` and/or `'+emit'`.
+
+
+#### Channel.id
+
+Returns channel `id` as a string. Property is `null` if not connected.
+
+
+#### Channel.setEncoding(encoding=null)
+
+Sets the encoding (either `'ascii'`, `'utf8'`, `'base64'`, `'json'`)
+
+
+#### Channel.connect(url, mode='readwrite')
+
+See `hydna.createChannel` for more info.
+
+
+#### Channel.write(data, encoding='ascii', priority=1)
+
+Sends data on the channel. The second parameter specifies the encoding in the case of a string--it defaults to ASCII because encoding to UTF8 is rather slow.
+
+Returns ´true´ if the entire data was flushed successfully to the underlying connection. Returns `false` if all or part of the data was queued in user memory. ´'drain'´ will be emitted when the buffer is again free.
+
+
+#### Channel.dispatch(message)
+
+Dispatch a signal on the channel.
+
+Returns ´true´ if the signal was flushed successfully to the underlying connection. Returns `false` if the all or part of the signal was queued in user memory. ´'drain'´ will be emitted when the buffer is again free.
+
+Example:
+
+ var hydna = require("hydna");
+ var channel = hydna.createChannel("localhost", "read+emit");
+ channel.on("signal", function(message) {
+ if (message == "pong") {
+ console.log("Recevied pong from server");
+ }
+ });
+ channel.dispatch("ping");
+
+
+
+#### Channel.end([message])
+
+Closes channel for reading, writing and emitting. The optional `message` is sent to the server.
+
+Example:
+
+ var hydna = require("hydna");
+ var channel = hydna.createChannel("localhost", "read");
+ channel.end("good bye!");
+
+
+#### Channel.destroy()
+
+Closes channel for reading, writing and emitting.
1,357 index.js
@@ -0,0 +1,1357 @@
+//
+// Copyright 2011 Hydna AB. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+//
+// 1. Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY HYDNA AB ``AS IS'' AND ANY EXPRESS OR IMPLIED
+// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+// EVENT SHALL HYDNA AB OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+// The views and conclusions contained in the software and documentation are
+// those of the authors and should not be interpreted as representing
+// official policies, either expressed or implied, of Hydna AB.
+//
+
+var Buffer = require("buffer").Buffer;
+var inherits = require("util").inherits;
+var Stream = require("stream").Stream;
+
+var VERSION = require("./package.json").version;
+
+var READ = 0x01;
+var WRITE = 0x02;
+var READWRITE = 0x03;
+var EMIT = 0x04;
+
+// Packet related sizes
+var PAYLOAD_MAX_SIZE = 0xFFFFF8;
+
+var ALL_CHANNELS = 0;
+
+var VALID_ENCODINGS_RE = /^(ascii|utf8|base64|json)/i;
+var MODE_RE = /^(r|read){0,1}(w|write){0,1}(?:\+){0,1}(e|emit){0,1}$/i;
+
+
+exports.PAYLOAD_MAX_SIZE = PAYLOAD_MAX_SIZE;
+
+// Follow 302 redirects. Adds a `X-Accept-Redirects: no` to the
+// headers of the handshake request.
+exports.followRedirects = true;
+
+
+// Set the origin in handshakes. Set to `null` to disable
+exports.origin = require("os").hostname();
+
+
+// Set the agent header in handshakes. Set to `null` to disable
+exports.agent = "node-winsock-client/" + VERSION;
+
+
+exports.createChannel = function(url, mode, C) {
+ var chan = new Channel();
+ chan.connect(url, mode);
+ if (typeof C == "function") {
+ chan.once("connect", C);
+ }
+ return chan;
+};
+
+
+function Channel() {
+ this.id = null;
+
+ this._connecting = false;
+ this._opening = false;
+ this._closing = false;
+ this._connection = null;
+ this._request = null;
+ this._mode = null;
+ this._writeQueue = null;
+ this._encoding = null;
+ this._url = null;
+
+ this.readable = false;
+ this.writable = false;
+ this.emitable = false;
+}
+
+exports.Channel = Channel;
+inherits(Channel, Stream);
+
+
+Object.defineProperty(Channel.prototype, 'readyState', {
+ get: function () {
+ var state;
+
+ if (this._connecting) {
+ return "opening";
+ } else if (this._closing) {
+ return "closing";
+ } else if (!this.id) {
+ return 'closed';
+ } else if (this.readable && this.writable) {
+ state = "readwrite";
+ } else if (this.readable && !this.writable){
+ state = "read";
+ } else if (!this.readable && this.writable){
+ state = "write";
+ }
+ if (this.emitable) {
+ state += "+emit";
+ }
+
+ return state;
+ }
+});
+
+
+Object.defineProperty(Channel.prototype, 'url', {
+ get: function () {
+ if (!this.id || !this._connection) {
+ return null;
+ }
+
+ return this._url;
+ }
+});
+
+
+Channel.prototype.connect = function(url, mode) {
+ var parse;
+ var self = this;
+ var messagesize;
+ var request;
+ var uri;
+ var id;
+ var host;
+ var mode;
+ var token;
+
+ if (this._connecting) {
+ throw new Error("Already connecting");
+ }
+
+ if (typeof url !== "string") {
+ throw new Error("bad argument, `url`, expected String");
+ }
+
+ if (/^http:\/\/|^https:\/\//.test(url) == false) {
+ url = "http://" + url;
+ }
+
+ url = require("url").parse(url);
+
+ if (url.protocol !== "https:" && url.protocol !== "http:") {
+ throw new Error("bad protocol, expected `http` or `https`");
+ }
+
+ if (url.pathname && url.pathname.length != 1) {
+ if (url.pathname.substr(0, 2) == "/x") {
+ id = parseInt("0" + url.pathname.substr(1));
+ } else {
+ id = parseInt(url.pathname.substr(1));
+ }
+ if (isNaN(id)) {
+ throw new Error("Invalid channel");
+ }
+ } else {
+ id = 1;
+ }
+
+ if (id > 0xFFFFFFFF) {
+ throw new Error("Invalid channel expected no between x0 and xFFFFFFFF");
+ }
+
+ mode = getBinMode(mode);
+
+ if (typeof mode !== "number") {
+ throw new Error("Invalid mode");
+ }
+
+ if (url.query) {
+ token = new Buffer(decodeURIComponent(uri.query), "utf8");
+ }
+
+ this.id = id;
+ this._mode = mode;
+ this._connecting = true;
+ this._url = url.href;
+
+ this.readable = ((this._mode & READ) == READ);
+ this.writable = ((this._mode & WRITE) == WRITE);
+ this.emitable = ((this._mode & EMIT) == EMIT);
+
+ this._connection = Connection.getConnection(url, false);
+ this._request = this._connection.open(this, id, mode, token);
+};
+
+
+Channel.prototype.setEncoding = function(encoding) {
+ if (encoding && !VALID_ENCODINGS_RE.test(encoding)) {
+ throw new Error("Encoding method not supported");
+ }
+ this._encoding = encoding;
+};
+
+
+Channel.prototype.write = function(data, enc, prio) {
+ var encoding = (typeof enc == "string" && enc);
+ var flag = ((encoding && prio) || prio || 1) - 1;
+ var id = this.id;
+ var frame;
+ var payload;
+
+ if (!this.writable) {
+ throw new Error("Channel is not writable");
+ }
+
+ if (flag < 0 || flag > 3 || isNaN(flag)) {
+ throw new Error("Bad priority, expected Number between 1-4");
+ }
+
+ if (!data) {
+ throw new Error("Expected `data`");
+ }
+
+ if (Buffer.isBuffer(data)) {
+ flag = flag << 1 | 0; // Set datatype to BINARY
+ payload = data;
+ } else {
+ flag = flag << 1 | 1; // Set datatype to UTF8
+ if (encoding && !VALID_ENCODINGS_RE.test(encoding)) {
+ throw new Error("Encoding method is not supported");
+ }
+ if (encoding == "json") {
+ payload = new Buffer(JSON.stringify(data), "utf8");
+ } else {
+ payload = new Buffer(data.toString(), encoding);
+ }
+ }
+
+ if (payload.length > PAYLOAD_MAX_SIZE) {
+ throw new Error("Payload overflow");
+ }
+
+ frame = new DataFrame(this.id, flag, payload);
+
+ try {
+ flushed = this._writeOut(frame);
+ } catch (writeException) {
+ this.destroy(writeException);
+ return false;
+ }
+
+ return flushed;
+};
+
+
+Channel.prototype.dispatch = function(message) {
+ var frame;
+ var payload;
+ var flushed;
+
+ if (!this.emitable) {
+ throw new Error("Channel is not emitable.");
+ }
+
+ if (typeof message !== "undefined" && typeof message !== "string") {
+ throw new Error("Expected 'message' as String");
+ }
+
+ if (message) {
+ payload = new Buffer(message, "utf8");
+
+ if (payload.length > PAYLOAD_MAX_SIZE) {
+ throw new Error("Payload overflow");
+ }
+ }
+
+ frame = new SignalFrame(this.id, SignalFrame.FLAG_EMIT, payload);
+
+ try {
+ flushed = this._writeOut(frame);
+ } catch (writeException) {
+ this.destroy(writeException);
+ return false;
+ }
+
+ return flushed;
+};
+
+
+Channel.prototype.end = function(message) {
+ var payload;
+
+ if (this.destroyed || this._closing) {
+ return;
+ }
+
+ if (typeof message !== "undefined" && typeof message !== "string") {
+ throw new Error("Expected 'message' as String");
+ }
+
+ if (message) {
+ payload = new Buffer(message, "utf8");
+
+ if (payload.length > PAYLOAD_MAX_SIZE) {
+ throw new Error("Payload overflow");
+ }
+ }
+
+ this._endsig = new SignalFrame(this.id, SignalFrame.FLAG_END, payload);
+
+ this.destroy();
+};
+
+
+Channel.prototype.destroy = function(err) {
+ var sig;
+
+ if (this.destroyed || this._closing || !this.id) {
+ return;
+ }
+
+ if (!this._connection) {
+ finalizeDestroyChannel(this);
+ }
+
+ this.readable = false;
+ this.writable = false;
+ this.emitable = false;
+ this._closing = true;
+
+ if (this._request && !this._endsig &&
+ this._request.cancel()) {
+ this._request = null;
+ finalizeDestroyChannel(this, err);
+ return;
+ }
+
+ sig = this._endsig || new SignalFrame(this.id, SignalFrame.FLAG_END);
+
+ if (this._request) {
+ // Do not send ENDSIG if _request is present. We need to wait for
+ // the OPENSIG before we can close it.
+
+ this._endsig = sig;
+ } else {
+ // Channel is open and we can therefor send ENDSIG immideitnly. This
+ // can fail, if TCP connection is dead. If so, we can
+ // destroy channel with good conscience.
+
+ try {
+ this._writeOut(sig);
+ } catch (err) {
+ // ignore
+ }
+ }
+};
+
+
+function finalizeDestroyChannel(chan, err, message) {
+ var id = chan.id;
+ var conn;
+
+ if (chan.destroyed) {
+ return;
+ }
+
+ if ((conn = chan._connection) && chan.id) {
+ if (conn.channels[id] == chan) {
+ delete conn.channels[id];
+ conn.chanRefCount--;
+ if (conn.chanRefCount == 0 &&
+ conn.reqRefCount == 0) {
+ conn.setDisposed(true);
+ }
+ }
+ }
+
+ chan.id = null;
+ chan.readable = false;
+ chan.writable = false;
+ chan.emitable = false;
+ chan.destroyed = true;
+ chan._request = null;
+ chan._writequeue = null;
+ chan._connection = null;
+
+ err && chan.emit("error", err);
+
+ chan.emit("close", !(!err), message);
+};
+
+
+Channel.prototype.ondata = function(data, start, end, flag) {
+ var encoding = this._encoding;
+ var message = data.slice(start, end);
+
+ if (encoding || (flag & 1 == 1)) {
+ if (encoding == "json") {
+ try {
+ message = JSON.parse(message.toString("utf8"));
+ } catch (exception) {
+ this.destroy(exception);
+ return;
+ }
+ } else {
+ try {
+ message = message.toString(encoding);
+ } catch (exception) {
+ this.destroy(exception);
+ return;
+ }
+ }
+ }
+
+ if (this._events && this._events["data"]) {
+ this.emit("data", message, (flag >> 1) + 1);
+ }
+};
+
+
+Channel.prototype.onsignal = function(data, start, end) {
+ var message = null;
+
+ if (end - start) {
+ message = data.toString("utf8", start, end);
+ }
+
+ if (this._events && this._events["signal"]) {
+ this.emit("signal", message);
+ }
+};
+
+
+// Internal write method to write raw packets.
+Channel.prototype._writeOut = function(packet) {
+ var written;
+
+ if (this._writeQueue) {
+ this._writeQueue.push(packet);
+ return false;
+ }
+
+ if (this._connecting) {
+ this._writeQueue = [packet];
+ return false;
+ } else if (this._connection) {
+ return this._connection.write(packet);
+ } else {
+ this.destroy(new Error("Channel is not writable"));
+ return false;
+ }
+};
+
+
+Channel.prototype._open = function(newid, message) {
+ var flushed = false;
+ var queue = this._writeQueue;
+ var id = this.id;
+ var packet;
+
+ this.id = newid;
+ this._connecting = false;
+ this._writeQueue = null;
+ this._request = null;
+
+ this._connection.channels[this.id] = this;
+ this._connection.chanRefCount++;
+
+ if (queue && queue.length) {
+ for (var i = 0, l = queue.length; i < l; i++) {
+ packet = queue[i];
+ packet.id = newid;
+ try {
+ flushed = this._writeOut(packet);
+ } catch(writeException) {
+ this.destroy(writeException);
+ return;
+ }
+ }
+ }
+
+ if (this._closing) {
+ if ((packet = self._endsig)) {
+ self._endsig = null;
+ packet.id = newid;
+ try {
+ this._writeOut(packet);
+ } catch (err) {
+ // Ignore
+ }
+ return;
+ }
+ }
+
+ this.emit("connect", message);
+
+ if (flushed) {
+ this.emit("drain");
+ }
+};
+
+
+
+// Represents a server connection.
+function Connection(id) {
+ this.id = id;
+ this.chanRefCount = 0;
+ this.reqRefCount = 0;
+ this.channels = {};
+ this.requests = {};
+ this.sock = null;
+
+ Connection.all[id] = this;
+}
+
+
+Connection.all = {};
+Connection.disposed = {};
+
+
+Connection.getConnection = function(url) {
+ var id;
+ var connection;
+ var datacache = "";
+ var lastException;
+
+ id = url.protocol + url.host + (url.auth && (":" + url.auth) || "");
+
+ if ((connection = Connection.all[id])) {
+ return connection;
+ }
+
+ if ((connection = Connection.disposed[id])) {
+ connection.setDisposed(false);
+ return connection;
+ }
+
+ // rewrite url if initial token is present.
+ url = require("url").parse([
+ url.protocol,
+ "//",
+ url.hostname,
+ url.port ? ":" + url.port : "",
+ "/",
+ url.auth
+ ].join(""));
+
+ connection = new Connection(id);
+ connection.connect(url);
+
+ return connection;
+}
+
+
+Connection.prototype.connect = function(url) {
+ var self = this;
+
+ if (this.sock) {
+ throw new Error("Socket already connected");
+ }
+
+ process.nextTick(function() {
+ getSock(url, function(err, sock) {
+ var requests = self.requests;
+
+ if (err) {
+ return self.destroy(err);
+ }
+
+ sock.setNoDelay(true);
+ sock.setKeepAlive(true);
+
+ sock.on("drain", function() {
+ var channels = self.channels;
+ var chan;
+
+ for (var id in channels) {
+ chan = channels[id];
+ if (chan._events && chan._events["drain"]) {
+ chan.emit("drain");
+ }
+ }
+ });
+
+ sock.on("error", function(err) {
+ self.sock = null;
+ self.destroy(err);
+ });
+
+ sock.on("close", function(hadError) {
+ if (hadError == false) {
+ self.sock = null;
+ self.destroy(new Error("Connection reseted by server"));
+ }
+ });
+
+ self.sock = sock;
+ parserImplementation(self)
+
+ if (self.reqRefCount == 0) {
+ // All requests was cancelled before we got a
+ // handshake from server. Dispose us.
+ self.setDisposed(true);
+ }
+
+ try {
+ for (var id in requests) {
+ self.write(requests[id]);
+ requests[id].sent = true;
+ }
+ } catch (writeException) {
+ self.destroy(writeException);
+ }
+ });
+ });
+};
+
+
+function getSock(url, C) {
+ var parse = require("url").parse;
+ var STATUS_CODES = require("http").STATUS_CODES;
+ var MAX_REDIRECTS = 5;
+ var redirections = 1;
+
+ function dorequest(url) {
+ var request;
+ var opts;
+ var req;
+ var port;
+ var host;
+ var path;
+
+ if (url.protocol !== "http:" && url.protocol !== "https:") {
+ return C(new Error("Redirect, bad protocol `" + url.protocol + "`"));
+ }
+
+ request = require(url.protocol == "http:" ? "http" : "https").request;
+ host = url.hostname;
+ port = url.port || (url.protocol == "http:" ? 80 : 443);
+ path = url.pathname;
+
+ opts = {
+ port: port,
+ host: host,
+ path: path,
+ headers: {
+ "Connection": "Upgrade",
+ "Upgrade": "winksock/1",
+ }
+ }
+
+ if (!exports.followRedirects) {
+ opts.headers["X-Accept-Redirects"] = "no";
+ }
+
+ if (exports.agent) {
+ opts.headers["User-Agent"] = exports.agent;
+ }
+
+ if (exports.origin) {
+ opts.headers["Origin"] = exports.origin;
+ }
+
+ req = request(opts, function(res) {
+ var msg;
+
+ res.setEncoding("utf8");
+
+ res.on("data", function(chunk) {
+ msg = msg ? msg + chunk : chunk;
+ });
+
+ res.on("end", function() {
+ var code = res.statusCode;
+ var url;
+ var err;
+
+ switch (code) {
+ case 301:
+ case 302:
+ case 307:
+ if (exports.followRedirects) {
+ if (redirections++ == MAX_REDIRECTS) {
+ return C(new Error("Max HTTP redirections reached"));
+ }
+ try {
+ url = parse(res.headers["location"]);
+ } catch (err) {
+ return C(err);
+ }
+ return dorequest(url)
+ } else {
+ err = new Error("Redirected by host, followRedirects=false");
+ return C(err);
+ }
+ break;
+ default:
+ if (msg) {
+ err = new Error(STATUS_CODES[code] + " (" + msg + ")");
+ } else {
+ err = new Error(STATUS_CODES[code]);
+ }
+ break;
+ }
+
+ return C(err);
+ });
+ });
+
+ req.on("error", function(err) {
+ return C(err);
+ });
+
+ req.on("upgrade", function(res, sock) {
+ sock.setTimeout(0);
+ sock.removeAllListeners("error");
+ sock.removeAllListeners("close");
+ sock.resume();
+
+ if (res.headers["upgrade"] != "winksock/1") {
+ sock.destroy(new Error("Bad protocol version " + res.headers["upgrade"]));
+ }
+
+ return C(null, sock);
+ });
+
+ req.end();
+ }
+
+ dorequest(url);
+}
+
+
+Connection.prototype.open = function(chan, id, mode, token) {
+ var self = this;
+ var channels = this.channels;
+ var oldchan;
+ var request;
+
+ if ((oldchan = channels[id]) && !oldchan._closing) {
+ process.nextTick(function() {
+ finalizeDestroyChannel(chan, new Error("Channel is already open"));
+ });
+ return null;
+ }
+
+ request = new OpenRequest(this, id, mode, token);
+
+ request.onresponse = function(newid, message) {
+ chan._open(newid, message);
+ };
+
+ request.onclose = function(err) {
+ if (err) { finalizeDestroyChannel(chan, err); }
+ };
+
+ if (this.sock && !oldchan) {
+ // Do not send request if socket isnt handshaked yet, or
+ // if a channel is open and waiting for an ENDSIG.
+ request.send();
+ }
+
+ return request;
+};
+
+
+Connection.prototype.setDisposed = function(state) {
+ var id = this.id;
+ var sock = this.sock;
+ var self = this;
+
+ if (!this.id || !sock) return;
+
+ if (state) {
+
+ if (sock) {
+ sock.setTimeout(200);
+ sock.once("timeout", function() {
+ self.destroy();
+ });
+ }
+
+ Connection.disposed[id] = this;
+ Connection.all[id] = undefined;
+
+ } else {
+
+ delete Connection.disposed[id];
+ Connection.all[id] = this;
+
+ if (sock) {
+ sock.setTimeout(0);
+ sock.removeAllListeners("timeout");
+ }
+ }
+};
+
+
+// Write a `Packet` to the underlying socket.
+Connection.prototype.write = function(frame) {
+ if (this.sock) {
+ return this.sock.write(frame.toBuffer());
+ } else {
+ return false;
+ }
+};
+
+
+Connection.prototype.processOpen = function(id, flag, data, start, end) {
+ var request;
+
+ if (!(request = this.requests[id])) {
+ sock.destroy(new Error("Server sent an open response to unknown"));
+ return;
+ }
+
+ request.processResponse(flag, data, start, end);
+};
+
+
+Connection.prototype.processData = function(id, flag, data, start, end) {
+ var channels = this.channels;
+ var chan;
+
+ if (id === ALL_CHANNELS) {
+ for (var chanid in channels) {
+ chan = channels[chanid];
+ if (chan.readable) {
+ chan.ondata && chan.ondata(data, start, end, flag);
+ }
+ }
+ } else if ((chan = channels[id])) {
+ if (chan.readable) {
+ chan.ondata && chan.ondata(data, start, end, flag);
+ }
+ }
+};
+
+
+Connection.prototype.processSignal = function(id, flag, data, start, end) {
+ var channels = this.channels;
+ var requests = this.requests;
+ var chan;
+ var message;
+
+ switch (flag) {
+
+ case SignalFrame.FLAG_EMIT:
+ if (id === ALL_CHANNELS) {
+ for (var chanid in channels) {
+ chan = channels[chanid];
+ if (chan._closing == false) {
+ chan.onsignal && chan.onsignal(data, start, end);
+ }
+ }
+ } else if ((chan = channels[id])) {
+ if (chan._closing == false) {
+ chan.onsignal && chan.onsignal(data, start, end);
+ }
+ }
+ break;
+
+ case SignalFrame.FLAG_END:
+ case SignalFrame.FLAG_ERROR:
+
+ if (end - start) {
+ message = data.toString("utf8", start, end);
+ }
+
+ if (id === ALL_CHANNELS) {
+ if (flag != SignalFrame.FLAG_END) {
+ this.destroy(new Error(message || "ERR_UNKNOWN"));
+ } else {
+ this.destroy(null, message);
+ }
+ return;
+ }
+
+ if (!(chan = channels[id])) {
+ // Protocol violation. Channel does not exists in client. Ignore
+ // for now.
+
+ return;
+ }
+
+ if (chan._closing) {
+ // User requested to close this channel. This ENDSIG is a
+ // response to that request. It is now safe to destroy
+ // channel. Note: We are intentionally not sending the message
+ // to the function, because channel is closed according
+ // to client.
+
+ finalizeDestroyChannel(chan);
+
+ if (requests[id]) {
+ // Send pending open request if exists.
+ requests[id].send();
+ }
+
+ } else {
+ // Server closed this channel. We need to respond with a
+ // ENDSIG in order to let server now that we received this
+ // signal.
+
+ try {
+ this.write(new SignalFrame(id, SignalFrame.FLAG_END));
+ } catch (writeException) {
+ this.destroy(writeException);
+ }
+
+ if (flag != SignalFrame.FLAG_END) {
+ finalizeDestroyChannel(chan, new Error(message || "ERR_UNKNOWN"));
+ } else {
+ finalizeDestroyChannel(chan, null, message);
+ }
+ }
+ break;
+
+ default:
+ this.destroy(new Error("Server sent an unknown SIGFLAG"));
+ return;
+ }
+
+};
+
+
+// Destroy connection with optional Error
+Connection.prototype.destroy = function(err, message) {
+ var id = this.id;
+ var channels = this.channels;
+ var requests = this.requests;
+ var chan;
+ var request;
+ var queued;
+
+ if (!id) {
+ return;
+ }
+
+ this.id = null;
+
+ for (var chanid in channels) {
+ if ((chan = channels[chanid])) {
+ finalizeDestroyChannel(chan, err, message);
+ }
+ }
+
+ for (var reqid in requests) {
+ if ((request = requests[reqid])) {
+ request.destroyAndNext(err);
+ }
+ }
+
+ this.channels = {};
+ this.requests = {};
+ this.chanRefCount = 0;
+ this.reqRefCount = 0;
+
+ delete Connection.all[id];
+ delete Connection.disposed[id];
+
+ if (this.sock) {
+ this.sock.destroy();
+ this.sock = null;
+ }
+};
+
+// OpenRequest constructor.
+function OpenRequest(conn, id, flag, data) {
+ var requests = conn.requests;
+ var next;
+
+ this.conn = conn;
+ this.id = id;
+ this.flag = flag;
+ this.data = data;
+ this.present = false;
+ this.sent = false;
+ this.destroyed = false;
+
+ this.prev = null;
+ this.next = null;
+
+ if ((next = requests[id])) {
+ while (next.next && (next = next.next)) {};
+ next.next = this;
+ } else {
+ requests[id] = this;
+ }
+
+ conn.reqRefCount++;
+}
+
+
+// Open Flags
+OpenRequest.FLAG_ALLOW = 0x0;
+OpenRequest.FLAG_REDIRECT = 0x1;
+OpenRequest.FLAG_DENY = 0x7;
+
+
+OpenRequest.prototype.send = function() {
+ var self = this;
+
+ if (this.present) {
+ return;
+ }
+
+ this.present = true;
+
+ if (this.sent) {
+ throw new Error("OpenRequest is already sent");
+ }
+
+
+ process.nextTick(function() {
+ self.sent = true;
+ try {
+ self.conn.write(self);
+ } catch (err) {
+ self.conn.destroy(err);
+ }
+ });
+
+};
+
+
+OpenRequest.prototype.cancel = function() {
+ var id = this.id;
+ var conn = this.conn;
+ var requests = conn.requests;
+ var next;
+
+
+ if (this.sent) {
+ // We cannot cancel if request is already sent.
+
+ return false;
+ }
+
+ if (requests[id] == this) {
+ if (this.next) {
+ requests[id] = this.next;
+ } else {
+ delete requests[id];
+ }
+ } else if (this.prev) {
+ this.prev = this.next;
+ }
+
+ this.destroy();
+
+ return true;
+};
+
+
+OpenRequest.prototype.destroy = function(err, message) {
+ var conn;
+
+ if (!this.destroyed) {
+ if ((conn = this.conn) && conn.id) {
+ conn.reqRefCount--;
+ if (conn.reqRefCount == 0 &&
+ conn.chanRefCount == 0) {
+ conn.setDisposed(true);
+ }
+ }
+ this.onclose && this.onclose(err, message);
+ this.destroyed = true;
+ }
+};
+
+
+// Destroy this OpenRequest and all other in chain
+OpenRequest.prototype.destroyAndNext = function(err) {
+ if (this.next) {
+ this.next.destroyAndNext(err);
+ }
+ this.destroy(err);
+}
+
+
+OpenRequest.prototype.processResponse = function(flag, data, start, end) {
+ var conn = this.conn;
+ var request;
+ var newid;
+ var message;
+ var len;
+
+ if (this.next) {
+ if (flag == OpenRequest.FLAG_ALLOW) {
+ this.next.destroyAndNext(new Error("Channel is already open"));
+ } else {
+ this.next.prev = null;
+ conn.requests[this.id] = this.next;
+ conn.requests[this.id].send();
+ }
+ } else {
+ delete conn.requests[this.id];
+ }
+
+ len = end - start;
+
+ switch (flag) {
+
+ case OpenRequest.FLAG_ALLOW:
+ if (len) {
+ try {
+ message = data.toString("utf8", start, end);
+ } catch (err) {
+ this.destroy(err);
+ return;
+ }
+ }
+ this.onresponse(this.id, message);
+ this.destroy();
+ break;
+
+ case OpenRequest.FLAG_REDIRECT:
+
+ if (len < 4) {
+ conn.destroy(new Error("Bad open resp"));
+ return;
+ }
+
+ newid = (data[start + 1] << 16 |
+ data[start + 2] << 8 |
+ data[start + 3]) + (data[start] << 24 >>> 0);
+
+ if (len > 4) {
+ try {
+ message = data.toString("utf8", start + 4, end);
+ } catch (err) {
+ this.destroy(err);
+ return;
+ }
+ }
+
+ this.onresponse(newid, message);
+ this.destroy();
+ break;
+
+ default:
+ try {
+ message = len ? data.toString("utf8", start, end) : null;
+ } catch (err) {}
+ this.destroy(new Error(message || "ERR_OPEN_DENIED"));
+ break;
+ }
+};
+
+
+OpenRequest.prototype.toBuffer = function() {
+ var id = this.id;
+ var data = this.data;
+ var flag = this.flag;
+ var buffer;
+ var length;
+
+ length = 7 + (data ? data.length : 0);
+
+ buffer = new Buffer(length);
+ buffer[0] = length >>> 8;
+ buffer[1] = length % 256;
+ buffer[2] = id >>> 24;
+ buffer[3] = id >>> 16;
+ buffer[4] = id >>> 8;
+ buffer[5] = id % 256;
+ buffer[6] = 0x1 << 3 | flag;
+
+ if (length > 7) {
+ data.copy(buffer, 7);
+ }
+
+ return buffer;
+};
+
+
+function DataFrame(id, flag, data) {
+ this.id = id;
+ this.flag = flag;
+ this.data = data;
+}
+
+DataFrame.prototype.toBuffer = function() {
+ var id = this.id;
+ var data = this.data;
+ var flag = this.flag;
+ var buffer;
+ var length;
+
+ length = 7 + (data ? data.length : 0);
+
+ buffer = new Buffer(length);
+ buffer[0] = length >>> 8;
+ buffer[1] = length % 256;
+ buffer[2] = id >>> 24;
+ buffer[3] = id >>> 16;
+ buffer[4] = id >>> 8;
+ buffer[5] = id % 256;
+ buffer[6] = 0x2 << 3 | flag;
+
+ if (length > 7) {
+ data.copy(buffer, 7);
+ }
+
+ return buffer;
+};
+
+
+function SignalFrame(id, flag, data) {
+ this.id = id;
+ this.flag = flag;
+ this.data = data;
+}
+
+// Signal flags
+SignalFrame.FLAG_EMIT = 0x0;
+SignalFrame.FLAG_END = 0x1;
+SignalFrame.FLAG_ERROR = 0x7;
+
+
+SignalFrame.prototype.toBuffer = function() {
+ var id = this.id;
+ var data = this.data;
+ var flag = this.flag;
+ var buffer;
+ var length;
+
+ length = 7 + (data ? data.length : 0);
+
+ buffer = new Buffer(length);
+ buffer[0] = length >>> 8;
+ buffer[1] = length % 256;
+ buffer[2] = id >>> 24;
+ buffer[3] = id >>> 16;
+ buffer[4] = id >>> 8;
+ buffer[5] = id % 256;
+ buffer[6] = 0x3 << 3 | flag;
+
+ if (length > 7) {
+ data.copy(buffer, 7);
+ }
+
+ return buffer;
+};
+
+
+function parserImplementation(conn) {
+ var buffer = null;
+ var offset = 0;
+ var length = 0;
+
+ conn.sock.ondata = function(chunk, start, end) {
+ var tmpbuff;
+ var packet;
+ var packetlen;
+ var ch;
+ var op;
+ var flag;
+
+ if (buffer) {
+ tmpbuff = new Buffer((length - offset) + (end - start));
+ buffer.copy(tmpbuff, 0, offset, length);
+ chunk.copy(tmpbuff, (length - offset), start, end);
+ buffer = tmpbuff;
+ length = buffer.length;
+ offset = 0;
+ } else {
+ buffer = chunk;
+ offset = start;
+ length = end;
+ }
+
+ while (offset < length && conn.id) {
+
+ if (offset + 2 > length) {
+ // We have not received the length yet
+ break;
+ }
+
+ packetlen = buffer[offset] << 8 | buffer[offset + 1];
+
+ if (packetlen < 0x7) {
+ // Size is lower then packet header. Destroy wire
+ return conn.destroy(new Error("bad packet size"));
+ }
+
+ if (offset + packetlen > length) {
+ // We have not received the whole packet yet. Wait for
+ // more data.
+ break;
+ }
+
+ ch = (buffer[offset + 3] << 16 |
+ buffer[offset + 4] << 8 |
+ buffer[offset + 5]) + (buffer[offset + 2] << 24 >>> 0);
+
+ desc = buffer[offset + 6];
+ op = ((desc >> 1) & 0xf) >> 2;
+ flag = (desc << 1 & 0xf) >> 1;
+
+ switch (op) {
+
+ case 0x0: // NOOP
+ break;
+
+ case 0x1: // OPEN
+ conn.processOpen(ch, flag, buffer, offset + 7, offset + packetlen);
+ break;
+
+ case 0x2: // DATA
+ conn.processData(ch, flag, buffer, offset + 7, offset + packetlen);
+ break;
+
+ case 0x3: // SIGNAL
+ conn.processSignal(ch, flag, buffer, offset + 7, offset + packetlen);
+ break;
+ }
+
+ offset += packetlen;
+ }
+
+ if (length - offset === 0) {
+ buffer = null;
+ }
+ };
+};
+
+
+// Returns the binary representation of a mode expression. Returns null
+// on invalid mode.
+function getBinMode(modeExpr) {
+ var result = 0;
+ var match;
+
+ if (!modeExpr) {
+ return 0;
+ }
+
+ if (typeof modeExpr !== "string" || !(match = modeExpr.match(MODE_RE))) {
+ return null;
+ }
+
+ match[1] && (result |= READ);
+ match[2] && (result |= WRITE);
+ match[3] && (result |= EMIT);
+
+ return result;
+}
20 package.json
@@ -0,0 +1,20 @@
+{
+ "name": "hydna",
+ "version": "1.0.0",
+ "description": "Hydna client",
+ "keywords": ["hermes", "wink", "client", "hydna", "real-time", "messaging"],
+ "homepage": "https://github.com/hydna/node-hydna",
+ "author": "Hydna AB <info@hydna.com> (http://www.hydna.com/)",
+ "main": "./index",
+ "scripts": {
+ "test": "scripts/test.js -r test"
+ },
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/hydna/node-hydna.git"
+ },
+ "os": ["macos", "linux"],
+ "engines": {
+ "node": ">=0.4.7"
+ }
+}
191 scripts/test.js
@@ -0,0 +1,191 @@
+#!/usr/bin/env node
+//
+// Copyright 2010 Johan Dahlberg. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+var print = require("util").print
+var spawn = require("child_process").spawn
+var stat = require("fs").statSync
+var readdir = require("fs").readdirSync
+var basename = require("path").basename
+var join = require("path").join
+
+var USAGE = "Usage: test.js [options] filepath or dirpath";
+
+var HELP = USAGE + "\n" + "\
+Options: \n\
+ -h, --help Show this help \n\
+ -v, --version Shows current version \n\
+ -r, --recursive Recursive-mode. Selects all test in dirpath \n\
+ and its subdirectories. \n\
+ , --usage Show usage for command \n\
+ , --silent Silent-mode. \n\
+ , --logstdout Print's all data sent to test's stdout \n";
+
+function main() {
+ var args = process.argv.slice(2);
+ var arg = null;
+ var paths = [];
+ var tests = [];
+ var opts = {};
+ var longest = 0;
+ var errors = 0;
+ var failures = 0;
+ var passes = 0;
+
+ while ((arg = args.shift())) {
+ if (arg.substr(0, 2) == "--") {
+ opts[arg.substr(2)] = true;
+ } else if (arg[0] == "-") {
+ opts[arg.substr(1)] = true;
+ } else {
+ /^(\/|\~|\.)/.test(arg) ? paths.push(arg) :
+ paths.push(process.cwd() + '/' + arg);
+ }
+ }
+
+ if (!opts.r) {
+ opts.r = opts.recursive;
+ }
+
+ if (opts.help || opts.h) {
+ console.log(HELP);
+ return;
+ }
+
+ paths.forEach(function(path) {
+ stat(path).isDirectory() && (tests = tests.concat(files(path, opts.r)));
+ stat(path).isFile() && tests.push(path);
+ });
+
+ if (!tests.length || opts.usage) {
+ console.log(USAGE);
+ return;
+ }
+
+ if (opts.version || opts.v) {
+ console.log(VERSION);
+ return;
+ }
+
+ tests.forEach(function(path) {
+ longest = (path.length > longest && path.length) || longest;
+ });
+
+ function dots(str, l) {
+ var result = [];
+ var index = (l - str.length) + 3;
+ while (index--) {
+ result.push(".");
+ }
+ return result.join("");
+ }
+
+ function finish() {
+ !opts.silent && console.log("Passed: %s, Failed: %s, Errors: %s",
+ passes, failures, errors);
+
+ process.nextTick(function() {
+ // process.exit();
+ });
+ }
+
+ function runtests() {
+ var s = opts.silent;
+ var test = tests.shift();
+ var now = new Date().getTime();
+ !s && print(test + dots(test, longest));
+
+ exports.test(test, opts, [], function(error, failure) {
+ var secs = "(" + ((new Date().getTime() - now) / 1000) + " sec)";
+ error && ++errors && !s && print("error\n" + error);
+ failure && ++failures && !s && print("failed\n" + failure);
+ !error && !failure && ++passes && !s && print("ok " + secs + "\n");
+ process.nextTick((tests.length && runtests) || finish);
+ });
+ }
+
+ !opts.silent && console.log("Running %s tests", tests.length);
+ process.nextTick(runtests);
+}
+
+/**
+ * ## test.test(path, [options], [execargs], [callback])
+ *
+ * Spawns a new child process and runs specified `'path'`. The optional
+ * `'callback'` is called when child process exits. The first argument is set
+ * if an error occured.
+ *
+ * Available options:
+ * * logstdout - Prints all data from child's stdout to current process stdout.
+ */
+exports.test = function() {
+ var args = Array.prototype.slice.call(arguments);
+ var path = args.shift();
+ var opts = !Array.isArray(args[0]) &&
+ typeof args[0] != "function" ? args.shift() : {};
+ var execArgs = Array.isArray(args[0]) ? args.shift() : [];
+ var callback = typeof args[0] == "function" ? args.shift() : null;
+ var uargs = [path].concat(typeof arguments[1] == "function" ? [] : args);
+ var proc = spawn(process.execPath, [path].concat(execArgs || []));
+ var err = null;
+
+ if (opts.logstdout && !opts.silent) {
+ proc.stdout.on("data", function(data) {
+ print(data);
+ });
+ }
+
+ proc.stderr.on("data", function(error) {
+ err = error;
+ });
+
+ proc.on("exit", function(code) {
+ callback && callback(null, err || code);
+ });
+
+ opts.debug && proc.stdout.on("data", function(data) {
+ print(data);
+ });
+
+}
+
+// Get all tests objects form specified directory.
+function files(dirpath, r) {
+ var result = [];
+ var paths = readdir(dirpath);
+
+ paths.forEach(function(path) {
+ var p = join(dirpath, path);
+ stat(p).isDirectory() && r && (result = result.concat(files(p, r)));
+ stat(p).isFile() && /^test/.test(basename(p)) && result.push(p);
+ });
+
+ return result;
+}
+
+// Run in exec mode if executed from
+// command line
+process.argv[1] == __filename && main();
BIN test/.speedtest.js.swp
Binary file not shown.
33 test/behaviors/redirect.js
@@ -0,0 +1,33 @@
+//
+// Copyright 2011 Hydna AB. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+//
+// 1. Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY HYDNA AB ``AS IS'' AND ANY EXPRESS OR IMPLIED
+// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+// EVENT SHALL HYDNA AB OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+// The views and conclusions contained in the software and documentation are
+// those of the authors and should not be interpreted as representing
+// official policies, either expressed or implied, of Hydna AB.
+//
+
+var connection = require("connection");
+
+connection.redirect(Math.floor(Math.random() * 0xFFFFFFFF) % 0xFFFFFFFF);
98 test/behaviors/setup.be
@@ -0,0 +1,98 @@
+//
+// Copyright 2011 Hydna AB. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+//
+// 1. Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY HYDNA AB ``AS IS'' AND ANY EXPRESS OR IMPLIED
+// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+// EVENT SHALL HYDNA AB OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+// The views and conclusions contained in the software and documentation are
+// those of the authors and should not be interpreted as representing
+// official policies, either expressed or implied, of Hydna AB.
+//
+
+
+//
+// Behaviors for testing suite
+//
+
+
+
+namespace = "test"
+ script = "redirect"
+ path = "./redirect.js"
+ end
+
+ script = "pong"
+ path = "./signal.js"
+ end
+
+ flag = "redirected"
+ connection
+ end
+
+end
+
+
+directive = "connect"
+
+ token = "redirect"
+ redirect("http://localhost:7010/redirected")
+ end
+
+ token = "redirected"
+ set("test:redirected")
+ end
+
+ token = "deny"
+ deny("DENIED_HANDSHAKE")
+ end
+
+end
+
+
+directive = "open"
+
+ channel = 0x1
+ run("test:redirect")
+ end
+
+ channel = 0x2
+ allow("OK")
+ end
+
+ channel = 0x3
+ deny("NOT_ALLOWED")
+ end
+
+ channel = 0x5
+ when = state("test:redirected")
+ allow("REDIRECTED")
+ end
+ deny("NOT_REDIRECTED")
+ end
+
+end
+
+directive = "emit"
+ channel = 0x00112233
+ run("test:pong")
+ end
+end
37 test/behaviors/signal.js
@@ -0,0 +1,37 @@
+//
+// Copyright 2011 Hydna AB. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+//
+// 1. Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY HYDNA AB ``AS IS'' AND ANY EXPRESS OR IMPLIED
+// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+// EVENT SHALL HYDNA AB OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+// The views and conclusions contained in the software and documentation are
+// those of the authors and should not be interpreted as representing
+// official policies, either expressed or implied, of Hydna AB.
+//
+
+var signal = require("signal");
+
+if (script.env.TOKEN == "ping") {
+ signal.reply("pong");
+} else {
+ signal.reply("bad token");
+}
67 test/common.js
@@ -0,0 +1,67 @@
+var Buffer = require("buffer").Buffer;
+var Channel = require("../index").Channel;
+
+exports.TEST_HOST = process.env["TEST_ADDRESS"] || "localhost:7010";
+exports.TEST_CH = exports.TEST_HOST + "/x112233";
+
+var timer = null;
+
+exports.createTestChannel = function(mode, ignoreErrors) {
+ var chan = new Channel();
+ var url = exports.TEST_CH;
+
+ if (typeof ignoreErrors == "number") {
+ url = exports.TEST_HOST + "/" + ignoreErrors;
+ ignoreErrors = false;
+ }
+
+ chan.connect(url, mode);
+
+ if (ignoreErrors) {
+ chan.on("error", function() { });
+ }
+
+ return chan;
+}
+
+exports.shutdown = function() {
+ clearTimeout(timer);
+ process.exit();
+}
+
+exports.timeout = function(timeout) {
+ timer = setTimeout(function() {
+ throw new Error("Timeout reached");
+ }, timeout);
+}
+
+exports.streamErrHandler = function(exception) {
+ throw exception;
+}
+
+exports.createPayload = function(size) {
+ var payload = new Buffer(size);
+ var index = size;
+
+ while (index--) {
+ payload[index] = Math.floor(Math.random() * 256);
+ }
+
+ return payload
+}
+
+exports.compareBuffers = function(bufferA, bufferB) {
+ var index = bufferA.length;
+
+ if (index != bufferB.length) {
+ return false;
+ }
+
+ while (index--) {
+ if (bufferA[index] != bufferB[index]) {
+ return false;
+ }
+ }
+
+ return true;
+}
29 test/test-1000-messages.js
@@ -0,0 +1,29 @@
+var ok = require("assert").ok;
+var timeout = require("./common").timeout;
+var shutdown = require("./common").shutdown;
+var createTestChannel = require("./common").createTestChannel;
+var createPayload = require("./common").createPayload;
+var compareBuffers = require("./common").compareBuffers;
+
+var chan;
+var payload;
+var count = 0;
+
+timeout(5000);
+
+payload = createPayload(512);
+chan = createTestChannel("rw");
+chan.on("connect", function() {
+ for(var i = 0; i < 1000; i++) {
+ chan.write(payload);
+ }
+});
+chan.on("data", function(data) {
+ ok(compareBuffers(payload, data));
+ if (++count == 1000) {
+ chan.destroy();
+ }
+});
+chan.on("close", function() {
+ shutdown();
+});
46 test/test-conn-close.js
@@ -0,0 +1,46 @@
+var ok = require("assert").ok;
+var equal = require("assert").equal;
+var throws = require("assert").throws;
+var timeout = require("./common").timeout;
+var shutdown = require("./common").shutdown;
+var createTestChannel = require("./common").createTestChannel;
+
+timeout(5000);
+
+function partone() {
+ var chan;
+ var errorraised;
+ chan = createTestChannel("rw");
+ chan.on("connect", function() {
+ this._connection.destroy(new Error("Test Error"));
+ });
+ chan.on("error", function(exception) {
+ equal(exception.message, "Test Error");
+ errorraised = true;
+ });
+ chan.on("close", function() {
+ ok(errorraised);
+ equal(this._connection, null);
+ process.nextTick(parttwo);
+ });
+}
+
+function parttwo() {
+ var chan;
+ var errorraised;
+ chan = createTestChannel("rw");
+ chan.on("connect", function() {
+ this._connection.sock.end();
+ });
+ chan.on("error", function(exception) {
+ equal(exception.message, "Connection reseted by server");
+ errorraised = true;
+ });
+ chan.on("close", function() {
+ ok(errorraised);
+ equal(this._connection, null);
+ shutdown();
+ });
+}
+
+process.nextTick(partone);
22 test/test-connect-close.js
@@ -0,0 +1,22 @@
+var ok = require("assert").ok;
+var equal = require("assert").equal;
+var throws = require("assert").throws;
+var timeout = require("./common").timeout;
+var shutdown = require("./common").shutdown;
+var createTestChannel = require("./common").createTestChannel;
+var chanErrHandler = require("./common").chanErrHandler;
+
+var chan;
+
+timeout(5000);
+
+chan = createTestChannel("rw");
+chan.on("connect", function() {
+ ok(this.readable);
+ ok(this.writable);
+ equal(this.readyState, "readwrite");
+ chan.destroy();
+});
+chan.on("close", function() {
+ shutdown();
+});
65 test/test-encoding.js
@@ -0,0 +1,65 @@
+var ok = require("assert").ok;
+var throws = require("assert").throws;
+var doesNotThrow = require("assert").doesNotThrow;
+var equal = require("assert").equal;
+var deepEqual = require("assert").deepEqual;
+var timeout = require("./common").timeout;
+var shutdown = require("./common").shutdown;
+var createTestChannel = require("./common").createTestChannel;
+
+var chan;
+var successfullTests = 0;
+
+timeout(5000);
+
+chan = createTestChannel("rw");
+chan.on("connect", testAscii);
+chan.on("close", function() {
+ equal(successfullTests, 4);
+ shutdown();
+});
+
+throws(function() {
+ chan.setEncoding("NA");
+});
+
+function testAscii() {
+ chan.setEncoding("ascii");
+ chan.once("data", function(data) {
+ equal(data, "ascii");
+ successfullTests++;
+ process.nextTick(testUtf8);
+ });
+ chan.write("ascii", "ascii");
+}
+
+function testUtf8() {
+ chan.setEncoding("utf8");
+ chan.once("data", function(data) {
+ equal(data, "åäö");
+ successfullTests++;
+ process.nextTick(testJson);
+ });
+ chan.write("åäö", "utf8");
+}
+
+function testJson() {
+ var graph = { data: "json" };
+ chan.setEncoding("json");
+ chan.once("data", function(data) {
+ deepEqual(data, graph);
+ successfullTests++;
+ process.nextTick(testBase64);
+ });
+ chan.write(graph, "json");
+}
+
+function testBase64() {
+ chan.setEncoding("ascii");
+ chan.once("data", function(data) {
+ equal(data, "base64");
+ successfullTests++;
+ chan.destroy();
+ });
+ chan.write("YmFzZTY0", "base64");
+}
50 test/test-handshake.js
@@ -0,0 +1,50 @@
+var ok = require("assert").ok;
+var equal = require("assert").equal;
+var throws = require("assert").throws;
+
+var Channel = require("../index").Channel;
+
+var timeout = require("./common").timeout;
+var shutdown = require("./common").shutdown;
+var createTestChannel = require("./common").createTestChannel;
+var chanErrHandler = require("./common").chanErrHandler;
+var TEST_HOST = require("./common").TEST_HOST;
+
+timeout(5000);
+
+
+function handshakeRedirect() {
+ var chan;
+
+ chan = new Channel();
+ chan.connect("redirect@" + TEST_HOST + "/5");
+
+ chan.on("connect", function(message) {
+ equal(message, "REDIRECTED");
+ chan.destroy();
+ });
+ chan.on("close", function() {
+ process.nextTick(handshakeDeny);
+ });
+}
+
+
+function handshakeDeny() {
+ var chan;
+
+ chan = new Channel();
+ chan.connect("deny@" + TEST_HOST + "/5");
+
+ chan.on("connect", function(message) {
+ throw new Error("Received connect");
+ });
+ chan.on("error", function(err) {
+ ok(/DENIED_HANDSHAKE/.test(err.message));
+ chan.destroy();
+ });
+ chan.on("close", function() {
+ shutdown();
+ });
+}
+
+process.nextTick(handshakeRedirect);
24 test/test-maxlimit.js
@@ -0,0 +1,24 @@
+var throws = require("assert").throws;
+var timeout = require("./common").timeout;
+var shutdown = require("./common").shutdown;
+var createTestChannel = require("./common").createTestChannel;
+var createPayload = require("./common").createPayload;
+
+var PAYLOAD_MAX_SIZE = require("../index").PAYLOAD_MAX_SIZE;
+
+var chan;
+var payload;
+
+timeout(5000);
+
+payload = createPayload(PAYLOAD_MAX_SIZE + 1);
+chan = createTestChannel("rw");
+chan.on("connect", function() {
+ throws(function() {
+ chan.write(payload);
+ });
+ chan.destroy();
+});
+chan.on("close", function() {
+ shutdown();
+});
35 test/test-mode.js
@@ -0,0 +1,35 @@
+var doesNotThrow = require("assert").doesNotThrow;
+var throws = require("assert").throws;
+var timeout = require("./common").timeout;
+var shutdown = require("./common").shutdown;
+var createTestChannel = require("./common").createTestChannel;
+
+throws(function() {
+ createTestChannel("readwrite-signal");
+});
+
+throws(function() {
+ createTestChannel("not valid");
+});
+
+doesNotThrow(function() {
+ createTestChannel(null).destroy();
+ createTestChannel("r").destroy();
+ createTestChannel("r+e").destroy();
+ createTestChannel("r+emit").destroy();
+ createTestChannel("read").destroy();
+ createTestChannel("read+e").destroy();
+ createTestChannel("read+emit").destroy();
+ createTestChannel("w").destroy();
+ createTestChannel("w+e").destroy();
+ createTestChannel("w+emit").destroy();
+ createTestChannel("write").destroy();
+ createTestChannel("write+e").destroy();
+ createTestChannel("write+emit").destroy();
+ createTestChannel("rw").destroy();
+ createTestChannel("rw+e").destroy();
+ createTestChannel("rw+emit").destroy();
+ createTestChannel("readwrite").destroy();
+ createTestChannel("readwrite+e").destroy();
+ createTestChannel("readwrite+emit").destroy();
+});
46 test/test-open.js
@@ -0,0 +1,46 @@
+var ok = require("assert").ok;
+var equal = require("assert").equal;
+var throws = require("assert").throws;
+var timeout = require("./common").timeout;
+var shutdown = require("./common").shutdown;
+var createTestChannel = require("./common").createTestChannel;
+var chanErrHandler = require("./common").chanErrHandler;
+
+
+timeout(5000);
+
+
+function openWithMessage() {
+ var chan;
+
+ chan = createTestChannel("rw", 2);
+ chan.on("connect", function(message) {
+ equal(message, "OK");
+ ok(this.readable);
+ ok(this.writable);
+ equal(this.readyState, "readwrite");
+ chan.destroy();
+ });
+ chan.on("close", function() {
+ process.nextTick(denyWithMessage);
+ });
+}
+
+
+function denyWithMessage() {
+ var chan;
+
+ chan = createTestChannel("rw", 3);
+ chan.on("connect", function(message) {
+ throw new Error("Received connect");
+ });
+ chan.on("error", function(err) {
+ equal(err.message, "NOT_ALLOWED");
+ chan.destroy();
+ });
+ chan.on("close", function() {
+ shutdown();
+ });
+}
+
+process.nextTick(openWithMessage);
26 test/test-pending-open.js
@@ -0,0 +1,26 @@
+var ok = require("assert").ok;
+var timeout = require("./common").timeout;
+var shutdown = require("./common").shutdown;
+var Channel = require("../index").Channel;
+
+var TEST_HOST = require("./common").TEST_HOST
+
+var NO_REQUESTS = 100;
+
+var chan;
+var payload;
+var count = 0;
+
+timeout(5000);
+
+function onconnect() {
+ if (++count == NO_REQUESTS) {
+ shutdown();
+ }
+}
+
+for (var i = 0; i < NO_REQUESTS; i++) {
+ chan = new Channel();
+ chan.connect(TEST_HOST + "/1", "r");
+ chan.on("connect", onconnect);
+}
52 test/test-read-write-only.js