Skip to content

Commit

Permalink
fix implementation for timedout providers
Browse files Browse the repository at this point in the history
  • Loading branch information
timaschew committed Aug 5, 2016
1 parent 41ce3c0 commit 9037c65
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 26 deletions.
59 changes: 39 additions & 20 deletions src/utils/listener-registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,52 @@ TODO
*/

ListenerRegistry.prototype.handle = function( socketWrapper, message ) {
var index = (this._timedoutProviders[ message.data[ 1 ] ] || []).findIndex( function( provider ) {
return provider.socketWrapper === socketWrapper && provider.pattern === message.data[ 0 ];
var pattern = message.data[ 0 ];
var subscriptionName = message.data[ 1 ];
var indexOfTimedoutProvider = (this._timedoutProviders[ subscriptionName ] || []).findIndex( function( provider ) {
return provider.socketWrapper === socketWrapper && provider.pattern === pattern;
});
var timedoutButReadyProvider = (this._timedoutProviders[ subscriptionName ] || []).filter( provider => provider.isReady )[0]
// console.log(socketWrapper.toString(), message, this._listenInProgress)
if (message.action === C.ACTIONS.LISTEN ) {
this.addListener( socketWrapper, message );
} else if (message.action === C.ACTIONS.UNLISTEN ) {
this.removeListener( socketWrapper, message );
} else if( this._timedoutProviders[ message.data[ 1 ] ] && index !== -1) {
if (message.action === C.ACTIONS.LISTEN_ACCEPT ) {
socketWrapper.send(
messageBuilder.getMsg(
this._type,
C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED,
[ message.data[ 0 ], message.data[ 1 ] ]
)
);
} else if( this._timedoutProviders[ subscriptionName ] && indexOfTimedoutProvider !== -1) {
if ( message.action === C.ACTIONS.LISTEN_ACCEPT || message.action === C.ACTIONS.LISTEN_REJECT ) {
var provider = this._timedoutProviders[ subscriptionName ][ indexOfTimedoutProvider ];
provider.isReady = true;
provider.action = message.action
provider.pattern = pattern
}
this._timedoutProviders[ message.data[ 1 ] ].splice( index, 1 );
} else if( this._listenInProgress[ message.data[ 1 ] ] ) {
} else if( this._listenInProgress[ subscriptionName ] ) {
if (message.action === C.ACTIONS.LISTEN_ACCEPT ) {
this.accept( socketWrapper, message );
// notify timeod out methods that accepted SR
if( timedoutButReadyProvider ) {
var index = this._timedoutProviders[ subscriptionName ].indexOf( timedoutButReadyProvider );
this._timedoutProviders[ subscriptionName ].splice( index, 1 );
if( timedoutButReadyProvider.action === C.ACTIONS.LISTEN_ACCEPT ) {
timedoutButReadyProvider.socketWrapper.send(
messageBuilder.getMsg(
this._type,
C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED,
[ timedoutButReadyProvider.pattern, subscriptionName ]
)
);
}
}
} else if (message.action === C.ACTIONS.LISTEN_REJECT) {
this.triggerNextProvider( message.data[ 1 ] );
if( timedoutButReadyProvider ) {
var index = this._timedoutProviders[ subscriptionName ].indexOf( timedoutButReadyProvider );
this._timedoutProviders[ subscriptionName ].splice( index, 1 );
if( timedoutButReadyProvider.action === C.ACTIONS.LISTEN_ACCEPT ) {
this.accept( timedoutButReadyProvider.socketWrapper, message );
}
} else {
this.triggerNextProvider( subscriptionName );
}
}
} else {
} else {
console.log(message)
console.error(new Error('TODO').stack)
// send error that accepting or rejecting listen pattern / subscription
Expand Down Expand Up @@ -240,7 +260,7 @@ ListenerRegistry.prototype.removeListener = function( socketWrapper, message ) {
this._listenInProgress[name][i].socketWrapper === socketWrapper &&
this._listenInProgress[name][i].pattern === pattern
) {
this._listenInProgress[name].splice( i, 1);
this._listenInProgress[name].splice( i, 1 );
}
}
}
Expand All @@ -259,9 +279,9 @@ ListenerRegistry.prototype.onSubscriptionMade = function( name, socketWrapper, c
var pattern, message;
var action = C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND;

if( this.hasActiveProvider( name )) {
if( this.hasActiveProvider( name ) ) {
socketWrapper.send( messageBuilder.getMsg(
this._type, C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER, [name, C.TYPES.TRUE]
this._type, C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER, [ name, C.TYPES.TRUE ]
) );
return;
}
Expand Down Expand Up @@ -299,7 +319,6 @@ ListenerRegistry.prototype.triggerNextProvider = function ( name ) {
this.triggerNextProvider( name );
}).bind( this ), 20 );
this._timeoutMap[ name ] = timeoutId;
// console.log('3>>> found', provider.pattern)
provider.socketWrapper.send( messageBuilder.getMsg(
this._type, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, [ provider.pattern, name ]
)
Expand Down
12 changes: 6 additions & 6 deletions test/utils/listener-registry-load-balancingSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -587,18 +587,16 @@ fdescribe( 'listener-registry-load-balancing', function() {

// 4
verify( provider1, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/.*', 'a/1'] )
// console.log('a', messageHistory)

// 5
setTimeout(function() {
// 7
accept( provider2, 'a/[0-9]', 'a/1' )

// 6
verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[0-9]', 'a/1'] )

// 7
accept( provider2, 'a/[0-9]', 'a/1' )

// 8
// console.log('b', messageHistory)
verify( provider1, null )
done()
}, 25)
Expand Down Expand Up @@ -672,11 +670,13 @@ fdescribe( 'listener-registry-load-balancing', function() {

// 5
setTimeout(function() {
console.log('after timeout')
// 6
verify( provider2, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, ['a/[0-9]', 'a/1'] )

// 7
accept( provider1, 'a/.*', 'a/1', true) // in pending state
accept( provider1, 'a/.*', 'a/1', true ) // in pending state
expect( sendToSubscribersMock.calls.count() ).toBe( 0 )

// 10
reject( provider2, 'a/[0-9]', 'a/1', true ) // should let provder 1 do the work
Expand Down

0 comments on commit 9037c65

Please sign in to comment.