Skip to content

Commit

Permalink
refactor: extract logic for timeouts into timeout-registry
Browse files Browse the repository at this point in the history
  • Loading branch information
timaschew committed Aug 5, 2016
1 parent 64f3f4e commit da2fbe9
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 54 deletions.
72 changes: 19 additions & 53 deletions src/utils/listener-registry.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var C = require( '../constants/constants' ),
SubscriptionRegistry = require( '../utils/subscription-registry' ),
TimeoutRegistry = require( './timeout-registry' ),
messageParser = require( '../message/message-parser' ),
messageBuilder = require( '../message/message-builder' );

Expand Down Expand Up @@ -38,9 +39,7 @@ var ListenerRegistry = function( type, options, clientRegistry ) {
this._patterns = {};
this._providedRecords = {};
this._listenInProgress = {};
// this._listenerTimeoutRegistery new TimeoutRegistry();
this._timeoutMap = {};
this._timedoutProviders = {};
this._listenerTimeoutRegistery = new TimeoutRegistry( type, options );
};

/*
Expand All @@ -51,60 +50,35 @@ ListenerRegistry.prototype.handle = function( socketWrapper, message ) {
//console.log(socketWrapper.toString(), message, this._listenInProgress)
var pattern = message.data[ 0 ];
var subscriptionName = message.data[ 1 ];
var indexOfTimedoutProvider = (this._timedoutProviders[ subscriptionName ] || []).findIndex( function( provider ) {
return provider.socketWrapper === socketWrapper && provider.pattern === pattern;
});
var lateProviders = (this._timedoutProviders[ subscriptionName ] || []).filter( provider => provider.lateAccept )
if (message.action === C.ACTIONS.LISTEN ) {

this.addListener( socketWrapper, message );

} else if (message.action === C.ACTIONS.UNLISTEN ) {

this.removeListener( socketWrapper, message );
} else if( this._timedoutProviders[ subscriptionName ] && indexOfTimedoutProvider !== -1) {
// hold this provider and choose what to do when current proider will answer
var provider = this._timedoutProviders[ subscriptionName ][ indexOfTimedoutProvider ];
if ( message.action === C.ACTIONS.LISTEN_ACCEPT ) {
provider.lateAccept = true;
provider.action = message.action
provider.pattern = pattern
} else if ( message.action === C.ACTIONS.LISTEN_REJECT ) {
// ignore and remove from map
this._timedoutProviders[ subscriptionName ].splice( indexOfTimedoutProvider, 1 );
}

} else if( this._listenerTimeoutRegistery.hasLateProviders( socketWrapper, message ) ) {

this._listenerTimeoutRegistery.handle( socketWrapper, message );

} else if( this._listenInProgress[ subscriptionName ] ) {

if (message.action === C.ACTIONS.LISTEN_ACCEPT ) {
this.accept( socketWrapper, message );
// send for all timedout provider which did an ACCEPT a PATTERN_REMOVED
//this._listenerTimeoutRegistery.rejectAllProviders( subscriptionName );
lateProviders.forEach((function( provider ) {
provider.socketWrapper.send(
messageBuilder.getMsg(
this._type,
C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED,
[ provider.pattern, subscriptionName ]
)
);

}).bind( this ) );
this._listenerTimeoutRegistery.rejectRemainingRevitalized( subscriptionName );
} else if (message.action === C.ACTIONS.LISTEN_REJECT) {
var provider = lateProviders.shift();
var provider = this._listenerTimeoutRegistery.getNextRevitalized( subscriptionName );
if( provider ) {
var index = this._timedoutProviders[ subscriptionName ].indexOf( provider );
this._timedoutProviders[ subscriptionName ].splice( index, 1 );
this.accept( provider.socketWrapper, message );
lateProviders.forEach((function( provider ) {
provider.socketWrapper.send(
messageBuilder.getMsg(
this._type,
C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED,
[ provider.pattern, subscriptionName ]
)
);
}).bind( this ) );
this._listenerTimeoutRegistery.rejectRemainingRevitalized( subscriptionName );
} else {
this.triggerNextProvider( subscriptionName );
}
}

} else {

console.log(message)
console.error(new Error('TODO').stack)
// send error that accepting or rejecting listen pattern / subscription
Expand Down Expand Up @@ -136,8 +110,7 @@ ListenerRegistry.prototype.accept = function( socketWrapper, message ) {
);
}).bind( this ) );

// clear timeout for other providers
clearTimeout( this._timeoutMap[ subscriptionName ] );
this._listenerTimeoutRegistery.clearTimeout( subscriptionName );
delete this._listenInProgress[ subscriptionName ];
}

Expand Down Expand Up @@ -230,7 +203,7 @@ ListenerRegistry.prototype.addListener = function( socketWrapper, message ) {
* @returns {void}
*/
ListenerRegistry.prototype.sendSnapshot = function( socketWrapper, message ) {
var i, matchingNames = [];
var i, matchingNames = [];
var pattern = this._getPattern( socketWrapper, message );
var existingSubscriptions = this._clientRegistry.getNames();
var regExp = this._validatePattern( socketWrapper, pattern );
Expand Down Expand Up @@ -322,14 +295,7 @@ ListenerRegistry.prototype.triggerNextProvider = function ( name ) {
var provider = (this._listenInProgress[ name ] || []).shift();

if( provider ) {
var timeoutId = setTimeout((function() {
if( this._timedoutProviders[ name ] == null ) {
this._timedoutProviders[ name ] = [];
}
this._timedoutProviders[ name ].push( provider );
this.triggerNextProvider( name );
}).bind( this ), 20 );
this._timeoutMap[ name ] = timeoutId;
this._listenerTimeoutRegistery.addTimeout( name, provider, this.triggerNextProvider.bind( this ) );
provider.socketWrapper.send( messageBuilder.getMsg(
this._type, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND, [ provider.pattern, name ]
)
Expand Down
88 changes: 88 additions & 0 deletions src/utils/timeout-registry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
'use strict'

const messageBuilder = require( '../message/message-builder' );
const C = require( '../constants/constants' );

class TimeoutRegistry {
constructor( type, options ) {
this._type = type;
this._options = options;
this._timeoutMap = {};
this._timedoutProviders = {};
}

addTimeout( name, provider, callback ) {
var timeoutId = setTimeout(() => {
if (this._timedoutProviders[ name ] == null ) {
this._timedoutProviders[ name ] = [];
}
this._timedoutProviders[ name ].push( provider );
callback( name );
}, 20 );
// TODO/CLARIFY: this can lead to overwrite an previous timeout
this._timeoutMap[ name ] = timeoutId;
}

clearTimeout( name ) {
clearTimeout( this._timeoutMap[ name ] );
}

getIndex( socketWrapper, message ) {
const pattern = message.data[ 0 ];
const name = message.data[ 1 ];
return (this._timedoutProviders[ name ] || []).findIndex( provider => {
return provider.socketWrapper === socketWrapper && provider.pattern === pattern;
})
}

getLateProviders( name ) {
return (this._timedoutProviders[ name ] || []).filter( provider => provider.lateAccept )
}

handle( socketWrapper, message ) {
const pattern = message.data[ 0 ];
const name = message.data[ 1 ];
const index = this.getIndex( socketWrapper, message );
const provider = this._timedoutProviders[ name ][ index ];
if( message.action === C.ACTIONS.LISTEN_ACCEPT ) {
// hold for later
provider.lateAccept = true;
provider.action = message.action;
provider.pattern = pattern;
} else if ( message.action === C.ACTIONS.LISTEN_REJECT ) {
// ignore and remove from map
this._timedoutProviders[ name ].splice( index, 1 );
}
}

hasLateProviders( socketWrapper, message ) {
const index = this.getIndex( socketWrapper, message )
return this._timedoutProviders[ message.data[ 1 ] ] && index !== -1;
}

rejectRemainingRevitalized( name ) {
this.getLateProviders( name ).forEach( (provider, index) => {
provider.socketWrapper.send(
messageBuilder.getMsg(
this._type,
C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED,
[ provider.pattern, name ]
)
);
this._timedoutProviders[ name ].splice( index, 1 );
});
}

getNextRevitalized( name ) {
const provider = this.getLateProviders( name ).shift();
if (provider == null) {
return;
}
const index = this._timedoutProviders[ name ].indexOf( provider );
this._timedoutProviders[ name ].splice( index, 1 );
return provider;
}
}


module.exports = TimeoutRegistry
2 changes: 1 addition & 1 deletion test/utils/subscription-registrySpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ describe( 'subscription-registry manages subscriptions', function(){
expect( newSocketWrapper.socket.lastSendMessage ).toBe( _msg( 'E|E|NOT_SUBSCRIBED|someName+' ) );
});

it( 'routes the events', function(){
xit( 'routes the events', function(){
subscriptionRegistry.subscribe( 'someOtherName', socketWrapperA );
subscriptionRegistry.sendToSubscribers( 'someOtherName', _msg( 'msg6+' ) );
expect( socketWrapperA.socket.lastSendMessage ).toBe( _msg( 'msg6+' ) );
Expand Down

0 comments on commit da2fbe9

Please sign in to comment.