Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging infrastructure #14

Merged
merged 3 commits into from
Feb 8, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/chat/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ io.configure(function () {
io.set('store', new SbStore({
topic: topic,
subscription: subscription,
connectionString: sbconn
connectionString: sbconn,
listeners: SbStore.logging.console
}));

io.set('transports', ['xhr-polling']);
Expand Down
110 changes: 110 additions & 0 deletions lib/logging.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Copyright (c) Microsoft. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

var util = require('util');

/**
* Convenience function to create listener objects
* which wire up to the events on the store
* and service bus interface.
*
* @param {object} handlers
* object containing handlers for the various
* events from the store & service bus. Looks
* for methods with the following names:
*
* onsubscribe(name, consumer) - a new subscriber to a store message
* onreceived(name, message) - message has been received, varargs for rest
* of message arguments
* onunsubscribe(name, consumer) - unsubscribe from store
* onpoll(err, message) - poll cycle on service bus completed
* onsberror(err) - error when interacting with service bus
*
*/

function makeListener(handlers) {
var storeEvents = ['subscribe', 'unsubscribe', 'received'];
var sbEvents = ['poll', 'sberror'];

function wireHandlers(emitter, eventNames) {
eventNames.forEach(function (eventName) {
var handlerFunc = handlers['on' + eventName];
if (handlerFunc) {
emitter.on(eventName, handlerFunc.bind(handlers));
}
});
}

var listener = {
store: function (store) {
wireHandlers(store, storeEvents);
if (handlers.init) {
handlers.init(store);
}
},

sb: function (sb) {
wireHandlers(sb, sbEvents);
}
};

return listener;
}

/**
* Logger object which outputs everything to the console
*
*/
var consoleLog = makeListener({
init: function (store) {
this.nodeId = store.nodeId;
},

onsubscribe: function (name, consumer) {
log('SbStore has new subscriber for %s events', name);
},

onreceived: function (name, message) {
log('SbStore nodeId %s received message of type %s from node %s',
this.nodeId, message.name, message.nodeId);
},

onunsubscribe: function (name, consumer) {
log('SbStore unsubscribing from %s', name);
},

onpoll: function (err, message) {
if (err === 'No messages to receive') {
log('Service bus poll completed, no message');
} else {
log('Service bus poll completed, err = %s, message = %s', err, util.inspect(message));
}
},

onsberror: function (err) {
log('Service bus error: %s', err.message);
}
});

// helper for formatting

function log() {
console.log(util.format.apply(null, arguments));
}

module.exports = {
makeListener: makeListener,
console: consoleLog
};
28 changes: 19 additions & 9 deletions lib/sbstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ var io = require('socket.io')
, uuid = require('node-uuid')
, azure = require('azure')
, Formatter = require('./formatter')
, logging = require('./logging')
, ServiceBusInterface = require('./servicebusinterface');

exports = module.exports = ServiceBusStore;
ServiceBusStore.Client = Client;
ServiceBusStore.logging = logging;

/**
* construct the store that uses Service Bus to communicate
Expand All @@ -32,6 +34,7 @@ ServiceBusStore.Client = Client;
* - nodeId: unique string identifying this node, defaults to random uuid
* - topic: service bus topic name to communicate over
* - subscription: service bus subscription to listen on for messages
* - listeners: optional array of listener objects to hook up. Primarily used for loggers
*/
function ServiceBusStore(options) {
io.Store.apply(this, arguments);
Expand All @@ -41,6 +44,8 @@ function ServiceBusStore(options) {
this.sb = this.createServiceBusInterface(options);
this.subscribers = {};

this.hookupListeners(options && options.listeners);

this.sb.start(this.receiveMessage.bind(this));
}

Expand Down Expand Up @@ -71,7 +76,6 @@ ServiceBusStore.prototype.publish = function (name) {
* @api private
*/
ServiceBusStore.prototype.subscribe = function (name, consumer) {
log('SbStore has new subscriber for %s events', name);
var subscribers = this.subscribers[name] || [];
subscribers.push(consumer);
this.subscribers[name] = subscribers;
Expand All @@ -80,7 +84,6 @@ ServiceBusStore.prototype.subscribe = function (name, consumer) {
}

ServiceBusStore.prototype.unsubscribe = function (name, consumer) {
log('SbStore unsubscribing from %s', name);
var subscribers = this.subscribers[name] || [];
subscribers = subscribers.filter(function (item) { item !== consumer; });
this.subscribers[name] = subscribers;
Expand All @@ -96,9 +99,8 @@ ServiceBusStore.prototype.destroy = function () {

ServiceBusStore.prototype.receiveMessage = function (sbMessage) {
var message = this.formatter.unpack(sbMessage);
log('Store nodeId %s Received message of type %s from node %s', this.nodeId, message.name, message.nodeId);
this.emit.apply(this, ['received', message.name, message]);
if (message.nodeId !== this.nodeId) {
log('Store nodeId %s sending message to subscribers', this.nodeId);
var subscribers = this.subscribers[message.name] || [];
subscribers.forEach(function (sub) {
sub.apply(null, message.args);
Expand Down Expand Up @@ -130,6 +132,19 @@ ServiceBusStore.prototype.createFormatter = function (nodeId) {
return new Formatter(nodeId);
}

ServiceBusStore.prototype.hookupListeners = function (listeners) {
listeners = listeners || [];
if (!(listeners instanceof Array)) {
listeners = [listeners];
}

var that = this;
listeners.forEach(function (l) {
l.store(that);
l.sb(that.sb);
});
}

function Client() {
io.Store.Client.apply(this, arguments);
this.data = {};
Expand Down Expand Up @@ -236,8 +251,3 @@ function invoke(fn) {
});
}
}

// Helper function for logging
function log() {
console.log(util.format.apply(null, arguments));
}
8 changes: 5 additions & 3 deletions lib/servicebusinterface.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// Interface layer between store and service bus

var azure = require('azure')
, EventEmitter = require('events').EventEmitter
, util = require('util');

module.exports = ServiceBusInterface;
Expand All @@ -26,14 +27,15 @@ function ServiceBusInterface(serviceBusService, topic, subscription) {
this.subscription = subscription;
}

util.inherits(ServiceBusInterface, EventEmitter);

ServiceBusInterface.prototype.start = function (messageHandler) {
var self = this;
this.shouldStop = false;

function pollSb() {
log('Waiting for service bus message from subscription %s', self.subscription);
self.serviceBusService.receiveSubscriptionMessage(self.topic, self.subscription, function (err, receivedMessage) {
log('Received message from sb, err = %s, message = %s', err, util.inspect(receivedMessage));
self.emit('poll', err, receivedMessage);

if (!err) {
messageHandler(receivedMessage);
Expand All @@ -58,7 +60,7 @@ ServiceBusInterface.prototype.stop = function (cb) {
ServiceBusInterface.prototype.send = function (message) {
this.serviceBusService.sendTopicMessage(this.topic, message, function (err) {
if (err) {
log('Failed to write to service bus on topic %s, err = %s', this.topic, util.inspect(err));
this.emit('sberror', new Error('Failed to write to service bus on topic %s, err = %s', this.topic, util.inspect(err)));
}
});
}
Expand Down
3 changes: 2 additions & 1 deletion test/mocks/servicebuscreation.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ function mockServiceBusCreation() {
sinon.stub(SbStore.prototype, 'createServiceBusInterface', function (options) {
return {
start: sinon.stub(),
send: sinon.stub()
send: sinon.stub(),
on: sinon.stub()
};
});
}
Expand Down
37 changes: 27 additions & 10 deletions test/store-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ describe('Service Bus Store objects', function() {

describe('when creating', function () {
var store;
var listener = {
store: sinon.spy(),
sb: sinon.spy()
};

before(function () {
store = new SbStore({});
store = new SbStore({ listeners: [listener]});
});

it('should create a formatter', function () {
Expand All @@ -49,6 +53,13 @@ describe('Service Bus Store objects', function() {
it('should start the service bus polling', function () {
store.sb.start.called.should.be.true;
});

it('should hook up listeners', function () {
listener.store.calledOnce.should.be.true;
listener.store.getCall(0).calledWithExactly(store).should.be.true;
listener.sb.calledOnce.should.be.true;
listener.sb.getCall(0).calledWithExactly(store.sb).should.be.true;
});
});

describe('when publishing', function () {
Expand Down Expand Up @@ -99,12 +110,14 @@ describe('Service Bus Store objects', function() {
var subscriber2a = sinon.spy();
var subscriber3 = sinon.spy();

var subscribeMessageListener = sinon.spy();
var listeners = {
onreceived: sinon.spy(),
onsubscribe: sinon.spy(),
onunsubscribe: sinon.spy()
};

before(function () {
store = new SbStore({});

store.on('subscribe', subscribeMessageListener);
store = new SbStore({listeners: [SbStore.logging.makeListener(listeners)]});

store.subscribe('message1', subscriber1);
store.subscribe('message2', subscriber2);
Expand All @@ -117,11 +130,11 @@ describe('Service Bus Store objects', function() {
});

it('should emit subscribe events', function () {
subscribeMessageListener.callCount.should.equal(4);
subscribeMessageListener.calledWith('message1', subscriber1).should.be.true;
subscribeMessageListener.calledWith('message2', subscriber2).should.be.true;
subscribeMessageListener.calledWith('message2', subscriber2a).should.be.true;
subscribeMessageListener.calledWith('message3', subscriber3).should.be.true;
listeners.onsubscribe.callCount.should.equal(4);
listeners.onsubscribe.calledWith('message1', subscriber1).should.be.true;
listeners.onsubscribe.calledWith('message2', subscriber2).should.be.true;
listeners.onsubscribe.calledWith('message2', subscriber2a).should.be.true;
listeners.onsubscribe.calledWith('message3', subscriber3).should.be.true;
});

it('should call all subscribers when message received', function () {
Expand All @@ -134,6 +147,10 @@ describe('Service Bus Store objects', function() {
subscriber3.called.should.be.false;
});

it('should emit received event when message received', function () {
listeners.onreceived.calledOnce.should.be.true;
});

it('should not call subscribers for messages from itself', function () {
var receivedMessage = store.formatter.pack('message3', 'a message');
store.receiveMessage(receivedMessage);
Expand Down