Skip to content

Commit

Permalink
Merge pull request #13 from christav/add-logging-working-13
Browse files Browse the repository at this point in the history
Add logging
  • Loading branch information
Chris Tavares committed Mar 18, 2013
2 parents affbd03 + 44c89be commit 6b56a9d
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 147 deletions.
2 changes: 1 addition & 1 deletion examples/chat/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ io.configure(function () {
topic: topic,
subscription: subscription,
connectionString: sbconn,
listeners: SbStore.logging.console
logger: io.get('logger')
}));

io.set('transports', ['xhr-polling']);
Expand Down
111 changes: 0 additions & 111 deletions lib/logging.js

This file was deleted.

41 changes: 20 additions & 21 deletions lib/sbstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ var io = require('socket.io')
, uuid = require('node-uuid')
, azure = require('azure')
, Client = require('./sbclient')
, logging = require('./logging')
, MessageBatcher = require('./messagebatcher')
, MessageSequencer = require('./messagesequencer')
, ServiceBusConnector = require('./servicebusconnector');

exports = module.exports = ServiceBusStore;
ServiceBusStore.Client = Client;
ServiceBusStore.logging = logging;

/**
* construct the store that uses Service Bus to communicate
Expand All @@ -43,8 +41,14 @@ ServiceBusStore.logging = logging;
function ServiceBusStore(options) {
io.Store.apply(this, arguments);

this.log = options && options.logger;

this.nodeId = (options && options.nodeId) || uuid.v4();
this.sb = this.createServiceBusConnector(options);

this.log && this.log.info('Service Bus Store created', 'host:' + this.serviceBusService.host,
'topic:' + options.topic, 'sub:' + options.subscription);

this.subscribers = {};

this.hookupListeners(options && options.listeners);
Expand Down Expand Up @@ -95,10 +99,13 @@ ServiceBusStore.prototype.unsubscribe = function (name, consumer) {
this.emit('unsubscribe', name, consumer);
}

ServiceBusStore.prototype.destroy = function () {
Store.prototype.destroy.call(this);
this.sb.stop();
this.subscribers = {};
ServiceBusStore.prototype.destroy = function (stoppedCallback) {
var self = this;
this.sb.stop(function () {
io.Store.prototype.destroy.call(self);
self.subscribers = {};
stoppedCallback && stoppedCallback();
});
}

ServiceBusStore.prototype.receiveMessage = function (sourceNodeId, name, args) {
Expand All @@ -120,24 +127,17 @@ ServiceBusStore.prototype.createServiceBusConnector = function createServiceBusC
throw new Error('Must specify one of connectionString or serviceBusService in options');
}

var serviceBusService;

if (options.connectionString) {
serviceBusService = azure.createServiceBusService(options.connectionString);
} else {
serviceBusService = options.serviceBusService;
options.serviceBusService = azure.createServiceBusService(options.connectionString);
}

var createOptions = {
nodeId: this.nodeId,
topic: options.topic,
subscription: options.subscription,
serviceBusService: serviceBusService
};
this.serviceBusService = options.serviceBusService;
options.logger = this.log;
options.nodeId = this.nodeId;

return new MessageBatcher(createOptions,
new MessageSequencer(createOptions,
new ServiceBusConnector(createOptions)));
return new MessageBatcher(options,
new MessageSequencer(options,
new ServiceBusConnector(options)));
}

ServiceBusStore.prototype.hookupListeners = function (listeners) {
Expand All @@ -152,4 +152,3 @@ ServiceBusStore.prototype.hookupListeners = function (listeners) {
l.sb(that.sb);
});
}

20 changes: 17 additions & 3 deletions lib/servicebusconnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ function ServiceBusConnector(options) {
this.subscription = options.subscription;
this.numReceives = options.numReceives || DEFAULT_SIMULTANEOUS_RECEIVES;
this.receivesRunning = 0;
this.log = options.logger;
}

util.inherits(ServiceBusConnector, EventEmitter);
Expand All @@ -47,7 +48,9 @@ ServiceBusConnector.prototype.start = function () {
function pollSb() {
self.serviceBusReceiver.receiveSubscriptionMessage(self.topic, self.subscription, function (err, receivedMessage) {

self.emit('poll', err, receivedMessage);
if (err === 'No messages to receive') {
self.log && self.log.debug('Service Bus poll: no message');
}

if (!err) {
var msg = self.unpackMessage(receivedMessage);
Expand All @@ -62,6 +65,7 @@ ServiceBusConnector.prototype.start = function () {
pollSb();
} else {
--self.receivesRunning;
self.log && self.log.info('Service Bus poll stopped', 'num:' + self.receivesRunning);
if(self.receivesRunning === 0) {
self.stopCallback && self.stopCallback(null);
}
Expand All @@ -71,6 +75,7 @@ ServiceBusConnector.prototype.start = function () {

for(var i = 0; i < this.numReceives; ++i) {
pollSb();
this.log && this.log.info('Service Bus poll started', 'num:' + i);
}
this.receivesRunning = this.numReceives;
}
Expand All @@ -84,9 +89,9 @@ ServiceBusConnector.prototype.send = function (name, args) {
var self = this;
var message = this.packMessage(name, args);
this.serviceBusSender.sendTopicMessage(this.topic, message, function (err) {

if (err) {
self.emit('sberror', new Error('Failed to write to service bus on topic %s, err = %s', self.topic, util.inspect(err)));
self.log && self.log.error('Service Bus send to topic failed',
'topic:' + self.topic, 'error:' + err.toString());
}
});
}
Expand All @@ -111,9 +116,18 @@ ServiceBusConnector.prototype.unpackMessage = function(message) {

try {
result.args = JSON.parse(message.body);
this.log && this.log.info('Service Bus received message', 'from:' + result.nodeId, 'message:' + result.name);
this.log && this.log.debug('Service Bus received message', 'messageId:' + message.brokerProperties.MessageId);
return result;
} catch (ex) {
// Issue unpacking the message, assume it's bad and toss it
this.log && this.log.warn('Service Bus bad message received',
'CorrelationId:' + message.brokerProperties.CorrelationId,
'Label:' + message.brokerProperties.Label,
'SequenceNumber:' + message.brokerProperties.SequenceNumber,
'size:' + message.brokerProperties.Size,
'enqueuedTime:' + message.brokerProperties.EnqueuedTimeUtc,
'messageId:' + message.brokerProperties.MessageId);
return result;
}
}
Expand Down
Loading

0 comments on commit 6b56a9d

Please sign in to comment.