Skip to content

Commit

Permalink
add test case for 3 providers with timeouts, fix implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
timaschew committed Aug 5, 2016
1 parent 99a261a commit 64f3f4e
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 27 deletions.
51 changes: 31 additions & 20 deletions src/utils/listener-registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var ListenerRegistry = function( type, options, clientRegistry ) {
this._patterns = {};
this._providedRecords = {};
this._listenInProgress = {};
// this._listenerTimeoutRegistery new TimeoutRegistry();
this._timeoutMap = {};
this._timedoutProviders = {};
};
Expand All @@ -47,47 +48,58 @@ TODO
*/

ListenerRegistry.prototype.handle = function( socketWrapper, message ) {
//console.log(socketWrapper.toString(), message, this._listenInProgress)
var pattern = message.data[ 0 ];
var subscriptionName = message.data[ 1 ];
var indexOfTimedoutProvider = (this._timedoutProviders[ subscriptionName ] || []).findIndex( function( provider ) {
return provider.socketWrapper === socketWrapper && provider.pattern === pattern;
});
var timedoutButReadyProvider = (this._timedoutProviders[ subscriptionName ] || []).filter( provider => provider.isReady )[0]
// console.log(socketWrapper.toString(), message, this._listenInProgress)
var lateProviders = (this._timedoutProviders[ subscriptionName ] || []).filter( provider => provider.lateAccept )
if (message.action === C.ACTIONS.LISTEN ) {
this.addListener( socketWrapper, message );
} else if (message.action === C.ACTIONS.UNLISTEN ) {
this.removeListener( socketWrapper, message );
} else if( this._timedoutProviders[ subscriptionName ] && indexOfTimedoutProvider !== -1) {
if ( message.action === C.ACTIONS.LISTEN_ACCEPT || message.action === C.ACTIONS.LISTEN_REJECT ) {
var provider = this._timedoutProviders[ subscriptionName ][ indexOfTimedoutProvider ];
provider.isReady = true;
// hold this provider and choose what to do when current proider will answer
var provider = this._timedoutProviders[ subscriptionName ][ indexOfTimedoutProvider ];
if ( message.action === C.ACTIONS.LISTEN_ACCEPT ) {
provider.lateAccept = true;
provider.action = message.action
provider.pattern = pattern
} else if ( message.action === C.ACTIONS.LISTEN_REJECT ) {
// ignore and remove from map
this._timedoutProviders[ subscriptionName ].splice( indexOfTimedoutProvider, 1 );
}
} else if( this._listenInProgress[ subscriptionName ] ) {
if (message.action === C.ACTIONS.LISTEN_ACCEPT ) {
this.accept( socketWrapper, message );
if( timedoutButReadyProvider ) {
var index = this._timedoutProviders[ subscriptionName ].indexOf( timedoutButReadyProvider );
// send for all timedout provider which did an ACCEPT a PATTERN_REMOVED
//this._listenerTimeoutRegistery.rejectAllProviders( subscriptionName );
lateProviders.forEach((function( provider ) {
provider.socketWrapper.send(
messageBuilder.getMsg(
this._type,
C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED,
[ provider.pattern, subscriptionName ]
)
);

}).bind( this ) );
} else if (message.action === C.ACTIONS.LISTEN_REJECT) {
var provider = lateProviders.shift();
if( provider ) {
var index = this._timedoutProviders[ subscriptionName ].indexOf( provider );
this._timedoutProviders[ subscriptionName ].splice( index, 1 );
if( timedoutButReadyProvider.action === C.ACTIONS.LISTEN_ACCEPT ) {
timedoutButReadyProvider.socketWrapper.send(
this.accept( provider.socketWrapper, message );
lateProviders.forEach((function( provider ) {
provider.socketWrapper.send(
messageBuilder.getMsg(
this._type,
C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED,
[ timedoutButReadyProvider.pattern, subscriptionName ]
[ provider.pattern, subscriptionName ]
)
);
}
}
} else if (message.action === C.ACTIONS.LISTEN_REJECT) {
if( timedoutButReadyProvider ) {
var index = this._timedoutProviders[ subscriptionName ].indexOf( timedoutButReadyProvider );
this._timedoutProviders[ subscriptionName ].splice( index, 1 );
if( timedoutButReadyProvider.action === C.ACTIONS.LISTEN_ACCEPT ) {
this.accept( timedoutButReadyProvider.socketWrapper, message );
}
}).bind( this ) );
} else {
this.triggerNextProvider( subscriptionName );
}
Expand Down Expand Up @@ -311,7 +323,6 @@ ListenerRegistry.prototype.triggerNextProvider = function ( name ) {

if( provider ) {
var timeoutId = setTimeout((function() {
// console.log('timing out now', provider.pattern)
if( this._timedoutProviders[ name ] == null ) {
this._timedoutProviders[ name ] = [];
}
Expand Down
85 changes: 78 additions & 7 deletions test/utils/listener-registry-load-balancingSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ var ListenerRegistry = require( '../../src/utils/listener-registry' ),

var subscribedTopics,
sendToSubscribersMock,
provider1,
provider2,
provider3,
client1,
client2;
var listenerRegistry,
client2,
listenerRegistry,
options = { logger: new LoggerMock() },
clientRegistry = null;

Expand Down Expand Up @@ -87,7 +90,7 @@ function verify(provider, actions, data) {
);
}

fdescribe( 'listener-registry-load-balancing', function() {
describe( 'listener-registry-load-balancing', function() {
beforeEach(function() {
sendToSubscribersMock = jasmine.createSpy( 'sendToSubscribersMock' );
subscribedTopics = [];
Expand All @@ -103,6 +106,8 @@ fdescribe( 'listener-registry-load-balancing', function() {
provider1.toString = function() { return 'provider1' }
provider2 = new SocketWrapper( new SocketMock(), options );
provider2.toString = function() { return 'provider2' }
provider3 = new SocketWrapper( new SocketMock(), options );
provider3.toString = function() { return 'provider3' }
listenerRegistry = new ListenerRegistry( 'R', options, clientRegistry );
expect( typeof listenerRegistry.addListener ).toBe( 'function' );
});
Expand Down Expand Up @@ -389,7 +394,7 @@ fdescribe( 'listener-registry-load-balancing', function() {
12. send publishing=false to the clients (new action: PUBLISHING // TODO:
*/

it( 'first rejects, seconds accepts', function() {
it( 'first rejects, seconds accepts', function(done) {
// 1
updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' )
// 2
Expand All @@ -405,7 +410,7 @@ fdescribe( 'listener-registry-load-balancing', function() {
// 6
verify( provider1, null )
verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[0-9]', 'a/1'] )

return done()
// 7
accept( provider2, 'a/[0-9]', 'a/1' );

Expand Down Expand Up @@ -597,6 +602,11 @@ fdescribe( 'listener-registry-load-balancing', function() {
accept( provider2, 'a/[0-9]', 'a/1' )

// 8
var msgString = msg( `${C.TOPIC.RECORD}|${C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER}|a/1|${C.TYPES.TRUE}+` )
expect( sendToSubscribersMock ).toHaveBeenCalledWith('a/1', msgString)
expect( sendToSubscribersMock.calls.count() ).toBe( 1 )

// 9
verify( provider1, null )
done()
}, 25)
Expand Down Expand Up @@ -632,14 +642,20 @@ fdescribe( 'listener-registry-load-balancing', function() {

// 7
accept( provider1, 'a/.*', 'a/1', true ) // hold this provider
expect( sendToSubscribersMock.calls.count() ).toBe( 0 )

// 8
accept( provider2, 'a/[0-9]', 'a/1' ) // use this provider -> provide 1 should be droped

// 9
verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/.*', 'a/1'] )
var msgString = msg( `${C.TOPIC.RECORD}|${C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER}|a/1|${C.TYPES.TRUE}+` )
expect( sendToSubscribersMock ).toHaveBeenCalledWith('a/1', msgString)
expect( sendToSubscribersMock.calls.count() ).toBe( 1 )

// 10
verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/.*', 'a/1'] )

// 11
verify( provider1, null )
done()
}, 25)
Expand Down Expand Up @@ -670,7 +686,6 @@ fdescribe( 'listener-registry-load-balancing', function() {

// 5
setTimeout(function() {
console.log('after timeout')
// 6
verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[0-9]', 'a/1'] )

Expand All @@ -686,10 +701,66 @@ fdescribe( 'listener-registry-load-balancing', function() {
expect( sendToSubscribersMock ).toHaveBeenCalledWith('a/1', msgString)
expect( sendToSubscribersMock.calls.count() ).toBe( 1 )

verify( provider1, null )

done()
}, 25)
});

it( 'provider 1 and 2 times out and 3 rejects, 1 and 2 accepts later and 1 wins', function(done) {
// 1
updateRegistryAndVerify( provider1, C.ACTIONS.LISTEN, 'a/.*' )
// 2
updateRegistryAndVerify( provider2, C.ACTIONS.LISTEN, 'a/[0-9]' )
// 3
updateRegistryAndVerify( provider3, C.ACTIONS.LISTEN, 'a/[1]' )

// 4
subscribe( 'a/1', client1 )

// 4
verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1'] )

// 5
setTimeout(function() {
// 6
verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[0-9]', 'a/1'] )

// 7
setTimeout(function() {
// 8
verify( provider3, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[1]', 'a/1'] )

// 9
accept( provider1, 'a/.*', 'a/1', true ) // in pending state
expect( sendToSubscribersMock.calls.count() ).toBe( 0 )

// 10
accept( provider2, 'a/[0-9]', 'a/1', true ) // in pending state
expect( sendToSubscribersMock.calls.count() ).toBe( 0 )

// 11
reject( provider3, 'a/[0-9]', 'a/1', true ) // should let provder 1 do the work

// 12
var msgString = msg( `${C.TOPIC.RECORD}|${C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER}|a/1|${C.TYPES.TRUE}+` )
expect( sendToSubscribersMock ).toHaveBeenCalledWith('a/1', msgString)
expect( sendToSubscribersMock.calls.count() ).toBe( 1 )

// 13
verify( provider1, null )

// 14
verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, ['a/[0-9]', 'a/1'] )

done()

}, 25)


}, 25)
});

// +++ Interval ( Not mandotory, can be null )

/*
Expand Down

0 comments on commit 64f3f4e

Please sign in to comment.