Skip to content

Commit

Permalink
Merge pull request #202 from agco/add-checkpoint-writer
Browse files Browse the repository at this point in the history
Add checkpoint writer
  • Loading branch information
waldemarnt committed Mar 24, 2017
2 parents cd4ba0f + bdf284d commit 36a9bff
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 42 deletions.
59 changes: 59 additions & 0 deletions lib/events-reader-checkpoint-writer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
'use strict'
const EventEmitter = require('events');
const debug = require('debug')('events-reader');
let writerLoopStopped = true;
let lastDoc;
let lastCheckpointId;
let harvesterApp;

const checkpointEventEmitter = new EventEmitter();

checkpointEventEmitter.on('newCheckpoint', (checkpointId, doc) => {
lastCheckpointId = checkpointId;
lastDoc = doc;
});

const persistLastCheckpoint = () => {
if(lastDoc && lastCheckpointId) {
harvesterApp.adapter.update('checkpoint', lastCheckpointId, {ts: lastDoc.ts})
.then(checkpoint => {
debug('last written checking point ' + checkpoint.ts);
})
.catch(error => {
console.log(error);
process.exit(1);
});
};
lastCheckpointId = undefined;
lastDoc = undefined;
};

const persistInInterval = (ms) => {
setInterval(() => {
persistLastCheckpoint();
}, ms);
};

const startWriterLoop = app => {
harvesterApp = app;
const defaultWriteInterval = 1;
const writeInterval = parseInt((harvesterApp.options && harvesterApp.options.eventsReaderDebounceWait) || defaultWriteInterval);
if(writerLoopStopped) {
persistInInterval(writeInterval);
writerLoopStopped = false;
};
};

const getLastCheckpointId = () => lastCheckpointId;

const getLastDoc = () => lastDoc;

const setWriterLoopStopped = shouldStop => writerLoopStopped = shouldStop;

module.exports = {
startWriterLoop: startWriterLoop,
checkpointEventEmitter: checkpointEventEmitter,
getLastCheckpointId: getLastCheckpointId,
getLastDoc: getLastDoc,
setWriterLoopStopped: setWriterLoopStopped
};
49 changes: 7 additions & 42 deletions lib/events-reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,16 @@ var _ = require('lodash'),
BSON = require('mongodb').BSONPure,
mongojs = require('mongojs'),
debounce = require('debounce-promise'),
hl = require('highland');
hl = require('highland'),
checkpointWriter = require('./events-reader-checkpoint-writer');
Joi = require('joi');

var log = require('agco-logger')({
logger: {
log: {
level: process.env.LOG_LEVEL || 'debug',
showLevel: process.env.LOG_SHOW_LEVEL || false,
showTimestamps: process.env.LOG_SHOW_TIMESTAMPS || false
}
}
});

