Skip to content

Commit

Permalink
Fixing listening related issues
Browse files Browse the repository at this point in the history
  • Loading branch information
yasserf committed Aug 28, 2016
1 parent 815265a commit ad2855e
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 108 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"browserify": "13.0.1",
"coveralls": "^2.11.9",
"cucumber": "^1.2.2",
"deepstream.io": "git+https://github.com/deepstreamIO/deepstream.io.git#feature/#313-listening-non-cluster",
"deepstream.io": "git+https://github.com/deepstreamIO/deepstream.io.git",
"deepstream.io-cache-redis": "latest",
"deepstream.io-msg-redis": "latest",
"derequire": "2.0.3",
Expand Down
5 changes: 3 additions & 2 deletions src/record/record.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,9 @@ Record.prototype._$onMessage = function( message ) {
clearInterval( this._readAckTimeout );
clearInterval( this._readTimeout );
} else if( message.action === C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER ) {
this.hasProvider = message.data[ 1 ];
this.emit( 'hasProviderChanged', message.data[ 1 ] );
var hasProvider = messageParser.convertTyped( message.data[ 1 ], this._client );
this.hasProvider = hasProvider;
this.emit( 'hasProviderChanged', hasProvider );
}
};

Expand Down
97 changes: 52 additions & 45 deletions src/utils/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,22 @@ var ResubscribeNotifier = require( './resubscribe-notifier' );
* @constructor
*/
var Listener = function( type, pattern, callback, options, client, connection ) {
this._type = type;
this._callback = callback;
this._pattern = pattern;
this._options = options;
this._client = client;
this._connection = connection;
this._ackTimeout = setTimeout( this._onAckTimeout.bind( this ), this._options.subscriptionTimeout );
this._resubscribeNotifier = new ResubscribeNotifier( client, this._sendListen.bind( this ) );
this._sendListen();
this._responded = null;
this.destroyPending = false;
this._type = type;
this._callback = callback;
this._pattern = pattern;
this._options = options;
this._client = client;
this._connection = connection;
this._ackTimeout = setTimeout( this._onAckTimeout.bind( this ), this._options.subscriptionTimeout );
this._resubscribeNotifier = new ResubscribeNotifier( client, this._sendListen.bind( this ) );
this._sendListen();
this.destroyPending = false;
};

Listener.prototype.sendDestroy = function() {
this.destroyPending = true;
this._connection.sendMsg( this._type, C.ACTIONS.UNLISTEN, [ this._pattern ] );
this._resubscribeNotifier.destroy();
this.destroyPending = true;
this._connection.sendMsg( this._type, C.ACTIONS.UNLISTEN, [ this._pattern ] );
this._resubscribeNotifier.destroy();

};

