Skip to content

Commit

Permalink
onCluster option on publish to skip emit to cluster replication clients
Browse files Browse the repository at this point in the history
  • Loading branch information
nomilous committed Oct 11, 2016
1 parent 6c508a4 commit 6b1208a
Show file tree
Hide file tree
Showing 3 changed files with 419 additions and 9 deletions.
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,12 @@ UNSUBSCRIBING FROM EVENTS
TAGGING
----------------------------
*You can do a set command and specify that you want to tag the data at the end of the path (or the data that is created as a result of the command), tagging will take a snapshot of the data as it currently stands, and will save the snapshot to a path that starts with the path you specify, and a '/' with the tag you specify at the end*
*You can do a set command and specify that you want to tag the data at the path. Tagging will take a snapshot of the data as it currently stands, and will save the snapshot to a new path in `/_TAGS`*
```javascript

var randomTag = require('shortid').generate();

my_client_instance.set('e2e_test1/test/tag', {property1:'property1',property2:'property2',property3:'property3'}, {tag:randomTag}, function(e, result){
my_client_instance.set('path/with/existing/data', null, {tag:'tagName'}, function(e, result) {});
my_client_instance.get('_TAGS/path/with/existing/data/*', function(e, result) {});

```
Expand Down
19 changes: 14 additions & 5 deletions lib/services/pubsub/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -702,11 +702,19 @@ PubSubService.prototype.emitToAudience = function (publication, channel, opts) {

if (audienceGroup[channel] != null && audienceGroup[channel] != undefined) {

var decoupledPublication;
var decoupledPublication, socket;

for (var sessionIndex in audienceGroup[channel]) {

if (this.__sessions[sessionIndex].intraProc) {
socket = this.__sessions[sessionIndex];

if (opts.noCluster) {
if (socket.session.info && socket.session.info.clusterName) {
continue;
}
}

if (socket.intraProc) {
//we deep clone for intra process comms
decoupledPublication = this.happn.utils.clone(publication, true);
} else {
Expand All @@ -725,14 +733,14 @@ PubSubService.prototype.emitToAudience = function (publication, channel, opts) {
}

decoupledPublication._meta.channel = channel.toString();
decoupledPublication._meta.sessionId = this.__sessions[sessionIndex].session.id;
decoupledPublication._meta.sessionId = socket.session.id;

delete decoupledPublication._meta.status;
delete decoupledPublication._meta.published;
delete decoupledPublication._meta.eventId;
delete decoupledPublication._meta._id;

this.__sessions[sessionIndex].write(decoupledPublication);
socket.write(decoupledPublication);
}
}
};
Expand All @@ -751,7 +759,8 @@ PubSubService.prototype.publish = function (message, payload) {

var opts = { // 1. to allow shared .serialized between repetitive calls to emitToAudience()
hasWildcard: false, // 2. to avert repetitive test for indexOf(*) in getAudienceGroup()
targetAction: action // 3. to avert repetitive parsing of channel string to determine action in getAudienceGroup()
targetAction: action, // 3. to avert repetitive parsing of channel string to determine action in getAudienceGroup()
noCluster: message.options.noCluster
};

this.emitToAudience(payload, messageChannel, opts);
Expand Down
Loading

0 comments on commit 6b1208a

Please sign in to comment.