module.exports = function (harvesterApp) {

return function (oplogMongodbUri, skip, success) {

checkpointWriter.startWriterLoop(harvesterApp);

var opMap = {
"i": "insert",
"u": "update",
Expand Down Expand Up @@ -143,15 +136,10 @@ module.exports = function (harvesterApp) {

return _.chain(changeHandlersPerResource)
.filter(function (changeHandler, resource) {
var regexProfileMessage = 'matchChangeHandler regex to cl ' + ns;
log.profile(regexProfileMessage);

var resourcePlural = inflect.pluralize(resource);
var regex = new RegExp('.*\\.' + resourcePlural + '$', 'i');
var testedRegex = regex.test(ns);

log.profile(regexProfileMessage);
return testedRegex;
return regex.test(ns);
})
.flatten()
.value();
Expand Down Expand Up @@ -202,14 +190,10 @@ module.exports = function (harvesterApp) {
function executeHandler(that, id, dfd, opFn, changeHandler, changeHandlerOp, doc) {
debug('processing resource op ' + changeHandlerOp);

var profileMessage = 'executeHandler ' + doc.ts + ' cl ' + doc.ns + ' op ' + changeHandlerOp;

log.profile(profileMessage);
new Promise(function (resolve) {
resolve(opFn(id));
})
.then(function () {
log.profile(profileMessage);
if (dfd) {
dfd.resolve(doc);
}
Expand Down Expand Up @@ -245,37 +229,18 @@ module.exports = function (harvesterApp) {
}
};

var checkpointUpdate = function(that,doc){
return harvesterApp.adapter.update('checkpoint', that.checkpoint.id, {ts: doc.ts})
};

var debouncedCheckpointUpdate = debounce(checkpointUpdate,parseInt(_.get(harvesterApp, 'options.eventsReaderDebounceWait'), 10) || 100,{leading: true});

EventsReader.prototype.updateCheckpointAndReschedule = function (doc) {
var that = this;
if (doc != null) {

var regexProfileMessage = 'checkpoint regex to cl ' + doc.ns + ' ts ' + doc.ts;
log.profile(regexProfileMessage);
var regexCheckpoint = new RegExp('.*\\.checkpoints$', 'i');
var matchCheckpoint = regexCheckpoint.test(doc.ns);
log.profile(regexProfileMessage);

if (!matchCheckpoint) {

var profileMessage = 'updateCheckpointAndReshedule ' + doc.ts;

debug('doc checkpoint ' + doc.ts, doc);
log.profile(profileMessage);
debug('updating checkpoint with ts: ' + logTs(doc.ts));
checkpointWriter.checkpointEventEmitter.emit('newCheckpoint', that.checkpoint.id, doc);

return Promise.resolve([that,doc])
.spread(debouncedCheckpointUpdate)
.then(function () {
log.profile(profileMessage);
that.reschedule(0);
});

that.reschedule(0);
} else {
that.reschedule(0);
}
Expand Down
113 changes: 113 additions & 0 deletions test/checkpoint-writer.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
'use strict'
const expect = require('chai').expect;
const Joi = require('joi');
const sinon = require('sinon');
const checkpointWriter = require('../lib/events-reader-checkpoint-writer');
const Promise = require('bluebird');

describe('checkpoint writer', function () {

describe('timeout', function () {
context('when using the default config options', () => {
const harvestApp = {
adapter: {
update: () => {}
}
};
const fakeDoc = {ts: 1};
const checkpointEventEmitter = checkpointWriter.checkpointEventEmitter;

let clock;

beforeEach(() => {
clock = sinon.useFakeTimers();
sinon.stub(harvestApp.adapter, 'update');
harvestApp.adapter.update.returns(new Promise.resolve(fakeDoc));
checkpointWriter.startWriterLoop(harvestApp);
checkpointWriter.setWriterLoopStopped(true);
checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc);
});

afterEach(() => {
harvestApp.adapter.update.restore();
clock.restore();
});

it('should clean last doc and checkpoint after handled', done => {
clock.tick(1);
expect(harvestApp.adapter.update.callCount).to.be.eql(1);
clock.tick(1);
expect(checkpointWriter.getLastDoc()).to.be.undefined;
expect(checkpointWriter.getLastCheckpointId()).to.be.undefined;
expect(harvestApp.adapter.update.calledOnce).to.be.true;

done();
});

it('should write a checkpoint in a given interval', done => {
clock.tick(1);
expect(harvestApp.adapter.update.callCount).to.be.eql(1);

checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc);
clock.tick(1);
expect(harvestApp.adapter.update.callCount).to.be.eql(2);

done();
});

it('should debounce excessive checkpoint update function calls', done => {
checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc);
checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc);
checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc);
checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc);
clock.tick(1);
expect(harvestApp.adapter.update.callCount).to.be.eql(1);

done();
});
});

context('when passing the option eventsReaderDebounceWait', () => {
const eventsReaderDebounceDelay = 1000;
const harvestApp = {
adapter: {
update: () => {}
},
options: {
eventsReaderDebounceWait: eventsReaderDebounceDelay
}
};
const fakeDoc = {ts: 1};
const checkpointEventEmitter = checkpointWriter.checkpointEventEmitter;

let clock;

beforeEach(() => {
clock = sinon.useFakeTimers();
sinon.stub(harvestApp.adapter, 'update');
harvestApp.adapter.update.returns(new Promise.resolve(fakeDoc));
checkpointWriter.startWriterLoop(harvestApp);
checkpointWriter.setWriterLoopStopped(true);
checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc);
});

afterEach(() => {
harvestApp.adapter.update.restore();
clock.restore();
});

it('should write a checkpoint in a given interval', done => {
clock.tick(eventsReaderDebounceDelay);
expect(harvestApp.adapter.update.callCount).to.be.eql(1);

checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc);
clock.tick(eventsReaderDebounceDelay);
expect(harvestApp.adapter.update.callCount).to.be.eql(2);

done();
});
});

});

});

0 comments on commit 36a9bff

Please sign in to comment.