Skip to content

Commit

Permalink
Cleaning up tests for listening
Browse files Browse the repository at this point in the history
  • Loading branch information
yasserf committed Aug 8, 2016
1 parent da2fbe9 commit ae7ad98
Show file tree
Hide file tree
Showing 9 changed files with 635 additions and 864 deletions.
2 changes: 1 addition & 1 deletion src/event/event-handler.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
var C = require( '../constants/constants' ),
SubscriptionRegistry = require( '../utils/subscription-registry' ),
ListenerRegistry = require( '../utils/listener-registry' ),
ListenerRegistry = require( '../listen/listener-registry' ),
messageParser = require( '../message/message-parser' ),
messageBuilder = require( '../message/message-builder' ),
STRING = 'string';
Expand Down
69 changes: 31 additions & 38 deletions src/utils/listener-registry.js → src/listen/listener-registry.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
var C = require( '../constants/constants' ),
SubscriptionRegistry = require( '../utils/subscription-registry' ),
TimeoutRegistry = require( './timeout-registry' ),
TimeoutRegistry = require( './listener-timeout-registry' ),
messageParser = require( '../message/message-parser' ),
messageBuilder = require( '../message/message-builder' );

Expand Down Expand Up @@ -47,23 +47,15 @@ TODO
*/

ListenerRegistry.prototype.handle = function( socketWrapper, message ) {
//console.log(socketWrapper.toString(), message, this._listenInProgress)
var pattern = message.data[ 0 ];
var subscriptionName = message.data[ 1 ];
if (message.action === C.ACTIONS.LISTEN ) {

this.addListener( socketWrapper, message );

} else if (message.action === C.ACTIONS.UNLISTEN ) {

this.removeListener( socketWrapper, message );

} else if( this._listenerTimeoutRegistery.hasLateProviders( socketWrapper, message ) ) {

} else if( this._listenerTimeoutRegistery.isLateProvider( socketWrapper, message ) ) {
this._listenerTimeoutRegistery.handle( socketWrapper, message );

} else if( this._listenInProgress[ subscriptionName ] ) {

if (message.action === C.ACTIONS.LISTEN_ACCEPT ) {
this.accept( socketWrapper, message );
this._listenerTimeoutRegistery.rejectRemainingRevitalized( subscriptionName );
Expand All @@ -76,13 +68,12 @@ ListenerRegistry.prototype.handle = function( socketWrapper, message ) {
this.triggerNextProvider( subscriptionName );
}
}

} else {

console.log(message)
console.error(new Error('TODO').stack)
// send error that accepting or rejecting listen pattern / subscription
// that isn't being asked for
socketWrapper.send( messageBuilder.getMsg(
this._type,
C.ACTIONS.ERROR,
[ message.action, pattern, subscriptionName ]
) );
}
}

Expand All @@ -103,12 +94,7 @@ ListenerRegistry.prototype.accept = function( socketWrapper, message ) {
createHasProviderMessage( true, this._type, subscriptionName )
);

socketWrapper.socket.once( 'close', (function() {
this._clientRegistry.sendToSubscribers(
subscriptionName,
createHasProviderMessage( false, this._type, subscriptionName )
);
}).bind( this ) );
socketWrapper.socket.once( 'close', this.removeListener.bind( this, socketWrapper, message ) );

this._listenerTimeoutRegistery.clearTimeout( subscriptionName );
delete this._listenInProgress[ subscriptionName ];
Expand All @@ -122,19 +108,10 @@ function createHasProviderMessage(hasProvider, type, subscriptionName) {
);
}

/*
provider 1 times out
provider 2 times out
provider 3 accepts -> set
*/

/*
TODO
/**
*
*/

ListenerRegistry.prototype.hasActiveProvider = function( susbcriptionName ) {
// do i have a local provider ( i know the provider pattern, the subscription name and the provider socket
// does any other deepstream node actually provide this subscriptionName
return !!this._providedRecords[ susbcriptionName ];
}

Expand Down Expand Up @@ -189,8 +166,6 @@ ListenerRegistry.prototype.addListener = function( socketWrapper, message ) {
}
}
}

// now do the same thing but with remote subscriptions
};

/**
Expand Down Expand Up @@ -249,6 +224,18 @@ ListenerRegistry.prototype.removeListener = function( socketWrapper, message ) {
}
}
}

var name = message.data[ 1 ];
if( this._providedRecords[ name ] && this._providedRecords[ name ].socketWrapper === socketWrapper) {
this._clientRegistry.sendToSubscribers(
name,
createHasProviderMessage( false, this._type, name )
);
delete this._providedRecords[ name ];

this.createListenMap( name );
this.triggerNextProvider( name );
}
};

/**
Expand All @@ -261,7 +248,7 @@ ListenerRegistry.prototype.removeListener = function( socketWrapper, message ) {
* @returns {void}
*/
ListenerRegistry.prototype.onSubscriptionMade = function( name, socketWrapper, count ) {
var pattern, message;
var message;
var action = C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND;

if( this.hasActiveProvider( name ) ) {
Expand All @@ -270,12 +257,12 @@ ListenerRegistry.prototype.onSubscriptionMade = function( name, socketWrapper, c
) );
return;
}
this.createListenMap( pattern, name );

