Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(operation-time): track operationTime in relevant sessions
Browse files Browse the repository at this point in the history
If `causalConsistency` is opted for, then the driver MUST track
`operationTime` on incoming server responses.  This is one part of
full support for causal consistency of reads, the other part will
be implemented in the porcelain driver related to read concerns.

NODE-1106
  • Loading branch information
mbroadst committed Nov 17, 2017
1 parent cfe8606 commit 8d512f1
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 7 deletions.
22 changes: 16 additions & 6 deletions lib/connection/pool.js
Expand Up @@ -544,13 +544,23 @@ function messageHandler(self) {
return handleOperationCallback(self, workItem.cb, new MongoError(err));
}

// Look for clusterTime, and update it if necessary
if (message.documents[0] && message.documents[0].hasOwnProperty('$clusterTime')) {
const $clusterTime = message.documents[0].$clusterTime;
self.topology.clusterTime = $clusterTime;
// Look for clusterTime, and operationTime and update them if necessary
if (message.documents[0]) {
if (message.documents[0].$clusterTime) {
const $clusterTime = message.documents[0].$clusterTime;
self.topology.clusterTime = $clusterTime;

if (workItem.session != null) {
resolveClusterTime(workItem.session, $clusterTime);
}
}

if (workItem.session != null) {
resolveClusterTime(workItem.session, $clusterTime);
if (
message.documents[0].operationTime &&
workItem.session &&
workItem.session.supports.causalConsistency
) {
workItem.session.advanceOperationTime(message.documents[0].operationTime);
}
}

Expand Down
25 changes: 25 additions & 0 deletions lib/sessions.js
Expand Up @@ -19,15 +19,24 @@ class ClientSession {
throw new Error('ClientSession requires a ServerSessionPool');
}

options = options || {};
this.topology = topology;
this.sessionPool = sessionPool;
this.hasEnded = false;
this.serverSession = sessionPool.acquire();

this.supports = {
causalConsistency: !!options.causalConsistency
};

options = options || {};
if (typeof options.initialClusterTime !== 'undefined') {
this.clusterTime = options.initialClusterTime;
} else {
this.clusterTime = null;
}

this.operationTime = null;
}

/**
Expand Down Expand Up @@ -64,6 +73,22 @@ class ClientSession {
// spec indicates that we should ignore all errors for `endSessions`
if (typeof callback === 'function') callback(null, null);
}

/**
* Advances the operationTime for a ClientSession.
*
* @param {object} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to
*/
advanceOperationTime(operationTime) {
if (this.operationTime == null) {
this.operationTime = operationTime;
return;
}

if (operationTime.greaterThan(this.operationTime)) {
this.operationTime = operationTime;
}
}
}

Object.defineProperty(ClientSession.prototype, 'id', {
Expand Down
8 changes: 7 additions & 1 deletion test/mock/index.js
Expand Up @@ -52,6 +52,11 @@ const DEFAULT_ISMASTER = {
ok: 1
};

const DEFAULT_ISMASTER_36 = Object.assign({}, DEFAULT_ISMASTER, {
maxWireVersion: 6,
logicalSessionTimeoutMinutes: 10
});

/*
* Main module
*/
Expand All @@ -67,5 +72,6 @@ module.exports = {
},

cleanup: cleanup,
DEFAULT_ISMASTER: DEFAULT_ISMASTER
DEFAULT_ISMASTER: DEFAULT_ISMASTER,
DEFAULT_ISMASTER_36: DEFAULT_ISMASTER_36
};
7 changes: 7 additions & 0 deletions test/tests/unit/common.js
Expand Up @@ -91,6 +91,13 @@ class ReplSetFixture {
}
}

/**
* Creates a cluster time for use in unit testing cluster time gossiping and
* causal consistency.
*
* @param {Number} time the logical time
* @returns a cluster time according to the driver sessions specification
*/
function genClusterTime(time) {
return {
clusterTime: new Timestamp(time),
Expand Down
47 changes: 47 additions & 0 deletions test/tests/unit/single/sessions_tests.js
Expand Up @@ -2,6 +2,7 @@
var Server = require('../../../../lib/topologies/server'),
Long = require('bson').Long,
ObjectId = require('bson').ObjectId,
Timestamp = require('bson').Timestamp,
expect = require('chai').expect,
assign = require('../../../../lib/utils').assign,
mock = require('../../../mock'),
Expand Down Expand Up @@ -578,4 +579,50 @@ describe('Sessions (Single)', function() {
client2.connect();
}
});

it('should track the highest `operationTime` seen, if causal consistency is enabled', {
metadata: { requires: { topology: 'single' } },
test: function(done) {
const client = new Server(test.server.address()),
sessionPool = new ServerSessionPool(client),
session = new ClientSession(client, sessionPool, { causalConsistency: true }),
insertOperationTime1 = Timestamp.fromNumber(Date.now()),
insertOperationTime2 = Timestamp.fromNumber(Date.now() + 10 * 60 * 1000);

let insertCount = 0;
test.server.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(mock.DEFAULT_ISMASTER_36);
} else if (doc.insert) {
request.reply({
ok: 1,
operationTime: insertCount === 0 ? insertOperationTime1 : insertOperationTime2
});

insertCount++;
}
});

client.on('error', done);
client.once('connect', () => {
client.insert('db.test', [{ a: 42 }], { session: session }, err => {
expect(err).to.not.exist;
expect(session.operationTime).to.exist;
expect(session.operationTime).to.eql(insertOperationTime1);

client.insert('db.test', [{ b: 52 }], { session: session }, err => {
expect(err).to.not.exist;
expect(session.operationTime).to.exist;
expect(session.operationTime).to.eql(insertOperationTime2);

client.destroy();
done();
});
});
});

client.connect();
}
});
});

0 comments on commit 8d512f1

Please sign in to comment.