Skip to content

Commit

Permalink
Merge pull request #93 from happner/emit-optimisations-2.7.2
Browse files Browse the repository at this point in the history
Emit optimisations 2.7.2
  • Loading branch information
JensEggers committed Jul 1, 2016
2 parents d7394d0 + 74fab01 commit bc9f201
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 48 deletions.
17 changes: 12 additions & 5 deletions lib/client/plugins/intra-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ module.exports = {
}),

validateRequest:function(path, action, data, parameters, handler){
// this.context.log.info('XXX - validateRequest()');

if (['login','set','get','remove','describe'].indexOf(action) == -1){
var error = new Error("invalid action: " + action);
Expand All @@ -79,6 +80,7 @@ module.exports = {
},

authorizeRequest:function(message, handler, callback){
// this.context.log.info('XXX - authorizeRequest()');

if (this.pubsub.trusted[this.session.id] == undefined) return this.pubsub.handleDataResponseLocal(this.securityService.AccessDeniedError('unauthorized'), message, null, handler, this);

Expand Down Expand Up @@ -110,6 +112,7 @@ module.exports = {
},

performRequest:function(path, action, data, parameters, handler) {
// this.context.log.info('XXX - performRequest()');

if (!parameters)
parameters = {};
Expand All @@ -130,9 +133,12 @@ module.exports = {
var _this = this;

this.authorizeRequest(message, handler, function(){
// _this.context.log.info('XXX - request authorized');

if (action == "set"){
if (parameters.noStore) return _this.pubsub.handleDataResponseLocal(null, message, _this.dataService.formatSetData(path, data), handler, _this);
if (parameters.noStore) {
return _this.pubsub.handleDataResponseLocal(null, message, _this.dataService.formatSetData(path, data), handler, _this);
}

_this.dataService.upsert(path, data, parameters, function(e, response){
return _this.pubsub.handleDataResponseLocal(e, message, response, handler, _this);
Expand Down Expand Up @@ -170,17 +176,18 @@ module.exports = {
});
}),
set: Promise.promisify(function(path, data, parameters, handler){
// this.context.log.info('XXX - set()');
var _this = this;
if (typeof parameters == 'function') {
handler = parameters;
parameters = {};
}

_this.setImmediate(function(){
_this.setInternal(path, data, parameters, handler);
}, _this.options.config.deferSetImmediate);
_this.setImmediate(function(){
_this.setInternal(path, data, parameters, handler);
}, _this.options.config.deferSetImmediate);
}),
setInternal:function(path, data, parameters, handler){
// this.context.log.info('XXX - setInternal()');
this.performRequest(path, "set", data, parameters, handler);
},
_remoteOff:function(channel, refCount, done){
Expand Down
4 changes: 4 additions & 0 deletions lib/services/data_embedded/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,8 @@ DataEmbeddedService.prototype.get = function(path, parameters, callback){

DataEmbeddedService.prototype.formatSetData = function(path, data){

// this.log.info('XXX - formatSetData()');

if (typeof data != 'object' || data instanceof Array == true || data instanceof Date == true || data == null)
data = {value:data};

Expand All @@ -502,6 +504,7 @@ DataEmbeddedService.prototype.formatSetData = function(path, data){
}

DataEmbeddedService.prototype.upsert = function(path, data, options, callback){
// this.log.info('XXX - upsert()');
var _this = this;

options = options?options:{};
Expand Down Expand Up @@ -580,6 +583,7 @@ DataEmbeddedService.prototype.transform = function(dataObj, meta){
}

DataEmbeddedService.prototype.__upsertInternal = function(path, setData, options, dataWasMerged, callback){
// this.log.info('XXX - __upsertInternal()');
var _this = this;
var setParameters = {$set: {"data":setData.data, "_id":setData._meta.path}};

Expand Down
74 changes: 45 additions & 29 deletions lib/services/pubsub/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ PubSubService.prototype.formatReturnItems = function(items){
}

PubSubService.prototype.createResponse = function(e, message, response, local) {
// this.log.info('XXX - createResponse()');
var _this = this;

if (!response) response = {data:null};
Expand Down Expand Up @@ -347,12 +348,16 @@ PubSubService.prototype.decodeArrayResponse = function(response) {

PubSubService.prototype.handleDataResponseLocal = function(e, message, response, handler, client) {

// this.log.info('XXX - handleDataResponseLocal()');

if (e) return handler(e);

if (!response) return handler(null, response);

var localResponse = this.createResponse(null, message, response, true);

// this.log.info('XXX - createResponse() done');

var decoded;

if (localResponse && localResponse.data){
Expand Down Expand Up @@ -738,30 +743,30 @@ PubSubService.prototype.disconnect = function(socket) {
}
}

PubSubService.prototype.getAudienceGroup = function(channel) {
var _this = this;

if (channel == '/ALL@*')//listeners subscribed to everything
return _this.__listeners_ONALL;
PubSubService.prototype.getAudienceGroup = function(channel, opts) {
// this.log.info('XXX - getAudienceGroup() %s', channel);

// optimisation: indexOf(*) below is probably faster as `channel[channel.length - 1] == '*'`
// (assuming wildcards are only ever /at/the/end/of/the/path/*)
if (channel == '/ALL@*')//listeners subscribed to everything
return this.__listeners_ONALL;

var hasWildcard = channel.indexOf('*') > -1;
if (!opts) {
// opts is missing in calls from addListener() and removeListener()
opts = {
hasWildcard: channel.indexOf('*') > -1,
targetAction: channel.split('@')[0].replace('/','')
}
}

if (hasWildcard && channel.indexOf('/ALL@') == 0){//listeners subscribed to any action, but on a partial path
return _this.__listeners_wildcard_ALL;
} else if (hasWildcard){//listeners subscribed to a specific action, but on a partial path
return _this['__listeners_wildcard_' + channel.split('@')[0].replace('/','')];
} else {
return _this['__listeners_' + channel.split('@')[0].replace('/','')];
}
if (opts.hasWildcard) return this['__listeners_wildcard_' + opts.targetAction];
return this['__listeners_' + opts.targetAction];

}

PubSubService.prototype.emitToAudience = function(publication, channel, sharedRef) {
PubSubService.prototype.emitToAudience = function(publication, channel, opts) {
// this.log.info('XXX - emitToAudience() %s', channel);
var _this = this;

var audienceGroup = _this.getAudienceGroup(channel);
var audienceGroup = _this.getAudienceGroup(channel, opts);

if (audienceGroup[channel] != null && audienceGroup[channel] != undefined) {

Expand All @@ -783,26 +788,30 @@ PubSubService.prototype.emitToAudience = function(publication, channel, sharedRe
// }
// decoupledPublication = sharedRef.publication;

if (!sharedRef.serialized) {
sharedRef.serialized = JSON.stringify(publication)
if (!opts.serialized) {
opts.serialized = JSON.stringify(publication)
}
// still requires a separate copy per subscriber
// but only when payload encryption is turned on,
// for all other cases the data is cloned by virtue
// of the fact that it crosses the network
decoupledPublication = JSON.parse(sharedRef.serialized);
// for all other cases this step could be shared
// because the data is cloned by virtue of the fact
// that it crosses the network
decoupledPublication = JSON.parse(opts.serialized);
}

decoupledPublication._meta.channel = channel.toString();
decoupledPublication._meta.sessionId = _this.__sessions[sessionIndex].session.id;

// this.log.info('XXX - emitToAudience() writing');

_this.__sessions[sessionIndex].write(decoupledPublication);

}
}
}

PubSubService.prototype.publish = function(message, payload) {
// this.log.info('XXX - publish()');

payload._meta.published = true;

Expand All @@ -814,23 +823,30 @@ PubSubService.prototype.publish = function(message, payload) {
payload._meta.action = messageChannel;
payload._meta.type = 'data';

var sharedRef = {}; // so that serialised payload can be shared between
// multiple calls to emitAudience() below
var opts = { // 1. to allow shared .serialized between repetitive calls to emitToAudience()
hasWildcard: false, // 2. to avert repetitive test for indexOf(*) in getAudienceGroup()
targetAction: action // 3. to avert repetitive parsing of channel string to determine action in getAudienceGroup()
};

this.emitToAudience(payload, messageChannel, opts);
opts.targetAction = 'ALL';
this.emitToAudience(payload, '/ALL@' + message.path, opts);
this.emitToAudience(payload, '/ALL@*', opts);

this.emitToAudience(payload, messageChannel, sharedRef);
this.emitToAudience(payload, '/ALL@' + message.path, sharedRef);
this.emitToAudience(payload, '/ALL@*', sharedRef);
opts.hasWildcard = true; // all remaining emit attempts are to wildcard subscribers

for (var allPath in this.__listeners_wildcard_ALL){
if (this.happn.utils.wildcardMatch(allPath.replace('/ALL@/','/*@/'), messageChannel)) {
this.emitToAudience(payload, allPath, sharedRef);
this.emitToAudience(payload, allPath, opts);
}
}

opts.targetAction = action;

var wildcardActionGroup = this['__listeners_wildcard_' + action];
for (var actionPath in wildcardActionGroup){
if (this.happn.utils.wildcardMatch(actionPath, messageChannel)){
this.emitToAudience(payload, actionPath, sharedRef);
this.emitToAudience(payload, actionPath, opts);
}
}

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"benchmarket": "0.0.11",
"chai": "^3.5.0",
"coveralls": "^2.11.6",
"debug": "^2.2.0",
"expect.js": "*",
"gulp": "^3.9.1",
"happn-random-activity-generator": "^0.2.0",
Expand Down
Loading

0 comments on commit bc9f201

Please sign in to comment.