diff --git a/src/constants/constants.js b/src/constants/constants.js index e7b8ca799..c71bb9370 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -90,6 +90,7 @@ exports.ACTIONS.LISTEN = 'L'; exports.ACTIONS.UNLISTEN = 'UL'; exports.ACTIONS.LISTEN_ACCEPT = 'LA'; exports.ACTIONS.LISTEN_REJECT = 'LR'; +exports.ACTIONS.SUBSCRIPTION_HAS_PROVIDER = 'SH'; exports.ACTIONS.SUBSCRIPTIONS_FOR_PATTERN_FOUND = 'SF'; exports.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND = 'SP'; exports.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED = 'SR'; diff --git a/src/utils/listener-registry.js b/src/utils/listener-registry.js index 688b9df09..6d47889c0 100644 --- a/src/utils/listener-registry.js +++ b/src/utils/listener-registry.js @@ -20,21 +20,21 @@ var C = require( '../constants/constants' ), * notified with a SUBSCRIPTION_FOR_PATTERN_REMOVED action * * This class manages the matching of patterns and record names. The subscription / - * notification logic is handled by this._subscriptionRegistry + * notification logic is handled by this._providerRegistry * * @constructor * * @param {Object} options DeepStream options - * @param {SubscriptionRegistry} parentSubscriptionRegistry The SubscriptionRegistry containing the record subscriptions - * to allow new listeners to be notified of existing subscriptions + * @param {SubscriptionRegistry} clientRegistry The SubscriptionRegistry containing the record subscriptions + * to allow new listeners to be notified of existing subscriptions */ -var ListenerRegistry = function( type, options, parentSubscriptionRegistry ) { +var ListenerRegistry = function( type, options, clientRegistry ) { this._type = type; this._options = options; - this._parentSubscriptionRegistry = parentSubscriptionRegistry; - this._subscriptionRegistry = new SubscriptionRegistry( options, this._type ); - this._subscriptionRegistry.setAction( 'subscribe', C.ACTIONS.LISTEN ); - this._subscriptionRegistry.setAction( 'unsubscribe', C.ACTIONS.UNLISTEN ); + this._clientRegistry = clientRegistry; + this._providerRegistry = new SubscriptionRegistry( options, this._type ); + this._providerRegistry.setAction( 'subscribe', C.ACTIONS.LISTEN ); + this._providerRegistry.setAction( 'unsubscribe', C.ACTIONS.UNLISTEN ); this._patterns = {}; this._providedRecords = {}; this._listenInProgress = {}; @@ -51,13 +51,7 @@ ListenerRegistry.prototype.handle = function( socketWrapper, message ) { this.removeListener( socketWrapper, message ); } else if( this._listenInProgress[ message.data[ 1 ] ] ) { if (message.action === C.ACTIONS.LISTEN_ACCEPT ) { - this._providedRecords[ message.data[ 1 ] ] = { - socketWrapper: socketWrapper, - pattern: message.data[ 0 ] - } - // tell all subscribers that this is being published - delete this._listenInProgress[ message.data[ 1 ] ]; - // TODO: clear timeout + this.accept( socketWrapper, message ); } else if (message.action === C.ACTIONS.LISTEN_REJECT) { // try next listener this.triggerNextProvider( message.data[ 1 ] ); @@ -68,8 +62,43 @@ ListenerRegistry.prototype.handle = function( socketWrapper, message ) { // send error that accepting or rejecting listen pattern / subscription // that isn't being asked for } +} +/* +TODO +*/ +ListenerRegistry.prototype.accept = function( socketWrapper, message ) { + var pattern = message.data[ 0 ]; + var subscriptionName = message.data[ 1 ]; + this._providedRecords[ subscriptionName ] = { + socketWrapper: socketWrapper, + pattern: pattern + } + + this._clientRegistry.sendToSubscribers( + subscriptionName, + createHasProviderMessage( true, this._type, subscriptionName ) + ); + + socketWrapper.socket.once( 'close', (function() { + this._clientRegistry.sendToSubscribers( + subscriptionName, + createHasProviderMessage( false, this._type, subscriptionName ) + ); + }).bind( this ) ); + + delete this._listenInProgress[ subscriptionName ]; + + // TODO: clear timeout +} + +function createHasProviderMessage(hasProvider, type, subscriptionName) { + return messageBuilder.getMsg( + type, + C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER, + [subscriptionName, (hasProvider ? C.TYPES.TRUE : C.TYPES.FALSE)] + ); } /* @@ -77,6 +106,8 @@ TODO */ ListenerRegistry.prototype.hasActiveProvider = function( susbcriptionName ) { + // do i have a local provider ( i know the provider pattern, the subscription name and the provider socket + // does any other deepstream node actually provide this subscriptionName return !!this._providedRecords[ susbcriptionName ]; } @@ -102,9 +133,10 @@ ListenerRegistry.prototype.addListener = function( socketWrapper, message ) { return; } - var inSubscriptionRegistry = this._subscriptionRegistry.isSubscriber( socketWrapper ); - this._subscriptionRegistry.subscribe( pattern, socketWrapper ); + var inSubscriptionRegistry = this._providerRegistry.isSubscriber( socketWrapper ); + if( !inSubscriptionRegistry ) { + this._providerRegistry.subscribe( pattern, socketWrapper ); socketWrapper.socket.once( 'close', this._reconcilePatterns.bind( this ) ); } @@ -114,11 +146,12 @@ ListenerRegistry.prototype.addListener = function( socketWrapper, message ) { } // Notify socketWrapper of existing subscriptions that match the provided pattern - existingSubscriptions = this._parentSubscriptionRegistry.getNames(); + existingSubscriptions = this._clientRegistry.getNames(); for( i = 0; i < existingSubscriptions.length; i++ ) { name = existingSubscriptions[ i ]; if( name.match( regExp ) ) { if( this._listenInProgress[ name ] ) { + // if already in queue do not add this._listenInProgress[ name ].push({ socketWrapper: socketWrapper, pattern: pattern @@ -129,6 +162,8 @@ ListenerRegistry.prototype.addListener = function( socketWrapper, message ) { } } } + + // now do the same thing but with remote subscriptions }; /** @@ -143,7 +178,7 @@ ListenerRegistry.prototype.addListener = function( socketWrapper, message ) { ListenerRegistry.prototype.sendSnapshot = function( socketWrapper, message ) { var i, matchingNames = []; var pattern = this._getPattern( socketWrapper, message ); - var existingSubscriptions = this._parentSubscriptionRegistry.getNames(); + var existingSubscriptions = this._clientRegistry.getNames(); var regExp = this._validatePattern( socketWrapper, pattern ); if( !regExp ) { @@ -172,7 +207,7 @@ ListenerRegistry.prototype.removeListener = function( socketWrapper, message ) { var pattern = this._getPattern( socketWrapper, message ); if( pattern ) { - this._subscriptionRegistry.unsubscribe( pattern, socketWrapper ); + this._providerRegistry.unsubscribe( pattern, socketWrapper ); this._reconcilePatterns(); } @@ -198,15 +233,27 @@ ListenerRegistry.prototype.removeListener = function( socketWrapper, message ) { * @public * @returns {void} */ -ListenerRegistry.prototype.onSubscriptionMade = function( name ) { +ListenerRegistry.prototype.onSubscriptionMade = function( name, socketWrapper, count ) { var pattern, message; var action = C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND; - this._listenInProgress[ name ] = []; + if( this.hasActiveProvider( name )) { + socketWrapper.send( messageBuilder.getMsg( + this._type, C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER, [name, C.TYPES.TRUE] + ) ); + return; + } + this.createListenMap( pattern, name ); + this.triggerNextProvider( name ); +}; + +ListenerRegistry.prototype.createListenMap = function ( pattern, name ) { + // Creating the map + this._listenInProgress[ name ] = []; for( pattern in this._patterns ) { if( this._patterns[ pattern ].test( name ) ) { - var providersForPattern = this._subscriptionRegistry.getSubscribers( pattern ); + var providersForPattern = this._providerRegistry.getSubscribers( pattern ); for( var i = 0; i < providersForPattern.length; i++ ) { this._listenInProgress[ name ].push( { pattern: pattern, @@ -215,8 +262,7 @@ ListenerRegistry.prototype.onSubscriptionMade = function( name ) { } } } - this.triggerNextProvider( name ); -}; +} ListenerRegistry.prototype.triggerNextProvider = function ( name ) { // TODO: creat a timeout, if timeout happens -> treat it as a reject @@ -238,11 +284,19 @@ ListenerRegistry.prototype.triggerNextProvider = function ( name ) { * @public * @returns {void} */ -ListenerRegistry.prototype.onSubscriptionRemoved = function( name ) { - var provider = this._providedRecords[ name ]; - if ( provider == null ) { +ListenerRegistry.prototype.onSubscriptionRemoved = function( name, socketWrapper, count ) { + // if there is no provider OR someone else is already listening (provider) + // ensure that clients always initialize the the provider before a normal read + if( !this.hasActiveProvider( name ) || count > 1 ) { + return; + } + + // if there is still one proivder which is not the passed socketWrapper + if( count === 1 && this._providedRecords[ name ].socketWrapper !== socketWrapper) { return; } + + var provider = this._providedRecords[ name ]; provider.socketWrapper.send( messageBuilder.getMsg( this._type, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, [ provider.pattern, name ] @@ -323,7 +377,7 @@ ListenerRegistry.prototype._onMsgDataError = function( socketWrapper, errorMsg ) */ ListenerRegistry.prototype._reconcilePatterns = function() { for( var pattern in this._patterns ) { - if( !this._subscriptionRegistry.hasSubscribers( pattern ) ) { + if( !this._providerRegistry.hasSubscribers( pattern ) ) { delete this._patterns[ pattern ]; } } diff --git a/src/utils/subscription-registry.js b/src/utils/subscription-registry.js index a3a8a9885..686774139 100644 --- a/src/utils/subscription-registry.js +++ b/src/utils/subscription-registry.js @@ -76,9 +76,6 @@ SubscriptionRegistry.prototype.sendToSubscribers = function( name, msgString, se SubscriptionRegistry.prototype.subscribe = function( name, socketWrapper ) { if( this._subscriptions[ name ] === undefined ) { this._subscriptions[ name ] = []; - if( this._subscriptionListener ) { - this._subscriptionListener.onSubscriptionMade( name, socketWrapper ); - } } if( this._subscriptions[ name ].indexOf( socketWrapper ) !== -1 ) { @@ -98,6 +95,11 @@ SubscriptionRegistry.prototype.subscribe = function( name, socketWrapper ) { } this._subscriptions[ name ].push( socketWrapper ); + + if( this._subscriptionListener ) { + this._subscriptionListener.onSubscriptionMade( name, socketWrapper, this._subscriptions[ name ].length ); + } + var logMsg = 'for ' + this._topic + ':' + name + ' by ' + socketWrapper.user; this._options.logger.log( C.LOG_LEVEL.DEBUG, this._constants.SUBSCRIBE, logMsg ); socketWrapper.sendMessage( this._topic, C.ACTIONS.ACK, [ this._constants.SUBSCRIBE, name ] ); @@ -134,14 +136,14 @@ SubscriptionRegistry.prototype.unsubscribe = function( name, socketWrapper, sile if( this._subscriptions[ name ].length === 1 ) { delete this._subscriptions[ name ]; - - if( this._subscriptionListener ) { - this._subscriptionListener.onSubscriptionRemoved( name, socketWrapper ); - } } else { this._subscriptions[ name ].splice( this._subscriptions[ name ].indexOf( socketWrapper ), 1 ); } + if( this._subscriptionListener ) { + this._subscriptionListener.onSubscriptionRemoved( name, socketWrapper, (this._subscriptions[ name ] || [] ).length ); + } + if( !silent ) { var logMsg = 'for ' + this._topic + ':' + name + ' by ' + socketWrapper.user; this._options.logger.log( C.LOG_LEVEL.DEBUG, this._constants.UNSUBSCRIBE, logMsg ); @@ -263,4 +265,4 @@ SubscriptionRegistry.prototype.setSubscriptionListener = function( subscriptionL this._subscriptionListener = subscriptionListener; }; -module.exports = SubscriptionRegistry; \ No newline at end of file +module.exports = SubscriptionRegistry; diff --git a/test/utils/listener-registry-load-balancingSpec.js b/test/utils/listener-registry-load-balancingSpec.js index 0691a5d1a..726ba4ef1 100644 --- a/test/utils/listener-registry-load-balancingSpec.js +++ b/test/utils/listener-registry-load-balancingSpec.js @@ -7,10 +7,13 @@ var ListenerRegistry = require( '../../src/utils/listener-registry' ), LoggerMock = require( '../mocks/logger-mock' ), noopMessageConnector = require( '../../src/default-plugins/noop-message-connector' ); -var subscribedTopics; +var subscribedTopics, + sendToSubscribersMock, + client1, + client2; var listenerRegistry, options = { logger: new LoggerMock() }, - recordSubscriptionRegistryMock = null; + clientRegistry = null; function updateRegistryAndVerify(socketWrapper, action, pattern) { @@ -18,19 +21,16 @@ function updateRegistryAndVerify(socketWrapper, action, pattern) { verify(socketWrapper, [C.ACTIONS.ACK, action], pattern) } -function subcribe( subscriptionName, useParentSubscriptionRegistry ) { - if( useParentSubscriptionRegistry !== false ) { - useParentSubscriptionRegistry = true; - } - listenerRegistry.onSubscriptionMade( subscriptionName ); - if( useParentSubscriptionRegistry ) { - subscribedTopics.push(subscriptionName) - } - +function subscribe( subscriptionName, socketWrapper, count) { + listenerRegistry.onSubscriptionMade( subscriptionName, socketWrapper, undefined ); + subscribedTopics.push(subscriptionName) } -function unsubscribe( subscriptionName ) { - listenerRegistry.onSubscriptionRemoved( subscriptionName ); +function unsubscribe( subscriptionName, socketWrapper, count) { + if(count == null) { + count = 0 + } + listenerRegistry.onSubscriptionRemoved( subscriptionName, socketWrapper, count ); } function updateRegistry( socket, action, data ) { @@ -53,7 +53,7 @@ function reject(socketWrapper, pattern, subscriptionName) { } var messageHistory = {} -function verify(provider, actions, pattern, subscriptionName) { +function verify(provider, actions, data) { var messageIndex = 0; var options = {}; var lastArg = arguments[ arguments.length - 1]; @@ -72,10 +72,8 @@ function verify(provider, actions, pattern, subscriptionName) { if( !( actions instanceof Array ) ) { actions = [ actions ]; } - if( subscriptionName == null || typeof subscriptionName !== 'string' ) { - subscriptionName = ''; - } else { - subscriptionName = '|' + subscriptionName; + if( !( data instanceof Array ) ) { + data = [ data ]; } var message = provider.socket.getMsg( messageIndex ); messageHistory[provider] = { @@ -83,23 +81,27 @@ function verify(provider, actions, pattern, subscriptionName) { size: provider.socket.getMsgSize() } expect( message ).toBe( - msg( `${C.TOPIC.RECORD}|${actions.join('|')}|${pattern}${subscriptionName}+` ) + msg( `${C.TOPIC.RECORD}|${actions.join('|')}|${data.join('|')}+` ) ); } fdescribe( 'listener-registry-load-balancing', function() { beforeEach(function() { + sendToSubscribersMock = jasmine.createSpy( 'sendToSubscribersMock' ); subscribedTopics = []; - recordSubscriptionRegistryMock = { + clientRegistry = { getNames: function() { return subscribedTopics; - } + }, + sendToSubscribers: sendToSubscribersMock }; + client1 = new SocketWrapper( new SocketMock(), options ); + client2 = new SocketWrapper( new SocketMock(), options ); provider1 = new SocketWrapper( new SocketMock(), options ); provider1.toString = function() { return 'provider1' } provider2 = new SocketWrapper( new SocketMock(), options ); provider2.toString = function() { return 'provider2' } - listenerRegistry = new ListenerRegistry( 'R', options, recordSubscriptionRegistryMock ); + listenerRegistry = new ListenerRegistry( 'R', options, clientRegistry ); expect( typeof listenerRegistry.addListener ).toBe( 'function' ); }); @@ -117,20 +119,20 @@ fdescribe( 'listener-registry-load-balancing', function() { 10. recieving unknown accept/reject throws an error */ - it( 'single provider accepts a subscription', function() { + it( 'accepts a subscription', function() { // 1 updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' ) // 2 - subcribe( 'a/1' ) + subscribe( 'a/1', client1 ) // 3 - verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, 'a/.*', 'a/1') + verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1']) // 4 TODO: accept( provider1, 'a/.*', 'a/1' ); // 6 - unsubscribe( 'a/1' ); + unsubscribe( 'a/1' , client1 ); // 7 - verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, 'a/.*', 'a/1' ) + verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/.*', 'a/1'] ) // 8 TODO: // 9 TODO: fix implementaiton for this expectation @@ -148,25 +150,25 @@ fdescribe( 'listener-registry-load-balancing', function() { 6. provider should not get a SR */ - it( 'single provider rejects a subscription', function() { + it( 'rejects a subscription', function() { // 1 updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' ) // 2 - subcribe( 'a/1' ) + subscribe( 'a/1', client1 ) // 3 - verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, 'a/.*', 'a/1') + verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1']) // 4 reject( provider1, 'a/.*', 'a/1' ); // 5 - unsubscribe( 'a/1' ); + unsubscribe( 'a/1' , client1 ); // 6 verify(provider1, null) }); /* - 0. subscription already made for b/1 (recordSubscriptionRegistryMock) + 0. subscription already made for b/1 (clientRegistry) 1. provider does listen a/.* 2. provider gets a SP 3. provider responds with REJECT @@ -174,7 +176,7 @@ fdescribe( 'listener-registry-load-balancing', function() { 5. provider should not get a SR */ - it( 'single provider rejects a subscription with a pattern for which subscriptions already exists', function() { + it( 'rejects a subscription with a pattern for which subscriptions already exists', function() { // 0 subscribedTopics.push( 'b/1' ) // 1 @@ -182,18 +184,18 @@ fdescribe( 'listener-registry-load-balancing', function() { verify( provider1, [C.ACTIONS.ACK, C.ACTIONS.LISTEN], 'b/.*', {index: 1}) // 2 - verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, 'b/.*', 'b/1', {index: 0}) + verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['b/.*', 'b/1'], {index: 0}) // 3 reject( provider1, 'b/.*', 'b/1' ); // 4 - unsubscribe( 'b/1' ); + unsubscribe( 'b/1' , client1 ); // 5 verify( provider1, null) }); /* - 0. subscription already made for b/1 (recordSubscriptionRegistryMock) + 0. subscription already made for b/1 (clientRegistry) 1. provider does listen b/.* 2. provider gets a SP 3. provider responds with ACCEPT @@ -203,7 +205,7 @@ fdescribe( 'listener-registry-load-balancing', function() { 7. send publishing=false to the clients // TODO */ - it( 'single provider accepts a subscription with a pattern for which subscriptions already exists', function() { + it( 'accepts a subscription with a pattern for which subscriptions already exists', function() { // 0 subscribedTopics.push('b/1') @@ -211,7 +213,7 @@ fdescribe( 'listener-registry-load-balancing', function() { updateRegistry( provider1, C.ACTIONS.LISTEN, [ 'b/.*' ] ) verify(provider1, [C.ACTIONS.ACK, C.ACTIONS.LISTEN], 'b/.*', {index: 1}) // 2 - verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, 'b/.*', 'b/1', {index: 0}) + verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['b/.*', 'b/1'], {index: 0}) // 3 accept( provider1, 'b/.*', 'b/1' ); @@ -219,14 +221,153 @@ fdescribe( 'listener-registry-load-balancing', function() { // 4 TODO: // 5 - unsubscribe( 'b/1' ); + unsubscribe( 'b/1' , client1 ); // 6 - verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, 'b/.*', 'b/1') + verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['b/.*', 'b/1']) // 7 TODO: }); + + /* + 1. provider 1 does listen a/.* + 2. client 1 requests a/1 + 3. provider 1 gets a SP + 4. provider 1 responds with ACCEPT + 5. clients gets has provider=true + 6. 2nd client requests a/1 + 7. provider doesnt get told anything + 8. client 2 gets publishing=true + */ + + it( 'accepts a subscription for 2 clients', function() { + // 1 + updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' ) + // 2 + subscribe( 'a/1', client1 ) + // 3 + verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1']) + + // 4 + accept( provider1, 'a/.*', 'a/1' ); + + // 5 + var msgString = msg( `${C.TOPIC.RECORD}|${C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER}|a/1|${C.TYPES.TRUE}+` ) + expect( sendToSubscribersMock ).toHaveBeenCalledWith('a/1', msgString) + expect( sendToSubscribersMock.calls.count() ).toBe( 1 ) + + // 6 + subscribe( 'a/1', client2 ) + + // 7 + verify( provider1, null ) + + // 8 + verify( client2, C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER, ['a/1', C.TYPES.TRUE] ) + + // 6 + //unsubscribe( 'a/1' , client1 ); + // 7 + //verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/.*', 'a/1'] ) + // 8 TODO: + + // 9 TODO: fix implementaiton for this expectation + //expect( listenerRegistry.hasActiveProvider( 'a/1' ) ).toBe( false ); + + }); + + /* + 1. provider 1 does listen a/.* + 2. client 1 a/1 + 3. provider 1 gets a SP + 4. provider 1 responds with ACCEPT + 5. clients gets has provider=true + 6. provider 1 requests a/1 ( count == 2 ) + 7. client 1 discards a/1 ( count === 1) + 8. provider gets send Subscription removed because provider is the last subscriber + */ + + it( 'accepts a subscription for 2 clients', function() { + // 1 + updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' ) + // 2 + subscribe( 'a/1', client1 ) + // 3 + verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1']) + + // 4 + accept( provider1, 'a/.*', 'a/1' ) + + // 5 + var msgString = msg( `${C.TOPIC.RECORD}|${C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER}|a/1|${C.TYPES.TRUE}+` ) + expect( sendToSubscribersMock ).toHaveBeenCalledWith('a/1', msgString) + expect( sendToSubscribersMock.calls.count() ).toBe( 1 ) + + // 6 + subscribe( 'a/1', provider1 ) + verify( provider1, C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER, ['a/1', C.TYPES.TRUE] ) + + // 7 + unsubscribe( 'a/1' , client1, 1 ) + + // 7 + unsubscribe( 'a/1' , provider1, 1 ) + + // 8 + verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/.*', 'a/1'] ) + + }); + + /* + 0. subscription already made for b/1 (clientRegistry) + 1. provider does listen b/.* + 2. provider gets a SP + 3. provider responds with ACCEPT + 4. send HP=true to the clients // TODO + 5. another subscription made for b/1 by client 2 + 6. send HP=true to client 2 + */ + + it( 'accepts a subscription with a pattern for which subscriptions already exists and do another subscription afterwards', function() { + // 0 + subscribedTopics.push('b/1') + + // 1 + updateRegistry( provider1, C.ACTIONS.LISTEN, [ 'b/.*' ] ) + verify(provider1, [C.ACTIONS.ACK, C.ACTIONS.LISTEN], 'b/.*', {index: 1}) + // 2 + verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['b/.*', 'b/1'], {index: 0}) + + // 3 + accept( provider1, 'b/.*', 'b/1' ); + + var msgString = msg( `${C.TOPIC.RECORD}|${C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER}|b/1|${C.TYPES.TRUE}+` ) + expect( sendToSubscribersMock ).toHaveBeenCalledWith('b/1', msgString) + expect( sendToSubscribersMock.calls.count() ).toBe( 1 ) + + // 5 + subscribe( 'b/1', client2 ) + + // 6 + verify( client2, C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER, ['b/1', C.TYPES.TRUE] ) + + // // 7 + // unsubscribe( 'b/1' , client1); + + // // 8 + // verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['b/.*', 'b/1']) + + // 9 TODO: SUBSCRIPTION_HAS_PROVIDER = false + }); + + + + + + + + }); describe( 'with multiple providers', function(){ @@ -252,16 +393,16 @@ fdescribe( 'listener-registry-load-balancing', function() { // 2 updateRegistryAndVerify( provider2, C.ACTIONS.LISTEN, 'a/[0-9]' ) // 3 - subcribe( 'a/1' ) + subscribe( 'a/1', client1 ) // 4 - verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, 'a/.*', 'a/1') + verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1']) verify(provider2, null) // 5 reject( provider1, 'a/.*', 'a/1' ); // 6 verify( provider1, null ) - verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, 'a/[0-9]', 'a/1' ) + verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[0-9]', 'a/1'] ) // 7 accept( provider2, 'a/[0-9]', 'a/1' ); @@ -269,12 +410,12 @@ fdescribe( 'listener-registry-load-balancing', function() { // 8 TODO // 9 - unsubscribe( 'a/1' ); + unsubscribe( 'a/1' , client1 ); // 10 verify(provider1, null) // 11 - verify(provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, 'a/[0-9]', 'a/1') + verify(provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/[0-9]', 'a/1']) // 12 TODO: }); @@ -292,9 +433,9 @@ fdescribe( 'listener-registry-load-balancing', function() { // 2 updateRegistryAndVerify( provider2, C.ACTIONS.LISTEN, 'a/[0-9]' ) // 3 - subcribe( 'a/1' ) + subscribe( 'a/1', client1 ) // 4 - verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, 'a/.*', 'a/1') + verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1']) verify(provider2, null) // 5 @@ -306,12 +447,12 @@ fdescribe( 'listener-registry-load-balancing', function() { // 7 TODO // 9 - unsubscribe( 'a/1' ); + unsubscribe( 'a/1' , client1 ); // 10 verify(provider2, null) // 11 - verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, 'a/.*', 'a/1') + verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/.*', 'a/1']) }); @@ -329,9 +470,9 @@ fdescribe( 'listener-registry-load-balancing', function() { // 1 updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' ) // 2 - subcribe( 'a/1' ) + subscribe( 'a/1', client1 ) // 3 - verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, 'a/.*', 'a/1') + verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1']) // 4 updateRegistryAndVerify( provider2, C.ACTIONS.LISTEN, 'a/[0-9]' ) @@ -340,7 +481,7 @@ fdescribe( 'listener-registry-load-balancing', function() { reject( provider1, 'a/.*', 'a/1' ); // 6 - verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, 'a/[0-9]', 'a/1') + verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[0-9]', 'a/1']) verify( provider1, null ) // 7 @@ -364,10 +505,10 @@ fdescribe( 'listener-registry-load-balancing', function() { // 2 updateRegistryAndVerify( provider2, C.ACTIONS.LISTEN, 'a/[0-9]' ) // 3 - subcribe( 'a/1' ) + subscribe( 'a/1', client1 ) // 4 - verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, 'a/.*', 'a/1' ) + verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1'] ) verify( provider2, null ) // 5 @@ -382,6 +523,128 @@ fdescribe( 'listener-registry-load-balancing', function() { }); + /* + 1. provider does listen a/.* + 2. provider2 does listen a/[0-9] + 2. clients request a/1 + 3. provider gets a SP + 4. provider responds with ACCEPT + 5. send publishing=true to the clients (new action: PUBLISHING) // TODO: + 6. provider disconnects (emit.close()) + 7. send publishing=false to the clients (new action: PUBLISHING // TODO: + 8. provider2 gets a SP + 9. provider2 responds with Accept + 10. sending publishing=true + */ + it( 'provider 1 accepts a subscription and disconnects then provider 2 gets a SP', function() { + // TODO + // 1 + // updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' ) + // // 2 + // subscribe( 'a/1', client1 ) + // // 3 + // verify(provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1']) + + // // 4 TODO: + // accept( provider1, 'a/.*', 'a/1' ); + // // 6 + // unsubscribe( 'a/1' , client1 ); + // // 7 + // verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/.*', 'a/1'] ) + // // 8 TODO: + + // // 9 TODO: fix implementaiton for this expectation + // expect( listenerRegistry.hasActiveProvider( 'a/1' ) ).toBe( false ); + + // 10 + }); + + /** + Publisher Timeouts +*/ + + + /* + 1. provider 1 does listen a/.* + 2. provider 2 does listen a/[0-9] + 3. clients request a/1 + 4. provider 1 gets a SP + 5. provider 1 times out -> treat as REJECT + 7. provider 1 responds with Reject + 6. provider 2 gets a SP + 7. provider 2 responds with ACCEPT + 9. provider 1 does not get anything + */ + + xit( 'provider 1 times out, provider 2 accepts', function() { + // 1 + updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' ) + // 2 + updateRegistryAndVerify( provider2, C.ACTIONS.LISTEN, 'a/[0-9]' ) + // 3 + subscribe( 'a/1', client1 ) + + // 4 + verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1'] ) + verify( provider2, null ) + + // 5 + timeout( provider1, 'a/.*', 'a/1' ); + + + }); + + /* + 1. provider 1 does listen a/.* + 2. provider 2 does listen a/[0-9] + 3. clients request a/1 + 4. provider 1 gets a SP + 5. provider 1 times out -> REJECT + 6. provider 2 gets a SP + 7. provider 1 responds with ACCEPT + 6. provider 1 gets a SR + 7. provider 2 responds with ACCEPT + 9. provider 1 does not get anything + */ + + /* + 1. provider 1 does listen a/.* + 2. provider 2 does listen a/[0-9] + 3. clients request a/1 + 4. provider 1 gets a SP + 5. provider 1 times out -> REJECT + 6. provider 2 gets a SP + 7. provider 1 responds with ACCEPT ( hold in pending and accept if next publisher rejects ) + 7. provider 2 responds with REJECT + 9. client gets publish true + */ + + + // +++ Interval ( Not mandotory, can be null ) + + /* + 1. provider 1 does listen a/.* + 3. clients request a/1 + 4. provider 1 gets a SP + 5. provider 1 rejects + 6. some time passes... + 4. provider 1 gets a SP + ... + */ + + + // Force relisten enquire + + /* + 1. provider 1 does listen a/.* + 3. clients request a/1 + 4. provider 1 gets a SP + 5. provider 1 rejects + 6. provider 1 does a listen to a/.* again without doing an unlisten= + 4. provider 1 gets a SP for a/1 + ... + */ + }); });