Skip to content

Commit

Permalink
Heartbeats. Closes #22
Browse files Browse the repository at this point in the history
  • Loading branch information
hueniverse committed Sep 12, 2015
1 parent c0fe8f2 commit 13ec84c
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 15 deletions.
12 changes: 10 additions & 2 deletions API.md
@@ -1,4 +1,4 @@
# 0.4.x API Reference
# 0.5.x API Reference

- [Registration](#registration)
- [Server](#server)
Expand Down Expand Up @@ -84,9 +84,17 @@ method. The plugin accepts the following optional registration options:
- `isHttpOnly` - the cookie HTTP only flag when using type `'cookie'`. Defaults to `true`.
- `path` - the cookie path when using type `'cookie'`. Defaults to `'/'`.
- `domain` - the cookie domain when using type `'cookie'`. Defaults to no domain.
- `ttl` - the cookie expiration milliseconds when using type `'cookie'`. Defaults to current session only.
- `ttl` - the cookie expiration milliseconds when using type `'cookie'`. Defaults to current
session only.
- `headers` - an optional array of header field names to include in server responses to the client.
If set to `'*'` (without an array), allows all headers. Defaults to `null` (no headers).
- `heartbeat` - configures connection keep-alive settings where value can be:
- `false` - no heartbeats.
- an object with:
- `interval` - time interval between heartbeat messages in milliseconds. Defaults to `15000`
(15 seconds).
- `timeout` - timeout in milliseconds after a heartbeat is sent to the client and before the
client is considered disconnected by the server. Defaults to `5000` (5 seconds).

## Server

Expand Down
53 changes: 46 additions & 7 deletions PROTOCOL.md
@@ -1,21 +1,22 @@
# Protocol v0.4.x
# nes Protocol v0.5.x

## Message

The nes protocol consists of JSON messages sent between the client and server.

Each incoming request from the client to the server contains:
- `type` - the message type:
- `'ping'` - heartbeat response.
- `'hello'` - connection initialization and authentication.
- `'request'` - endpoint request.
- `'sub'` - subscribe to a path.
- `'unsub'` - unsubscribe from a path.
- `'message'` - send custom message.
- `id` - a unique per-client request id (number or string).
- additional type-specific fields.

Each outgoing request from the server to the client contains:
- `type` - the message type:
- `'ping'` - heartbeat request.
- `'hello'` - connection initialization and authentication.
- `'request'` - endpoint request.
- `'sub'` - subscribe to a path.
Expand Down Expand Up @@ -48,6 +49,35 @@ For example:
}
```

## Heartbeat

Flow: `server` -> `client` -> `server`

For cases where it is not possible for the TCP connection to determine if the connection is still active,
the server sends a heartbeat message to the client every configured interval, and then expects the client
to respond within a configured timeout. The server sends:
- `type` - set to `'ping'`.

For example:

```js
{
type: 'ping'
}
```

When the client receives the message, it sends back:
- `type` - set to `'ping'`.
- `id` - a unique per-client request id (number or string).

For example:

```js
{
type: 'ping',
id: 6
}
```

## Hello

Expand All @@ -56,7 +86,7 @@ Flow: `client` -> `server` -> `client`
Every client connection must first be initialized with a `hello` message. The client sends a message to the server
with the following:
- `type` - set to `'hello'`.
- `id` - unique request identifier.
- `id` - a unique per-client request id (number or string).
- `auth` - optional authentication credentials. Can be any value understood by the server.
- `subs` - an optional array of strings indicating the path subscriptions the client is interested in.

Expand All @@ -78,6 +108,15 @@ For example:
The server respond by sending a message back with the following:
- `type` - set to `'hello'`.
- `id` - the same `id` received from the client.
- `heartbeat` - the server heartbeat configuration which can be:
- `false` - no heartbeats will be sent.
- an object with:
- `interval` - the heartbeat interval in milliseconds.
- `timeout` - the time from sending a heartbeat to the client until a response is expected
before a connection is considered closed by the server.

Note: the client should assume the connection is closed if it has not heard from the server in
heartbeat.interval + heartbeat.timeout.

For example:

Expand Down Expand Up @@ -128,7 +167,7 @@ Flow: `client` -> `server` -> `client`

Request a resource from the server where:
- `type` - set to `'request'`.
- `id` - unique request identifier.
- `id` - a unique per-client request id (number or string).
- `method` - the corresponding HTTP method (e.g. `'GET'`).
- `path` - the requested resource (can be an HTTP path of resource name).
- `headers` - an optional object with the request headers (each header name is a key with a corresponding value).
Expand Down Expand Up @@ -178,7 +217,7 @@ Flow: `client` -> `server` [-> `client`]

Sends a custom message to the server where:
- `type` - set to `'message'`.
- `id` - unique request identifier.
- `id` - a unique per-client request id (number or string).
- `message` - any value (string, object, etc.).

For example:
Expand Down Expand Up @@ -215,7 +254,7 @@ Flow: `client` -> `server` [-> `client`]

Sends a subscription request to the server:
- `type` - set to `'sub'`.
- `id` - unique request identifier.
- `id` - a unique per-client request id (number or string).
- `path` - the requested subscription path.

For example:
Expand Down Expand Up @@ -252,7 +291,7 @@ Flow: `client` -> `server`

Unsubscribe from a server subscription:
- `type` - set to `'unsub'`.
- `id` - unique request identifier.
- `id` - a unique per-client request id (number or string).
- `path` - the subscription path.

For example:
Expand Down
32 changes: 32 additions & 0 deletions lib/client.js
Expand Up @@ -37,6 +37,7 @@

this._url = url;
this._settings = options; // node.js only
this._heartbeatTimeout = false; // Server heartbeat configuration

// State

Expand All @@ -45,6 +46,7 @@
this._ids = 0; // Id counter
this._requests = {}; // id -> callback
this._subscriptions = {}; // path -> [callbacks]
this._heartbeat = null;

// Events

Expand Down Expand Up @@ -140,6 +142,7 @@
Client.prototype._onClose = function () {

this._ws = null;
clearTimeout(this._heartbeat);

// Flush pending requests

Expand Down Expand Up @@ -365,6 +368,8 @@

var self = this;

this._beat();

parse(message.data, function (err, update) {

if (err) {
Expand All @@ -384,6 +389,12 @@
error.headers = update.headers;
}

// Ping

if (update.type === 'ping') {
return self._send({ type: 'ping' }, false); // Ignore errors
}

// Broadcast

if (update.type === 'broadcast') {
Expand Down Expand Up @@ -425,13 +436,34 @@
// Authentication

if (update.type === 'hello') {
if (update.heartbeat) {
self._heartbeatTimeout = update.heartbeat.interval + update.heartbeat.timeout;
self._beat(); // Call again once timeout is set
}

return callback(error);
}

return self.onError(new Error('Received unknown response type: ' + update.type));
});
};

Client.prototype._beat = function () {

var self = this;

if (!self._heartbeatTimeout) {
return;
}

clearTimeout(this._heartbeat);

this._heartbeat = setTimeout(function () {

self._ws.close();
}, self._heartbeatTimeout);
};

Client.prototype._notifyHandlers = function (path, err, message) {

var handlers = this._subscriptions[path];
Expand Down
13 changes: 11 additions & 2 deletions lib/index.js
Expand Up @@ -21,7 +21,11 @@ var internals = {
isHttpOnly: true,
path: '/'
},
headers: null
headers: null,
heartbeat: {
interval: 15000, // 15 seconds
timeout: 5000 // 5 seconds
}
}
};

Expand Down Expand Up @@ -52,7 +56,12 @@ internals.schema = Joi.object({
})
.allow(false)
.required(),
headers: Joi.array().items(Joi.string().lowercase()).min(1).allow('*', null)
headers: Joi.array().items(Joi.string().lowercase()).min(1).allow('*', null),
heartbeat: Joi.object({
interval: Joi.number().integer().min(1).required(),
timeout: Joi.number().integer().min(1).less(Joi.ref('interval')).required()
})
.allow(false)
});


Expand Down
61 changes: 61 additions & 0 deletions lib/listener.js
Expand Up @@ -28,6 +28,8 @@ exports = module.exports = internals.Listener = function (connection, settings)
this._router = new Call.Router();
this._authRoute = this._settings.auth && connection.lookup(this._settings.auth.id);
this._socketCounter = internals.counter.min;
this._heartbeat = null;
this._timeout = null;

// WebSocket listener

Expand All @@ -47,6 +49,10 @@ exports = module.exports = internals.Listener = function (connection, settings)
connection.plugins.nes = {
_listener: this
};

// Start heartbeats

this._beat();
};


Expand Down Expand Up @@ -76,6 +82,9 @@ internals.Listener.prototype._add = function (ws) {

internals.Listener.prototype._close = function () {

clearTimeout(this._heartbeat);
clearTimeout(this._timeout);

this._wss.close();
};

Expand All @@ -95,6 +104,52 @@ internals.Listener.prototype._authRequired = function () {
};


internals.Listener.prototype._beat = function () {

var self = this;

if (!this._settings.heartbeat) {
return;
}

if (this._heartbeat && // Skip the first time
this._sockets.length()) {

// Send heartbeats

var update = {
type: 'ping'
};

this._sockets.forEach(function (socket) {

socket._send(update);
});

// Verify client responded

this._timeout = setTimeout(function () {

self._sockets.forEach(function (socket) {

if (!socket._pinged) {
socket.disconnect();
}

socket._pinged = false;
});
}, this._settings.heartbeat.timeout);
}

// Schedule next heartbeat

this._heartbeat = setTimeout(function () {

self._beat();
}, this._settings.heartbeat.interval);
};


internals.Listener.broadcast = function (message) {

var update = {
Expand Down Expand Up @@ -332,6 +387,12 @@ internals.Sockets.prototype.forEach = function (each) {
};


internals.Sockets.prototype.length = function () {

return Object.keys(this._items).length;
};


// Subscribers manager

internals.Subscribers = function () {
Expand Down
9 changes: 8 additions & 1 deletion lib/socket.js
Expand Up @@ -18,6 +18,7 @@ exports = module.exports = internals.Socket = function (ws, listener) {
this._ws = ws;
this._listener = listener;
this._helloed = false;
this._pinged = false;
this._subscriptions = {};

this.id = this._listener._generateId();
Expand Down Expand Up @@ -106,6 +107,11 @@ internals.Socket.prototype._onMessage = function (message) {

// Initialization and Authentication

if (request.type === 'ping') {
self._pinged = true;
return;
}

if (request.type === 'hello') {
return self._processHello(request);
}
Expand Down Expand Up @@ -173,7 +179,8 @@ internals.Socket.prototype._processHello = function (request) {

var response = {
type: 'hello',
id: request.id
id: request.id,
heartbeat: this._listener._settings.heartbeat
};

if (!request.auth) {
Expand Down

0 comments on commit 13ec84c

Please sign in to comment.