Expand All @@ -40,10 +39,10 @@ Listener.prototype.sendDestroy = function() {
* @returns {void}
*/
Listener.prototype.destroy = function() {
this._callback = null;
this._pattern = null;
this._client = null;
this._connection = null;
this._callback = null;
this._pattern = null;
this._client = null;
this._connection = null;
};

/*
Expand All @@ -55,8 +54,7 @@ Listener.prototype.destroy = function() {
* @returns {void}
*/
Listener.prototype.accept = function( name ) {
this._connection.sendMsg( this._type, C.ACTIONS.LISTEN_ACCEPT, [ this._pattern, name ] );
this._responded = true;
this._connection.sendMsg( this._type, C.ACTIONS.LISTEN_ACCEPT, [ this._pattern, name ] );
}

/*
Expand All @@ -69,8 +67,7 @@ Listener.prototype.accept = function( name ) {
* @returns {void}
*/
Listener.prototype.reject = function( name ) {
this._connection.sendMsg( this._type, C.ACTIONS.LISTEN_REJECT, [ this._pattern, name ] );
this._responded = true;
this._connection.sendMsg( this._type, C.ACTIONS.LISTEN_REJECT, [ this._pattern, name ] );
}

/*
Expand All @@ -80,10 +77,10 @@ Listener.prototype.reject = function( name ) {
* @returns {Object}
*/
Listener.prototype._createCallbackResponse = function(message) {
return {
accept: this.accept.bind( this, message.data[ 1 ] ),
reject: this.reject.bind( this, message.data[ 1 ] )
}
return {
accept: this.accept.bind( this, message.data[ 1 ] ),
reject: this.reject.bind( this, message.data[ 1 ] )
}
}

/*
Expand All @@ -93,23 +90,17 @@ Listener.prototype._createCallbackResponse = function(message) {
* @returns {void}
*/
Listener.prototype._$onMessage = function( message ) {
if( message.action === C.ACTIONS.ACK ) {
clearTimeout( this._ackTimeout );
} else if ( message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND ) {
this._callback( message.data[ 1 ], true, this._createCallbackResponse( message) );
} else if ( message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED ) {
this._callback( message.data[ 1 ], false );
} else {
this._client._$onError( this._type, C.EVENT.UNSOLICITED_MESSAGE, message.data[ 0 ] + '|' + message.data[ 1 ] );
}

if( message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND && this._responded !== true ) {
var deprecatedMessage = 'DEPRECATED: listen should explicitly accept or reject for pattern: ' + message.data[ 0 ];
deprecatedMessage += '\nhttps://github.com/deepstreamIO/deepstream.io-client-js/issues/212';
if( console && console.warn ) {
console.warn( deprecatedMessage );
}
}
if( message.action === C.ACTIONS.ACK ) {
clearTimeout( this._ackTimeout );
} else if ( message.action === C
.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND ) {
this._showDeprecatedMessage( message );
this._callback( message.data[ 1 ], true, this._createCallbackResponse( message) );
} else if ( message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED ) {
this._callback( message.data[ 1 ], false );
} else {
this._client._$onError( this._type, C.EVENT.UNSOLICITED_MESSAGE, message.data[ 0 ] + '|' + message.data[ 1 ] );
}
};

/*
Expand All @@ -119,7 +110,7 @@ Listener.prototype._$onMessage = function( message ) {
* @returns {void}
*/
Listener.prototype._sendListen = function() {
this._connection.sendMsg( this._type, C.ACTIONS.LISTEN, [ this._pattern ] );
this._connection.sendMsg( this._type, C.ACTIONS.LISTEN, [ this._pattern ] );
};

/*
Expand All @@ -129,7 +120,23 @@ Listener.prototype._sendListen = function() {
* @returns {void}
*/
Listener.prototype._onAckTimeout = function() {
this._client._$onError( this._type, C.EVENT.ACK_TIMEOUT, 'No ACK message received in time for ' + this._pattern );
this._client._$onError( this._type, C.EVENT.ACK_TIMEOUT, 'No ACK message received in time for ' + this._pattern );
};

/*
* Shows a deprecation message to users before 1.1
*
* @private
* @returns {void}
*/
Listener.prototype._showDeprecatedMessage = function( message ) {
if( this._callback.length !== 3 ) {
var deprecatedMessage = 'DEPRECATED: listen should explicitly accept or reject for pattern: ' + message.data[ 0 ];
deprecatedMessage += '\nhttps://github.com/deepstreamIO/deepstream.io-client-js/issues/212';
if( console && console.warn ) {
console.warn( deprecatedMessage );
}
}
};

module.exports = Listener;
2 changes: 1 addition & 1 deletion test-e2e-gherkin/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Cluster.prototype._startServer = function( port, done ) {
host: config.redisHost
}));
if( this._enableLogging !== true ) {
//this.servers[ port ].set( 'logger', new Logger() );
this.servers[ port ].set( 'logger', new Logger() );
}

this.servers[ port ].set( 'showLogo', false );
Expand Down
2 changes: 1 addition & 1 deletion test-e2e/config.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module.exports = {
redisPort: process.env.REDIS_PORT || 6379,
redisHost: process.env.REDIS_HOST || 'localhost',
messageTimeout: 2000
messageTimeout: 50
};
4 changes: 2 additions & 2 deletions test-e2e/specs/connection-lostSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe( 'it recovers a connection without losing record updates', function() {
//Ensure all writes went to the server
setTimeout( function() {
done();
}, 200 );
}, 50 );
}
}, 30 );
});
Expand Down Expand Up @@ -88,7 +88,7 @@ describe( 'it recovers a connection without losing record updates', function() {
clientA.on( 'connectionStateChanged', function(){
if( clientA.getConnectionState() === 'OPEN' ) {
//Time needed for reads to come back once connection is reestablished
setTimeout( done, 200 );
setTimeout( done, 50 );
}
});
});
Expand Down
11 changes: 9 additions & 2 deletions test-e2e/specs/event-listenerSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ describe( 'event listener', function() {
var deepstreamServer,
logger = new TestLogger(),
clientA,
clientB;
clientB,
clientC;

/**************** SETUP ****************/
it( 'starts the server', function( done ){
Expand All @@ -30,6 +31,12 @@ describe( 'event listener', function() {
clientB.on( 'error', () => {})
});

it( 'creates clientC', function( done ) {
clientC = deepstreamClient( 'localhost:6021' );
clientC.login( null, function(){ done(); });
clientC.on( 'error', () => {})
});

/**************** TEST ****************/
it( 'listens for event subscriptions', function(done){
var matches = [];
Expand All @@ -47,7 +54,7 @@ describe( 'event listener', function() {

clientB.event.subscribe( 'event/matchespattern', function() {} );
clientB.event.subscribe( 'event/DOES_NOT_MATCH', function() {} );
clientA.event.subscribe( 'event/some33', function() {} );
clientC.event.subscribe( 'event/some33', function() {} );
});

it( 'listens, gets notified and unlistens', function(done) {
Expand Down

0 comments on commit ad2855e

Please sign in to comment.