Skip to content

Commit

Permalink
add more test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
timaschew committed Aug 4, 2016
1 parent d1723df commit 2e67b3e
Show file tree
Hide file tree
Showing 4 changed files with 411 additions and 91 deletions.
1 change: 1 addition & 0 deletions src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ exports.ACTIONS.LISTEN = 'L';
exports.ACTIONS.UNLISTEN = 'UL';
exports.ACTIONS.LISTEN_ACCEPT = 'LA';
exports.ACTIONS.LISTEN_REJECT = 'LR';
exports.ACTIONS.SUBSCRIPTION_HAS_PROVIDER = 'SH';
exports.ACTIONS.SUBSCRIPTIONS_FOR_PATTERN_FOUND = 'SF';
exports.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND = 'SP';
exports.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED = 'SR';
Expand Down
112 changes: 83 additions & 29 deletions src/utils/listener-registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ var C = require( '../constants/constants' ),
* notified with a SUBSCRIPTION_FOR_PATTERN_REMOVED action
*
* This class manages the matching of patterns and record names. The subscription /
* notification logic is handled by this._subscriptionRegistry
* notification logic is handled by this._providerRegistry
*
* @constructor
*
* @param {Object} options DeepStream options
* @param {SubscriptionRegistry} parentSubscriptionRegistry The SubscriptionRegistry containing the record subscriptions
* to allow new listeners to be notified of existing subscriptions
* @param {SubscriptionRegistry} clientRegistry The SubscriptionRegistry containing the record subscriptions
* to allow new listeners to be notified of existing subscriptions
*/
var ListenerRegistry = function( type, options, parentSubscriptionRegistry ) {
var ListenerRegistry = function( type, options, clientRegistry ) {
this._type = type;
this._options = options;
this._parentSubscriptionRegistry = parentSubscriptionRegistry;
this._subscriptionRegistry = new SubscriptionRegistry( options, this._type );
this._subscriptionRegistry.setAction( 'subscribe', C.ACTIONS.LISTEN );
this._subscriptionRegistry.setAction( 'unsubscribe', C.ACTIONS.UNLISTEN );
this._clientRegistry = clientRegistry;
this._providerRegistry = new SubscriptionRegistry( options, this._type );
this._providerRegistry.setAction( 'subscribe', C.ACTIONS.LISTEN );
this._providerRegistry.setAction( 'unsubscribe', C.ACTIONS.UNLISTEN );
this._patterns = {};
this._providedRecords = {};
this._listenInProgress = {};
Expand All @@ -51,13 +51,7 @@ ListenerRegistry.prototype.handle = function( socketWrapper, message ) {
this.removeListener( socketWrapper, message );
} else if( this._listenInProgress[ message.data[ 1 ] ] ) {
if (message.action === C.ACTIONS.LISTEN_ACCEPT ) {
this._providedRecords[ message.data[ 1 ] ] = {
socketWrapper: socketWrapper,
pattern: message.data[ 0 ]
}
// tell all subscribers that this is being published
delete this._listenInProgress[ message.data[ 1 ] ];
// TODO: clear timeout
this.accept( socketWrapper, message );
} else if (message.action === C.ACTIONS.LISTEN_REJECT) {
// try next listener
this.triggerNextProvider( message.data[ 1 ] );
Expand All @@ -68,15 +62,52 @@ ListenerRegistry.prototype.handle = function( socketWrapper, message ) {
// send error that accepting or rejecting listen pattern / subscription
// that isn't being asked for
}
}

/*
TODO
*/

ListenerRegistry.prototype.accept = function( socketWrapper, message ) {
var pattern = message.data[ 0 ];
var subscriptionName = message.data[ 1 ];
this._providedRecords[ subscriptionName ] = {
socketWrapper: socketWrapper,
pattern: pattern
}

this._clientRegistry.sendToSubscribers(
subscriptionName,
createHasProviderMessage( true, this._type, subscriptionName )
);

socketWrapper.socket.once( 'close', (function() {
this._clientRegistry.sendToSubscribers(
subscriptionName,
createHasProviderMessage( false, this._type, subscriptionName )
);
}).bind( this ) );

delete this._listenInProgress[ subscriptionName ];

// TODO: clear timeout
}

function createHasProviderMessage(hasProvider, type, subscriptionName) {
return messageBuilder.getMsg(
type,
C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER,
[subscriptionName, (hasProvider ? C.TYPES.TRUE : C.TYPES.FALSE)]
);
}

/*
TODO
*/

ListenerRegistry.prototype.hasActiveProvider = function( susbcriptionName ) {
// do i have a local provider ( i know the provider pattern, the subscription name and the provider socket
// does any other deepstream node actually provide this subscriptionName
return !!this._providedRecords[ susbcriptionName ];
}

Expand All @@ -102,9 +133,10 @@ ListenerRegistry.prototype.addListener = function( socketWrapper, message ) {
return;
}

var inSubscriptionRegistry = this._subscriptionRegistry.isSubscriber( socketWrapper );
this._subscriptionRegistry.subscribe( pattern, socketWrapper );
var inSubscriptionRegistry = this._providerRegistry.isSubscriber( socketWrapper );

if( !inSubscriptionRegistry ) {
this._providerRegistry.subscribe( pattern, socketWrapper );
socketWrapper.socket.once( 'close', this._reconcilePatterns.bind( this ) );
}

Expand All @@ -114,11 +146,12 @@ ListenerRegistry.prototype.addListener = function( socketWrapper, message ) {
}

// Notify socketWrapper of existing subscriptions that match the provided pattern
existingSubscriptions = this._parentSubscriptionRegistry.getNames();
existingSubscriptions = this._clientRegistry.getNames();
for( i = 0; i < existingSubscriptions.length; i++ ) {
name = existingSubscriptions[ i ];
if( name.match( regExp ) ) {
if( this._listenInProgress[ name ] ) {
// if already in queue do not add
this._listenInProgress[ name ].push({
socketWrapper: socketWrapper,
pattern: pattern
Expand All @@ -129,6 +162,8 @@ ListenerRegistry.prototype.addListener = function( socketWrapper, message ) {
}
}
}

// now do the same thing but with remote subscriptions
};

/**
Expand All @@ -143,7 +178,7 @@ ListenerRegistry.prototype.addListener = function( socketWrapper, message ) {
ListenerRegistry.prototype.sendSnapshot = function( socketWrapper, message ) {
var i, matchingNames = [];
var pattern = this._getPattern( socketWrapper, message );
var existingSubscriptions = this._parentSubscriptionRegistry.getNames();
var existingSubscriptions = this._clientRegistry.getNames();
var regExp = this._validatePattern( socketWrapper, pattern );

if( !regExp ) {
Expand Down Expand Up @@ -172,7 +207,7 @@ ListenerRegistry.prototype.removeListener = function( socketWrapper, message ) {
var pattern = this._getPattern( socketWrapper, message );

if( pattern ) {
this._subscriptionRegistry.unsubscribe( pattern, socketWrapper );
this._providerRegistry.unsubscribe( pattern, socketWrapper );
this._reconcilePatterns();
}

Expand All @@ -198,15 +233,27 @@ ListenerRegistry.prototype.removeListener = function( socketWrapper, message ) {
* @public
* @returns {void}
*/
ListenerRegistry.prototype.onSubscriptionMade = function( name ) {
ListenerRegistry.prototype.onSubscriptionMade = function( name, socketWrapper, count ) {
var pattern, message;
var action = C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND;

this._listenInProgress[ name ] = [];
if( this.hasActiveProvider( name )) {
socketWrapper.send( messageBuilder.getMsg(
this._type, C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER, [name, C.TYPES.TRUE]
) );
return;
}
this.createListenMap( pattern, name );

this.triggerNextProvider( name );
};

ListenerRegistry.prototype.createListenMap = function ( pattern, name ) {
// Creating the map
this._listenInProgress[ name ] = [];
for( pattern in this._patterns ) {
if( this._patterns[ pattern ].test( name ) ) {
var providersForPattern = this._subscriptionRegistry.getSubscribers( pattern );
var providersForPattern = this._providerRegistry.getSubscribers( pattern );
for( var i = 0; i < providersForPattern.length; i++ ) {
this._listenInProgress[ name ].push( {
pattern: pattern,
Expand All @@ -215,8 +262,7 @@ ListenerRegistry.prototype.onSubscriptionMade = function( name ) {
}
}
}
this.triggerNextProvider( name );
};
}

ListenerRegistry.prototype.triggerNextProvider = function ( name ) {
// TODO: creat a timeout, if timeout happens -> treat it as a reject
Expand All @@ -238,11 +284,19 @@ ListenerRegistry.prototype.triggerNextProvider = function ( name ) {
* @public
* @returns {void}
*/
ListenerRegistry.prototype.onSubscriptionRemoved = function( name ) {
var provider = this._providedRecords[ name ];
if ( provider == null ) {
ListenerRegistry.prototype.onSubscriptionRemoved = function( name, socketWrapper, count ) {
// if there is no provider OR someone else is already listening (provider)
// ensure that clients always initialize the the provider before a normal read
if( !this.hasActiveProvider( name ) || count > 1 ) {
return;
}

// if there is still one proivder which is not the passed socketWrapper
if( count === 1 && this._providedRecords[ name ].socketWrapper !== socketWrapper) {
return;
}

var provider = this._providedRecords[ name ];
provider.socketWrapper.send(
messageBuilder.getMsg(
this._type, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, [ provider.pattern, name ]
Expand Down Expand Up @@ -323,7 +377,7 @@ ListenerRegistry.prototype._onMsgDataError = function( socketWrapper, errorMsg )
*/
ListenerRegistry.prototype._reconcilePatterns = function() {
for( var pattern in this._patterns ) {
if( !this._subscriptionRegistry.hasSubscribers( pattern ) ) {
if( !this._providerRegistry.hasSubscribers( pattern ) ) {
delete this._patterns[ pattern ];
}
}
Expand Down
18 changes: 10 additions & 8 deletions src/utils/subscription-registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ SubscriptionRegistry.prototype.sendToSubscribers = function( name, msgString, se
SubscriptionRegistry.prototype.subscribe = function( name, socketWrapper ) {
if( this._subscriptions[ name ] === undefined ) {
this._subscriptions[ name ] = [];
if( this._subscriptionListener ) {
this._subscriptionListener.onSubscriptionMade( name, socketWrapper );
}
}

if( this._subscriptions[ name ].indexOf( socketWrapper ) !== -1 ) {
Expand All @@ -98,6 +95,11 @@ SubscriptionRegistry.prototype.subscribe = function( name, socketWrapper ) {
}

this._subscriptions[ name ].push( socketWrapper );

if( this._subscriptionListener ) {
this._subscriptionListener.onSubscriptionMade( name, socketWrapper, this._subscriptions[ name ].length );
}

var logMsg = 'for ' + this._topic + ':' + name + ' by ' + socketWrapper.user;
this._options.logger.log( C.LOG_LEVEL.DEBUG, this._constants.SUBSCRIBE, logMsg );
socketWrapper.sendMessage( this._topic, C.ACTIONS.ACK, [ this._constants.SUBSCRIBE, name ] );
Expand Down Expand Up @@ -134,14 +136,14 @@ SubscriptionRegistry.prototype.unsubscribe = function( name, socketWrapper, sile

if( this._subscriptions[ name ].length === 1 ) {
delete this._subscriptions[ name ];

if( this._subscriptionListener ) {
this._subscriptionListener.onSubscriptionRemoved( name, socketWrapper );
}
} else {
this._subscriptions[ name ].splice( this._subscriptions[ name ].indexOf( socketWrapper ), 1 );
}

if( this._subscriptionListener ) {
this._subscriptionListener.onSubscriptionRemoved( name, socketWrapper, (this._subscriptions[ name ] || [] ).length );
}

if( !silent ) {
var logMsg = 'for ' + this._topic + ':' + name + ' by ' + socketWrapper.user;
this._options.logger.log( C.LOG_LEVEL.DEBUG, this._constants.UNSUBSCRIBE, logMsg );
Expand Down Expand Up @@ -263,4 +265,4 @@ SubscriptionRegistry.prototype.setSubscriptionListener = function( subscriptionL
this._subscriptionListener = subscriptionListener;
};

module.exports = SubscriptionRegistry;
module.exports = SubscriptionRegistry;
Loading

0 comments on commit 2e67b3e

Please sign in to comment.