Skip to content

Commit

Permalink
Call "onSubscribed" handler for pubsub commands
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Aug 1, 2013
1 parent 549e388 commit 8bc0de8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
3 changes: 3 additions & 0 deletions lib/adapter/command.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ util.inherits(PublishSubscribe, Command);
Object.defineProperty(PublishSubscribe.prototype, 'onSubscribe', {
get: function() { return this._options.onSubscribe; }
});
Object.defineProperty(PublishSubscribe.prototype, 'onSubscribed', {
get: function() { return this._options.onSubscribed; }
});
Object.defineProperty(PublishSubscribe.prototype, 'onPublish', {
get: function() { return this._options.onPublish; }
});
Expand Down
27 changes: 15 additions & 12 deletions lib/adapter/socket.io.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,17 @@ exports.register = function(application, server, params) {

var event = commandName;

var callback = null;
var requestHandler = 'onRequest';
var responseHandler = 'onResponse';

if (command.PublishSubscribe.isInstance(commandDefinition)) {
event += '.subscribe';
requestHandler = 'onSubscribe';
responseHandler = 'onSubscribed';
}

var options = {};
if (command.RequestResponse.isInstance(commandDefinition)) {
callback = function(error, envelope) {
var callback = function(error, envelope) {
if (error) {
socket.emit('error', error);
return;
Expand All @@ -59,9 +66,9 @@ exports.register = function(application, server, params) {
if (clientOptions.responseEvent)
responseEvent = clientOptions.responseEvent;

if (commandDefinition.onResponse) {
if (commandDefinition[responseHandler]) {
try {
commandDefinition.onResponse(responseData, wrappedSocket);
commandDefinition[responseHandler](responseData, wrappedSocket);
} catch(error) {
wrappedSocket.emit('error', error.message || error);
}
Expand All @@ -70,17 +77,13 @@ exports.register = function(application, server, params) {
wrappedSocket.destroy();
}
};
options.timeout = DEFAULT_TIMEOUT;
} else if (command.PublishSubscribe.isInstance(commandDefinition)) {
event += '.subscribe';
}

options.timeout = DEFAULT_TIMEOUT;
options.sessionId = socket.id;

var wrappedConection = new wrapper.DroongaProtocolConnectionWrapper(connection, callback, options);
if (commandDefinition.onRequest) {
if (commandDefinition[requestHandler]) {
try {
commandDefinition.onRequest(data, wrappedConection);
commandDefinition[requestHandler](data, wrappedConection);
} catch(error) {
wrappedConection.destroy();
socket.emit('error', error.message || error);
Expand Down

0 comments on commit 8bc0de8

Please sign in to comment.