Skip to content

Commit

Permalink
Fix #82 Adding merge strategies in client
Browse files Browse the repository at this point in the history
  • Loading branch information
yasserf committed May 6, 2016
1 parent 6f41137 commit ad08b90
Show file tree
Hide file tree
Showing 9 changed files with 587 additions and 25 deletions.
14 changes: 14 additions & 0 deletions src/constants/merge-strategies.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module.exports = {
/**
* Choose the server state over the clients
**/
REMOTE_WINS: function( record, remoteValue, remoteVersion, callback ) {
callback( null, remoteValue );
},
/**
* Choose the local state over the servers
**/
LOCAL_WINS: function( record, remoteValue, remoteVersion, callback ) {
callback( null, record.get() );
}
};
12 changes: 11 additions & 1 deletion src/default-options.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
var MERGE_STRATEGIES = require( './constants/merge-strategies' );

module.exports = {
/************************************************
* Deepstream *
Expand Down Expand Up @@ -175,5 +177,13 @@ module.exports = {
* SSL/TLS connections, or if you know that
* your network does not block websockets.
*/
rememberUpgrade: false
rememberUpgrade: false,

/**
* @param {Function} mergeStrategy This provides the default strategy used to deal with merge conflicts.
* If the merge strategy is not succesfull it will set an error, else set the
* returned data as the latest revision. This can be overriden on a per record
* basis by setting the `setMergeStrategy`.
*/
mergeStrategy: MERGE_STRATEGIES.REMOTE_WINS
};
111 changes: 91 additions & 20 deletions src/record/record.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ var Record = function( name, recordOptions, connection, options, client ) {
this.isReady = false;
this.isDestroyed = false;
this._$data = {};
this._version = null;
this.version = null;
this._paths = {};
this._oldValue = null;
this._oldPathValues = null;
this._eventEmitter = new EventEmitter();
this._queuedMethodCalls = [];

this._mergeStrategy = null;
if( options.mergeStrategy ) {
this.setMergeStrategy( options.mergeStrategy );
}

this._resubscribeNotifier = new ResubscribeNotifier( this._client, this._sendRead.bind( this ) );
this._readAckTimeout = setTimeout( this._onTimeout.bind( this, C.EVENT.ACK_TIMEOUT ), this._options.recordReadAckTimeout );
this._readTimeout = setTimeout( this._onTimeout.bind( this, C.EVENT.RESPONSE_TIMEOUT ), this._options.recordReadTimeout );
Expand All @@ -46,6 +51,27 @@ var Record = function( name, recordOptions, connection, options, client ) {

EventEmitter( Record.prototype );

/**
* Set a merge strategy to resolve any merge conflicts that may occur due
* to offline work or write conflicts. The function will be called with the
* local record, the remote version/data and a callback to call once the merge has
* completed or if an error occurs ( which leaves it in an inconsistent state until
* the next update merge attempt ).
*
* @param {Function} mergeStrategy A Function that can resolve merge issues.
*
* @public
* @returns {void}
*/
Record.prototype.setMergeStrategy = function( mergeStrategy ) {
if( mergeStrategy instanceof Function ) {
this._mergeStrategy = mergeStrategy;
} else {
throw new Error( 'Invalid merge strategy: Must be a Function' );
}
};


/**
* Returns a copy of either the entire dataset of the record
* or - if called with a path - the value of that path within
Expand Down Expand Up @@ -107,20 +133,20 @@ Record.prototype.set = function( pathOrData, data ) {
}

this._beginChange();
this._version++;
this.version++;

if( arguments.length === 1 ) {
this._$data = ( typeof pathOrData == 'object' ) ? utils.deepCopy( pathOrData ) : pathOrData;
this._connection.sendMsg( C.TOPIC.RECORD, C.ACTIONS.UPDATE, [
this.name,
this._version,
this.version,
this._$data
]);
} else {
this._getPath( pathOrData ).setValue( ( typeof data == 'object' ) ? utils.deepCopy( data ): data );
this._connection.sendMsg( C.TOPIC.RECORD, C.ACTIONS.PATCH, [
this.name,
this._version,
this.version,
pathOrData,
messageBuilder.typed( data )
]);
Expand Down Expand Up @@ -250,7 +276,7 @@ Record.prototype.whenReady = function( callback ) {
*/
Record.prototype._$onMessage = function( message ) {
if( message.action === C.ACTIONS.READ ) {
if( this._version === null ) {
if( this.version === null ) {
clearTimeout( this._readTimeout );
this._onRead( message );
} else {
Expand All @@ -264,22 +290,51 @@ Record.prototype._$onMessage = function( message ) {
this._applyUpdate( message, this._client );
}
else if( message.data[ 0 ] === C.EVENT.VERSION_EXISTS ) {
this._recoverRecord( message );
this._recoverRecord( message.data[ 2 ], JSON.parse( message.data[ 3 ] ), message );
}
};

/**
* @todo This resets the record to the latest version the server has whenever a version conflict
* occurs.
* Called when a merge conflict is detected by a VERSION_EXISTS error or if an update recieved
* is directly after the clients. If no merge strategy is configure it will emit a VERSION_EXISTS
* error and the record will remain in an inconsistent state.
*
* Instead it should find a more sophisticated merge strategy
* @param {Number} remoteVersion The remote version number
* @param {Object} remoteData The remote object data
* @param {Object} message parsed and validated deepstream message
*
* @private
* @returns {void}
*/
Record.prototype._recoverRecord = function( message ) {
message.processedError = true;
this.emit( 'error', C.EVENT.VERSION_EXISTS, 'received update for ' + message.version + ' but version is ' + this._version );
Record.prototype._recoverRecord = function( remoteVersion, remoteData, message ) {
if( this._mergeStrategy ) {
this._mergeStrategy( this, remoteData, remoteVersion, this._onRecordRecovered.bind( this, remoteVersion ) );
}
else {
message.processedError = true;
this.emit( 'error', C.EVENT.VERSION_EXISTS, 'received update for ' + remoteVersion + ' but version is ' + this.version );
}
};

/**
* Callback once the record merge has completed. If successful it will set the
* record state, else emit and error and the record will remain in an
* inconsistent state until the next update.
*
* @param {Number} remoteVersion The remote version number
* @param {Object} remoteData The remote object data
* @param {Object} message parsed and validated deepstream message
*
* @private
* @returns {void}
*/
Record.prototype._onRecordRecovered = function( remoteVersion, error, data ) {
if( !error ) {
this.version = remoteVersion;
this.set( data );
} else {
this.emit( 'error', C.EVENT.VERSION_EXISTS, 'received update for ' + remoteVersion + ' but version is ' + this.version );
}
};

/**
Expand Down Expand Up @@ -319,21 +374,37 @@ Record.prototype._processAckMessage = function( message ) {
*/
Record.prototype._applyUpdate = function( message ) {
var version = parseInt( message.data[ 1 ], 10 );
var data;

if( message.action === C.ACTIONS.PATCH ) {
data = messageParser.convertTyped( message.data[ 3 ], this._client );
} else {
data = JSON.parse( message.data[ 2 ] );
}

if( this._version === null ) {
this._version = version;
if( this.version === null ) {
this.version = version;
}
else if( this._version + 1 !== version ) {
this._recoverRecord( message );
else if( this.version + 1 !== version ) {
if( message.action === C.ACTIONS.PATCH ) {
/**
* Request a snapshot so that a merge can be done with the read reply which contains
* the full state of the record
**/
this._connection.sendMsg( C.TOPIC.RECORD, C.ACTIONS.SNAPSHOT, [ this.name ] );
} else {
this._recoverRecord( version, data, message );
}
return;
}

this._beginChange();
this._version = version;
this.version = version;

if( message.action === C.ACTIONS.PATCH ) {
this._getPath( message.data[ 2 ] ).setValue( messageParser.convertTyped( message.data[ 3 ], this._client ) );
this._getPath( message.data[ 2 ] ).setValue( data );
} else {
this._$data = JSON.parse( message.data[ 2 ] );
this._$data = data;
}

this._completeChange();
Expand All @@ -349,7 +420,7 @@ Record.prototype._applyUpdate = function( message ) {
*/
Record.prototype._onRead = function( message ) {
this._beginChange();
this._version = parseInt( message.data[ 1 ], 10 );
this.version = parseInt( message.data[ 1 ], 10 );
this._$data = JSON.parse( message.data[ 2 ] );
this._completeChange();
this._setReady();
Expand Down
6 changes: 5 additions & 1 deletion test-e2e/specs/connection-lostSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ describe( 'it recovers a connection without losing record updates', function() {

/**************** TESTS ****************/
it( 'connects', function( done ) {
clientA = deepstreamClient( 'localhost:6021' );
clientA = deepstreamClient( 'localhost:6021', {
mergeStrategy: function( record, remoteVersion, remoteData, callback ) {
callback( 'Error Merging' );
}
} );
clientA.on( 'error', function(){
clientAErrors.push( arguments );
});
Expand Down
44 changes: 44 additions & 0 deletions test-unit/unit/constants/merge-strategiesSpec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/* global describe, it, expect, jasmine */
var MERGE_STRATEGIES = require( '../../../src/constants/merge-strategies' );

describe( 'merge strategies', function() {

describe( 'remote wins', function() {

beforeEach( function() {
this.mergeCallback = jasmine.createSpy( 'mergeSpy' );
this.record = {
get: function() {
return { type: 'remote' };
}
};
MERGE_STRATEGIES.REMOTE_WINS( {}, { type: 'remote' }, 5, this.mergeCallback );
} );

it( 'returns the remote data', function() {
expect( this.mergeCallback.calls.count() ).toBe( 1 );
expect( this.mergeCallback.calls.mostRecent().args ).toEqual( [ null, { type: 'remote' } ] );
} );

} );

describe( 'local wins', function() {

beforeEach( function() {
this.mergeCallback = jasmine.createSpy( 'mergeSpy' );
this.record = {
get: function() {
return { type: 'local' };
}
};
MERGE_STRATEGIES.LOCAL_WINS( this.record, { type: 'remote' }, 5, this.mergeCallback );
} );

it( 'returns the remote data', function() {
expect( this.mergeCallback.calls.count() ).toBe( 1 );
expect( this.mergeCallback.calls.mostRecent().args ).toEqual( [ null, { type: 'local' } ] );
} );

} );

} );
2 changes: 1 addition & 1 deletion test-unit/unit/record/list-change-listenerSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe( 'lists change listener', function(){
recordHandler._$handle({
topic: 'R',
action: 'U',
data: [ 'someList', 1, '["a","b","c","d","e","x"]' ]
data: [ 'someList', 7, '["a","b","c","d","e","x"]' ]
});
expect( callback ).toHaveBeenCalledWith( 'x', 5 );
expect( callback.calls.count() ).toBe( 1 );
Expand Down
2 changes: 1 addition & 1 deletion test-unit/unit/record/list-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ describe( 'lists contain arrays of record names', function(){
recordHandler._$handle({
topic: 'R',
action: 'R',
data: [ 'someList', 5, '["x","y"]' ]
data: [ 'someList', 7, '["x","y"]' ]
});
expect( list.getEntries() ).toEqual([ 'x','y' ]);
expect( changeCallback ).toHaveBeenCalledWith([ 'x','y' ]);
Expand Down

0 comments on commit ad08b90

Please sign in to comment.