From 64f3f4e032b57228807393d1e3c6c0e90d559814 Mon Sep 17 00:00:00 2001 From: Anton Wilhelm Date: Fri, 5 Aug 2016 16:09:07 +0200 Subject: [PATCH] add test case for 3 providers with timeouts, fix implementation --- src/utils/listener-registry.js | 51 ++++++----- .../listener-registry-load-balancingSpec.js | 85 +++++++++++++++++-- 2 files changed, 109 insertions(+), 27 deletions(-) diff --git a/src/utils/listener-registry.js b/src/utils/listener-registry.js index 22fad5b66..e5aab2b1c 100644 --- a/src/utils/listener-registry.js +++ b/src/utils/listener-registry.js @@ -38,6 +38,7 @@ var ListenerRegistry = function( type, options, clientRegistry ) { this._patterns = {}; this._providedRecords = {}; this._listenInProgress = {}; + // this._listenerTimeoutRegistery new TimeoutRegistry(); this._timeoutMap = {}; this._timedoutProviders = {}; }; @@ -47,47 +48,58 @@ TODO */ ListenerRegistry.prototype.handle = function( socketWrapper, message ) { + //console.log(socketWrapper.toString(), message, this._listenInProgress) var pattern = message.data[ 0 ]; var subscriptionName = message.data[ 1 ]; var indexOfTimedoutProvider = (this._timedoutProviders[ subscriptionName ] || []).findIndex( function( provider ) { return provider.socketWrapper === socketWrapper && provider.pattern === pattern; }); - var timedoutButReadyProvider = (this._timedoutProviders[ subscriptionName ] || []).filter( provider => provider.isReady )[0] - // console.log(socketWrapper.toString(), message, this._listenInProgress) + var lateProviders = (this._timedoutProviders[ subscriptionName ] || []).filter( provider => provider.lateAccept ) if (message.action === C.ACTIONS.LISTEN ) { this.addListener( socketWrapper, message ); } else if (message.action === C.ACTIONS.UNLISTEN ) { this.removeListener( socketWrapper, message ); } else if( this._timedoutProviders[ subscriptionName ] && indexOfTimedoutProvider !== -1) { - if ( message.action === C.ACTIONS.LISTEN_ACCEPT || message.action === C.ACTIONS.LISTEN_REJECT ) { - var provider = this._timedoutProviders[ subscriptionName ][ indexOfTimedoutProvider ]; - provider.isReady = true; + // hold this provider and choose what to do when current proider will answer + var provider = this._timedoutProviders[ subscriptionName ][ indexOfTimedoutProvider ]; + if ( message.action === C.ACTIONS.LISTEN_ACCEPT ) { + provider.lateAccept = true; provider.action = message.action provider.pattern = pattern + } else if ( message.action === C.ACTIONS.LISTEN_REJECT ) { + // ignore and remove from map + this._timedoutProviders[ subscriptionName ].splice( indexOfTimedoutProvider, 1 ); } } else if( this._listenInProgress[ subscriptionName ] ) { if (message.action === C.ACTIONS.LISTEN_ACCEPT ) { this.accept( socketWrapper, message ); - if( timedoutButReadyProvider ) { - var index = this._timedoutProviders[ subscriptionName ].indexOf( timedoutButReadyProvider ); + // send for all timedout provider which did an ACCEPT a PATTERN_REMOVED + //this._listenerTimeoutRegistery.rejectAllProviders( subscriptionName ); + lateProviders.forEach((function( provider ) { + provider.socketWrapper.send( + messageBuilder.getMsg( + this._type, + C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, + [ provider.pattern, subscriptionName ] + ) + ); + + }).bind( this ) ); + } else if (message.action === C.ACTIONS.LISTEN_REJECT) { + var provider = lateProviders.shift(); + if( provider ) { + var index = this._timedoutProviders[ subscriptionName ].indexOf( provider ); this._timedoutProviders[ subscriptionName ].splice( index, 1 ); - if( timedoutButReadyProvider.action === C.ACTIONS.LISTEN_ACCEPT ) { - timedoutButReadyProvider.socketWrapper.send( + this.accept( provider.socketWrapper, message ); + lateProviders.forEach((function( provider ) { + provider.socketWrapper.send( messageBuilder.getMsg( this._type, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, - [ timedoutButReadyProvider.pattern, subscriptionName ] + [ provider.pattern, subscriptionName ] ) ); - } - } - } else if (message.action === C.ACTIONS.LISTEN_REJECT) { - if( timedoutButReadyProvider ) { - var index = this._timedoutProviders[ subscriptionName ].indexOf( timedoutButReadyProvider ); - this._timedoutProviders[ subscriptionName ].splice( index, 1 ); - if( timedoutButReadyProvider.action === C.ACTIONS.LISTEN_ACCEPT ) { - this.accept( timedoutButReadyProvider.socketWrapper, message ); - } + }).bind( this ) ); } else { this.triggerNextProvider( subscriptionName ); } @@ -311,7 +323,6 @@ ListenerRegistry.prototype.triggerNextProvider = function ( name ) { if( provider ) { var timeoutId = setTimeout((function() { - // console.log('timing out now', provider.pattern) if( this._timedoutProviders[ name ] == null ) { this._timedoutProviders[ name ] = []; } diff --git a/test/utils/listener-registry-load-balancingSpec.js b/test/utils/listener-registry-load-balancingSpec.js index 185a2bcb1..b7af2bf2b 100644 --- a/test/utils/listener-registry-load-balancingSpec.js +++ b/test/utils/listener-registry-load-balancingSpec.js @@ -9,9 +9,12 @@ var ListenerRegistry = require( '../../src/utils/listener-registry' ), var subscribedTopics, sendToSubscribersMock, + provider1, + provider2, + provider3, client1, - client2; -var listenerRegistry, + client2, + listenerRegistry, options = { logger: new LoggerMock() }, clientRegistry = null; @@ -87,7 +90,7 @@ function verify(provider, actions, data) { ); } -fdescribe( 'listener-registry-load-balancing', function() { +describe( 'listener-registry-load-balancing', function() { beforeEach(function() { sendToSubscribersMock = jasmine.createSpy( 'sendToSubscribersMock' ); subscribedTopics = []; @@ -103,6 +106,8 @@ fdescribe( 'listener-registry-load-balancing', function() { provider1.toString = function() { return 'provider1' } provider2 = new SocketWrapper( new SocketMock(), options ); provider2.toString = function() { return 'provider2' } + provider3 = new SocketWrapper( new SocketMock(), options ); + provider3.toString = function() { return 'provider3' } listenerRegistry = new ListenerRegistry( 'R', options, clientRegistry ); expect( typeof listenerRegistry.addListener ).toBe( 'function' ); }); @@ -389,7 +394,7 @@ fdescribe( 'listener-registry-load-balancing', function() { 12. send publishing=false to the clients (new action: PUBLISHING // TODO: */ - it( 'first rejects, seconds accepts', function() { + it( 'first rejects, seconds accepts', function(done) { // 1 updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' ) // 2 @@ -405,7 +410,7 @@ fdescribe( 'listener-registry-load-balancing', function() { // 6 verify( provider1, null ) verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[0-9]', 'a/1'] ) - + return done() // 7 accept( provider2, 'a/[0-9]', 'a/1' ); @@ -597,6 +602,11 @@ fdescribe( 'listener-registry-load-balancing', function() { accept( provider2, 'a/[0-9]', 'a/1' ) // 8 + var msgString = msg( `${C.TOPIC.RECORD}|${C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER}|a/1|${C.TYPES.TRUE}+` ) + expect( sendToSubscribersMock ).toHaveBeenCalledWith('a/1', msgString) + expect( sendToSubscribersMock.calls.count() ).toBe( 1 ) + + // 9 verify( provider1, null ) done() }, 25) @@ -632,14 +642,20 @@ fdescribe( 'listener-registry-load-balancing', function() { // 7 accept( provider1, 'a/.*', 'a/1', true ) // hold this provider + expect( sendToSubscribersMock.calls.count() ).toBe( 0 ) // 8 accept( provider2, 'a/[0-9]', 'a/1' ) // use this provider -> provide 1 should be droped // 9 - verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/.*', 'a/1'] ) + var msgString = msg( `${C.TOPIC.RECORD}|${C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER}|a/1|${C.TYPES.TRUE}+` ) + expect( sendToSubscribersMock ).toHaveBeenCalledWith('a/1', msgString) + expect( sendToSubscribersMock.calls.count() ).toBe( 1 ) // 10 + verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/.*', 'a/1'] ) + + // 11 verify( provider1, null ) done() }, 25) @@ -670,7 +686,6 @@ fdescribe( 'listener-registry-load-balancing', function() { // 5 setTimeout(function() { - console.log('after timeout') // 6 verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[0-9]', 'a/1'] ) @@ -686,10 +701,66 @@ fdescribe( 'listener-registry-load-balancing', function() { expect( sendToSubscribersMock ).toHaveBeenCalledWith('a/1', msgString) expect( sendToSubscribersMock.calls.count() ).toBe( 1 ) + verify( provider1, null ) + done() }, 25) }); + it( 'provider 1 and 2 times out and 3 rejects, 1 and 2 accepts later and 1 wins', function(done) { + // 1 + updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' ) + // 2 + updateRegistryAndVerify( provider2, C.ACTIONS.LISTEN, 'a/[0-9]' ) + // 3 + updateRegistryAndVerify( provider3, C.ACTIONS.LISTEN, 'a/[1]' ) + + // 4 + subscribe( 'a/1', client1 ) + + // 4 + verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1'] ) + + // 5 + setTimeout(function() { + // 6 + verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[0-9]', 'a/1'] ) + + // 7 + setTimeout(function() { + // 8 + verify( provider3, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[1]', 'a/1'] ) + + // 9 + accept( provider1, 'a/.*', 'a/1', true ) // in pending state + expect( sendToSubscribersMock.calls.count() ).toBe( 0 ) + + // 10 + accept( provider2, 'a/[0-9]', 'a/1', true ) // in pending state + expect( sendToSubscribersMock.calls.count() ).toBe( 0 ) + + // 11 + reject( provider3, 'a/[0-9]', 'a/1', true ) // should let provder 1 do the work + + // 12 + var msgString = msg( `${C.TOPIC.RECORD}|${C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER}|a/1|${C.TYPES.TRUE}+` ) + expect( sendToSubscribersMock ).toHaveBeenCalledWith('a/1', msgString) + expect( sendToSubscribersMock.calls.count() ).toBe( 1 ) + + // 13 + verify( provider1, null ) + + // 14 + verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/[0-9]', 'a/1'] ) + + done() + + }, 25) + + + }, 25) + }); + // +++ Interval ( Not mandotory, can be null ) /*