Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

New logging #44

Merged
merged 13 commits into from
This page is out of date. Refresh to see the latest.
View
2  examples/chat/app.js
@@ -79,7 +79,7 @@ io.configure(function () {
topic: topic,
subscription: subscription,
connectionString: sbconn,
- listeners: SbStore.logging.console
+ logger: io.get('logger')
}));
io.set('transports', ['xhr-polling']);
View
111 lib/logging.js
@@ -1,111 +0,0 @@
-/**
-* Copyright (c) Microsoft. All rights reserved.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-'use strict';
-
-var util = require('util');
-
-/**
- * Convenience function to create listener objects
- * which wire up to the events on the store
- * and service bus interface.
- *
- * @param {object} handlers
- * object containing handlers for the various
- * events from the store & service bus. Looks
- * for methods with the following names:
- *
- * onsubscribe(name, consumer) - a new subscriber to a store message
- * onreceived(nodeId, name, args) - message has been received, args as array
- * onunsubscribe(name, consumer) - unsubscribe from store
- * onpoll(err, message) - poll cycle on service bus completed
- * onsberror(err) - error when interacting with service bus
- *
- */
-
-function makeListener(handlers) {
- var storeEvents = ['subscribe', 'unsubscribe', 'received'];
- var sbEvents = ['poll', 'sberror'];
-
- function wireHandlers(emitter, eventNames) {
- eventNames.forEach(function (eventName) {
- var handlerFunc = handlers['on' + eventName];
- if (handlerFunc) {
- emitter.on(eventName, handlerFunc.bind(handlers));
- }
- });
- }
-
- var listener = {
- store: function (store) {
- wireHandlers(store, storeEvents);
- if (handlers.init) {
- handlers.init(store);
- }
- },
-
- sb: function (sb) {
- wireHandlers(sb, sbEvents);
- }
- };
-
- return listener;
-}
-
-/**
- * Logger object which outputs everything to the console
- *
- */
- var consoleLog = makeListener({
- init: function (store) {
- this.nodeId = store.nodeId;
- },
-
- onsubscribe: function (name, consumer) {
- log('SbStore has new subscriber for %s events', name);
- },
-
- onreceived: function (nodeId, name, args) {
- log('SbStore nodeId %s received message of type %s from node %s',
- this.nodeId, name, nodeId);
- },
-
- onunsubscribe: function (name, consumer) {
- log('SbStore unsubscribing from %s', name);
- },
-
- onpoll: function (err, message) {
- if (err === 'No messages to receive') {
- log('Service bus poll completed, no message');
- } else {
- log('Service bus poll completed, err = %s, message = %s', err, util.inspect(message));
- }
- },
-
- onsberror: function (err) {
- log('Service bus error: %s', err.message);
- }
- });
-
-// helper for formatting
-
-function log() {
- console.log(util.format.apply(null, arguments));
-}
-
-module.exports = {
- makeListener: makeListener,
- console: consoleLog
-};
View
41 lib/sbstore.js
@@ -20,14 +20,12 @@ var io = require('socket.io')
, uuid = require('node-uuid')
, azure = require('azure')
, Client = require('./sbclient')
- , logging = require('./logging')
, MessageBatcher = require('./messagebatcher')
, MessageSequencer = require('./messagesequencer')
, ServiceBusConnector = require('./servicebusconnector');
exports = module.exports = ServiceBusStore;
ServiceBusStore.Client = Client;
-ServiceBusStore.logging = logging;
/**
* construct the store that uses Service Bus to communicate
@@ -43,8 +41,14 @@ ServiceBusStore.logging = logging;
function ServiceBusStore(options) {
io.Store.apply(this, arguments);
+ this.log = options && options.logger;
+
this.nodeId = (options && options.nodeId) || uuid.v4();
this.sb = this.createServiceBusConnector(options);
+
+ this.log && this.log.info('Service Bus Store created', 'host:' + this.serviceBusService.host,
+ 'topic:' + options.topic, 'sub:' + options.subscription);
+
this.subscribers = {};
this.hookupListeners(options && options.listeners);
@@ -95,10 +99,13 @@ ServiceBusStore.prototype.unsubscribe = function (name, consumer) {
this.emit('unsubscribe', name, consumer);
}
-ServiceBusStore.prototype.destroy = function () {
- Store.prototype.destroy.call(this);
- this.sb.stop();
- this.subscribers = {};
+ServiceBusStore.prototype.destroy = function (stoppedCallback) {
+ var self = this;
+ this.sb.stop(function () {
+ io.Store.prototype.destroy.call(self);
+ self.subscribers = {};
+ stoppedCallback && stoppedCallback();
+ });
}
ServiceBusStore.prototype.receiveMessage = function (sourceNodeId, name, args) {
@@ -120,24 +127,17 @@ ServiceBusStore.prototype.createServiceBusConnector = function createServiceBusC
throw new Error('Must specify one of connectionString or serviceBusService in options');
}
- var serviceBusService;
-
if (options.connectionString) {
- serviceBusService = azure.createServiceBusService(options.connectionString);
- } else {
- serviceBusService = options.serviceBusService;
+ options.serviceBusService = azure.createServiceBusService(options.connectionString);
}
- var createOptions = {
- nodeId: this.nodeId,
- topic: options.topic,
- subscription: options.subscription,
- serviceBusService: serviceBusService
- };
+ this.serviceBusService = options.serviceBusService;
+ options.logger = this.log;
+ options.nodeId = this.nodeId;
- return new MessageBatcher(createOptions,
- new MessageSequencer(createOptions,
- new ServiceBusConnector(createOptions)));
+ return new MessageBatcher(options,
+ new MessageSequencer(options,
+ new ServiceBusConnector(options)));
}
ServiceBusStore.prototype.hookupListeners = function (listeners) {
@@ -152,4 +152,3 @@ ServiceBusStore.prototype.hookupListeners = function (listeners) {
l.sb(that.sb);
});
}
-
View
20 lib/servicebusconnector.js
@@ -36,6 +36,7 @@ function ServiceBusConnector(options) {
this.subscription = options.subscription;
this.numReceives = options.numReceives || DEFAULT_SIMULTANEOUS_RECEIVES;
this.receivesRunning = 0;
+ this.log = options.logger;
}
util.inherits(ServiceBusConnector, EventEmitter);
@@ -47,7 +48,9 @@ ServiceBusConnector.prototype.start = function () {
function pollSb() {
self.serviceBusReceiver.receiveSubscriptionMessage(self.topic, self.subscription, function (err, receivedMessage) {
- self.emit('poll', err, receivedMessage);
+ if (err === 'No messages to receive') {
+ self.log && self.log.debug('Service Bus poll: no message');
+ }
if (!err) {
var msg = self.unpackMessage(receivedMessage);
@@ -62,6 +65,7 @@ ServiceBusConnector.prototype.start = function () {
pollSb();
} else {
--self.receivesRunning;
+ self.log && self.log.info('Service Bus poll stopped', 'num:' + self.receivesRunning);
if(self.receivesRunning === 0) {
self.stopCallback && self.stopCallback(null);
}
@@ -71,6 +75,7 @@ ServiceBusConnector.prototype.start = function () {
for(var i = 0; i < this.numReceives; ++i) {
pollSb();
+ this.log && this.log.info('Service Bus poll started', 'num:' + i);
}
this.receivesRunning = this.numReceives;
}
@@ -84,9 +89,9 @@ ServiceBusConnector.prototype.send = function (name, args) {
var self = this;
var message = this.packMessage(name, args);
this.serviceBusSender.sendTopicMessage(this.topic, message, function (err) {
-
if (err) {
- self.emit('sberror', new Error('Failed to write to service bus on topic %s, err = %s', self.topic, util.inspect(err)));
+ self.log && self.log.error('Service Bus send to topic failed',
+ 'topic:' + self.topic, 'error:' + err.toString());
}
});
}
@@ -111,9 +116,18 @@ ServiceBusConnector.prototype.unpackMessage = function(message) {
try {
result.args = JSON.parse(message.body);
+ this.log && this.log.info('Service Bus received message', 'from:' + result.nodeId, 'message:' + result.name);
+ this.log && this.log.debug('Service Bus received message', 'messageId:' + message.brokerProperties.MessageId);
return result;
} catch (ex) {
// Issue unpacking the message, assume it's bad and toss it
+ this.log && this.log.warn('Service Bus bad message received',
+ 'CorrelationId:' + message.brokerProperties.CorrelationId,
+ 'Label:' + message.brokerProperties.Label,
+ 'SequenceNumber:' + message.brokerProperties.SequenceNumber,
+ 'size:' + message.brokerProperties.Size,
+ 'enqueuedTime:' + message.brokerProperties.EnqueuedTimeUtc,
+ 'messageId:' + message.brokerProperties.MessageId);
return result;
}
}
View
168 test/logging-tests.js
@@ -0,0 +1,168 @@
+/**
+* Copyright (c) Microsoft. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+'use strict';
+
+var should = require('should');
+var sinon = require('sinon');
+
+var SbStore = require('../lib/sbstore');
+
+describe('logging', function () {
+ var store;
+ var logger;
+ var serviceBusService;
+ var recvFuncs;
+ var clock;
+
+ beforeEach(function () {
+ clock = sinon.useFakeTimers();
+ recvFuncs = [];
+
+ serviceBusService = {
+ receiveSubscriptionMessage: function (topic, sub, callback) {
+ recvFuncs.push(callback);
+ },
+ sendTopicMessage: sinon.spy(),
+ withFilter: function (filter) { return this; },
+ host: 'testnamespace.servicebus.example'
+ };
+
+ logger = {
+ error: sinon.spy(),
+ warn: sinon.spy(),
+ info: sinon.spy(),
+ debug: sinon.spy()
+ };
+
+ store = new SbStore({
+ nodeId: 'logtestnode',
+ topic: 'testtopic',
+ subscription: 'testsubscription',
+ serviceBusService: serviceBusService,
+ numReceives: 2,
+ logger: logger,
+ flushIntervalMS: 100
+ });
+ });
+
+ afterEach(function () {
+ clock.restore();
+ });
+
+ it('should log subscription info on creation', function () {
+ logger.info.calledWith('Service Bus Store created',
+ 'host:' + serviceBusService.host,
+ 'topic:testtopic',
+ 'sub:testsubscription').should.be.true;
+ });
+
+ it('should log when poll request starts up', function () {
+ logger.info.calledWith('Service Bus poll started', 'num:0').should.be.true;
+ logger.info.calledWith('Service Bus poll started', 'num:1').should.be.true;
+ });
+
+ it('should log when polling stops', function (done) {
+ store.destroy(function () {
+ logger.info.calledWith('Service Bus poll stopped', 'num:1').should.be.true;
+ logger.info.calledWith('Service Bus poll stopped', 'num:0').should.be.true;
+ done();
+ });
+
+ // trigger the receive poll callbacks so they exit
+ recvNothing();
+ recvNothing();
+ });
+
+ it('should log when poll completes without a message', function () {
+ recvNothing();
+ logger.debug.calledWith('Service Bus poll: no message').should.be.true;
+ });
+
+ it('should log when a bad message is received', function () {
+ var message = {
+ brokerProperties: {
+ CorrelationId: 'sourceNode',
+ Label: 'aMessage',
+ SequenceNumber: 1
+ },
+ body: 'This will not deserialize'
+ };
+
+ recv(message);
+ logger.warn.calledWith('Service Bus bad message received',
+ 'CorrelationId:sourceNode', 'Label:aMessage', 'SequenceNumber:1')
+ .should.be.true;
+ });
+
+ it('should log when good message is received', function () {
+ var message = {
+ brokerProperties: {
+ CorrelationId: 'sourceNode',
+ Label: 'aMessage',
+ SequenceNumber: 2
+ },
+ body: JSON.stringify([1, 2, 3])
+ };
+
+ recv(message);
+
+ logger.info.calledWith('Service Bus received message',
+ 'from:sourceNode', 'message:aMessage').should.be.true;
+ });
+
+ it('should log details when good message is received', function () {
+ var message = {
+ brokerProperties: {
+ CorrelationId: 'sourceNode',
+ Label: 'aMessage',
+ SequenceNumber: 2,
+ EnqueuedTimeUtc: new Date().toString(),
+ MessageId: 1234
+ },
+ body: JSON.stringify([1, 2, 3])
+ };
+ message.brokerProperties.Size = message.body.length;
+
+ recv(message);
+
+ logger.debug.calledWith('Service Bus received message',
+ 'messageId:' + message.brokerProperties.MessageId).should.be.true;
+ });
+
+ it('should log failure to send', function () {
+ store.publish('aMessage', 1, 2, 3);
+ // message batcher flush
+ clock.tick(300);
+
+ var sendCallback = serviceBusService.sendTopicMessage.getCall(0).args[2];
+ sendCallback('Service Bus send failed');
+
+ logger.error.calledWith('Service Bus send to topic failed',
+ 'topic:testtopic', 'error:Service Bus send failed').should.be.true;
+ });
+
+ // Helpers for sending messages
+ function recvNothing() {
+ var recvFunc = recvFuncs.shift();
+ recvFunc('No messages to receive');
+ }
+
+ function recv(message) {
+ var recvFunc = recvFuncs.shift();
+ recvFunc(null, message);
+ }
+
+});
View
10 test/servicebusconnector-tests.js
@@ -114,16 +114,6 @@ describe('Service Bus connection layer', function () {
sentMessage.body.should.equal('"hello"');
});
-
- it('should emit sberror event if service bus send fails', function () {
- var stub = sinon.stub().callsArgWith(2, new Error('Fake failure'));
- sb.sendTopicMessage = stub;
- var error = null;
- connector.on('sberror', function (err) { console.log('there was an error'); error = err; })
- connector.send('msg', 'hello', {seq: 6, next: 7});
-
- error.should.exist;
- });
});
describe('when receiving with one receive at a time', function () {
View
5 test/store-tests.js
@@ -112,7 +112,10 @@ describe('Service Bus Store objects', function() {
};
before(function () {
- store = new SbStore({listeners: [SbStore.logging.makeListener(listeners)]});
+ store = new SbStore();
+ store.on('subscribe', listeners.onsubscribe);
+ store.on('received', listeners.onreceived);
+ store.on('unsubscribe', listeners.onunsubscribe);
store.subscribe('message1', subscriber1);
store.subscribe('message2', subscriber2);
Something went wrong with that request. Please try again.