this.createListenMap( name );
this.triggerNextProvider( name );
};

ListenerRegistry.prototype.createListenMap = function ( pattern, name ) {
ListenerRegistry.prototype.createListenMap = function ( name ) {
// Creating the map
this._listenInProgress[ name ] = [];
for( pattern in this._patterns ) {
Expand Down Expand Up @@ -330,6 +317,12 @@ ListenerRegistry.prototype.onSubscriptionRemoved = function( name, socketWrapper
this._type, C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED, [ provider.pattern, name ]
)
);

this._clientRegistry.sendToSubscribers(
name,
createHasProviderMessage( false, this._type, name )
);

delete this._providedRecords[ name ];
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@ class TimeoutRegistry {
this._timedoutProviders = {};
}

handle( socketWrapper, message ) {
const pattern = message.data[ 0 ];
const name = message.data[ 1 ];
const index = this._getIndex( socketWrapper, message );
const provider = this._timedoutProviders[ name ][ index ];
if( message.action === C.ACTIONS.LISTEN_ACCEPT ) {
provider.lateAccept = true;
provider.action = message.action;
provider.pattern = pattern;
} else if ( message.action === C.ACTIONS.LISTEN_REJECT ) {
// ignore and remove from map
this._timedoutProviders[ name ].splice( index, 1 );
}
}

addTimeout( name, provider, callback ) {
var timeoutId = setTimeout(() => {
if (this._timedoutProviders[ name ] == null ) {
Expand All @@ -27,41 +42,13 @@ class TimeoutRegistry {
clearTimeout( this._timeoutMap[ name ] );
}

getIndex( socketWrapper, message ) {
const pattern = message.data[ 0 ];
const name = message.data[ 1 ];
return (this._timedoutProviders[ name ] || []).findIndex( provider => {
return provider.socketWrapper === socketWrapper && provider.pattern === pattern;
})
}

getLateProviders( name ) {
return (this._timedoutProviders[ name ] || []).filter( provider => provider.lateAccept )
}

handle( socketWrapper, message ) {
const pattern = message.data[ 0 ];
const name = message.data[ 1 ];
const index = this.getIndex( socketWrapper, message );
const provider = this._timedoutProviders[ name ][ index ];
if( message.action === C.ACTIONS.LISTEN_ACCEPT ) {
// hold for later
provider.lateAccept = true;
provider.action = message.action;
provider.pattern = pattern;
} else if ( message.action === C.ACTIONS.LISTEN_REJECT ) {
// ignore and remove from map
this._timedoutProviders[ name ].splice( index, 1 );
}
}

hasLateProviders( socketWrapper, message ) {
const index = this.getIndex( socketWrapper, message )
isLateProvider( socketWrapper, message ) {
const index = this._getIndex( socketWrapper, message )
return this._timedoutProviders[ message.data[ 1 ] ] && index !== -1;
}

rejectRemainingRevitalized( name ) {
this.getLateProviders( name ).forEach( (provider, index) => {
this._getLateProviders( name ).forEach( (provider, index) => {
provider.socketWrapper.send(
messageBuilder.getMsg(
this._type,
Expand All @@ -74,14 +61,26 @@ class TimeoutRegistry {
}

getNextRevitalized( name ) {
const provider = this.getLateProviders( name ).shift();
const provider = this._getLateProviders( name ).shift();
if (provider == null) {
return;
}
const index = this._timedoutProviders[ name ].indexOf( provider );
this._timedoutProviders[ name ].splice( index, 1 );
return provider;
}

_getLateProviders( name ) {
return (this._timedoutProviders[ name ] || []).filter( provider => provider.lateAccept )
}

_getIndex( socketWrapper, message ) {
const pattern = message.data[ 0 ];
const name = message.data[ 1 ];
return (this._timedoutProviders[ name ] || []).findIndex( provider => {
return provider.socketWrapper === socketWrapper && provider.pattern === pattern;
})
}
}


Expand Down
2 changes: 1 addition & 1 deletion src/record/record-handler.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
var C = require( '../constants/constants' ),
SubscriptionRegistry = require( '../utils/subscription-registry' ),
ListenerRegistry = require( '../utils/listener-registry' ),
ListenerRegistry = require( '../listen/listener-registry' ),
RecordRequest = require( './record-request' ),
RecordTransition = require( './record-transition' ),
RecordDeletion = require( './record-deletion' ),
Expand Down
Loading

0 comments on commit ae7ad98

Please sign in to comment.