diff --git a/lib/events-reader-checkpoint-writer.js b/lib/events-reader-checkpoint-writer.js new file mode 100644 index 000000000..019687641 --- /dev/null +++ b/lib/events-reader-checkpoint-writer.js @@ -0,0 +1,59 @@ +'use strict' +const EventEmitter = require('events'); +const debug = require('debug')('events-reader'); +let writerLoopStopped = true; +let lastDoc; +let lastCheckpointId; +let harvesterApp; + +const checkpointEventEmitter = new EventEmitter(); + +checkpointEventEmitter.on('newCheckpoint', (checkpointId, doc) => { + lastCheckpointId = checkpointId; + lastDoc = doc; +}); + +const persistLastCheckpoint = () => { + if(lastDoc && lastCheckpointId) { + harvesterApp.adapter.update('checkpoint', lastCheckpointId, {ts: lastDoc.ts}) + .then(checkpoint => { + debug('last written checking point ' + checkpoint.ts); + }) + .catch(error => { + console.log(error); + process.exit(1); + }); + }; + lastCheckpointId = undefined; + lastDoc = undefined; +}; + +const persistInInterval = (ms) => { + setInterval(() => { + persistLastCheckpoint(); + }, ms); +}; + +const startWriterLoop = app => { + harvesterApp = app; + const defaultWriteInterval = 1; + const writeInterval = parseInt((harvesterApp.options && harvesterApp.options.eventsReaderDebounceWait) || defaultWriteInterval); + if(writerLoopStopped) { + persistInInterval(writeInterval); + writerLoopStopped = false; + }; +}; + +const getLastCheckpointId = () => lastCheckpointId; + +const getLastDoc = () => lastDoc; + +const setWriterLoopStopped = shouldStop => writerLoopStopped = shouldStop; + +module.exports = { + startWriterLoop: startWriterLoop, + checkpointEventEmitter: checkpointEventEmitter, + getLastCheckpointId: getLastCheckpointId, + getLastDoc: getLastDoc, + setWriterLoopStopped: setWriterLoopStopped +}; diff --git a/lib/events-reader.js b/lib/events-reader.js index 89a132567..c09493900 100644 --- a/lib/events-reader.js +++ b/lib/events-reader.js @@ -5,23 +5,16 @@ var _ = require('lodash'), BSON = require('mongodb').BSONPure, mongojs = require('mongojs'), debounce = require('debounce-promise'), - hl = require('highland'); + hl = require('highland'), + checkpointWriter = require('./events-reader-checkpoint-writer'); Joi = require('joi'); -var log = require('agco-logger')({ - logger: { - log: { - level: process.env.LOG_LEVEL || 'debug', - showLevel: process.env.LOG_SHOW_LEVEL || false, - showTimestamps: process.env.LOG_SHOW_TIMESTAMPS || false - } - } -}); - module.exports = function (harvesterApp) { return function (oplogMongodbUri, skip, success) { + checkpointWriter.startWriterLoop(harvesterApp); + var opMap = { "i": "insert", "u": "update", @@ -143,15 +136,10 @@ module.exports = function (harvesterApp) { return _.chain(changeHandlersPerResource) .filter(function (changeHandler, resource) { - var regexProfileMessage = 'matchChangeHandler regex to cl ' + ns; - log.profile(regexProfileMessage); - var resourcePlural = inflect.pluralize(resource); var regex = new RegExp('.*\\.' + resourcePlural + '$', 'i'); - var testedRegex = regex.test(ns); - log.profile(regexProfileMessage); - return testedRegex; + return regex.test(ns); }) .flatten() .value(); @@ -202,14 +190,10 @@ module.exports = function (harvesterApp) { function executeHandler(that, id, dfd, opFn, changeHandler, changeHandlerOp, doc) { debug('processing resource op ' + changeHandlerOp); - var profileMessage = 'executeHandler ' + doc.ts + ' cl ' + doc.ns + ' op ' + changeHandlerOp; - - log.profile(profileMessage); new Promise(function (resolve) { resolve(opFn(id)); }) .then(function () { - log.profile(profileMessage); if (dfd) { dfd.resolve(doc); } @@ -245,37 +229,18 @@ module.exports = function (harvesterApp) { } }; - var checkpointUpdate = function(that,doc){ - return harvesterApp.adapter.update('checkpoint', that.checkpoint.id, {ts: doc.ts}) - }; - - var debouncedCheckpointUpdate = debounce(checkpointUpdate,parseInt(_.get(harvesterApp, 'options.eventsReaderDebounceWait'), 10) || 100,{leading: true}); - EventsReader.prototype.updateCheckpointAndReschedule = function (doc) { var that = this; if (doc != null) { - var regexProfileMessage = 'checkpoint regex to cl ' + doc.ns + ' ts ' + doc.ts; - log.profile(regexProfileMessage); var regexCheckpoint = new RegExp('.*\\.checkpoints$', 'i'); var matchCheckpoint = regexCheckpoint.test(doc.ns); - log.profile(regexProfileMessage); if (!matchCheckpoint) { - - var profileMessage = 'updateCheckpointAndReshedule ' + doc.ts; - - debug('doc checkpoint ' + doc.ts, doc); - log.profile(profileMessage); debug('updating checkpoint with ts: ' + logTs(doc.ts)); + checkpointWriter.checkpointEventEmitter.emit('newCheckpoint', that.checkpoint.id, doc); - return Promise.resolve([that,doc]) - .spread(debouncedCheckpointUpdate) - .then(function () { - log.profile(profileMessage); - that.reschedule(0); - }); - + that.reschedule(0); } else { that.reschedule(0); } diff --git a/test/checkpoint-writer.spec.js b/test/checkpoint-writer.spec.js new file mode 100644 index 000000000..d06aa852c --- /dev/null +++ b/test/checkpoint-writer.spec.js @@ -0,0 +1,113 @@ +'use strict' +const expect = require('chai').expect; +const Joi = require('joi'); +const sinon = require('sinon'); +const checkpointWriter = require('../lib/events-reader-checkpoint-writer'); +const Promise = require('bluebird'); + +describe('checkpoint writer', function () { + + describe('timeout', function () { + context('when using the default config options', () => { + const harvestApp = { + adapter: { + update: () => {} + } + }; + const fakeDoc = {ts: 1}; + const checkpointEventEmitter = checkpointWriter.checkpointEventEmitter; + + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + sinon.stub(harvestApp.adapter, 'update'); + harvestApp.adapter.update.returns(new Promise.resolve(fakeDoc)); + checkpointWriter.startWriterLoop(harvestApp); + checkpointWriter.setWriterLoopStopped(true); + checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc); + }); + + afterEach(() => { + harvestApp.adapter.update.restore(); + clock.restore(); + }); + + it('should clean last doc and checkpoint after handled', done => { + clock.tick(1); + expect(harvestApp.adapter.update.callCount).to.be.eql(1); + clock.tick(1); + expect(checkpointWriter.getLastDoc()).to.be.undefined; + expect(checkpointWriter.getLastCheckpointId()).to.be.undefined; + expect(harvestApp.adapter.update.calledOnce).to.be.true; + + done(); + }); + + it('should write a checkpoint in a given interval', done => { + clock.tick(1); + expect(harvestApp.adapter.update.callCount).to.be.eql(1); + + checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc); + clock.tick(1); + expect(harvestApp.adapter.update.callCount).to.be.eql(2); + + done(); + }); + + it('should debounce excessive checkpoint update function calls', done => { + checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc); + checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc); + checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc); + checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc); + clock.tick(1); + expect(harvestApp.adapter.update.callCount).to.be.eql(1); + + done(); + }); + }); + + context('when passing the option eventsReaderDebounceWait', () => { + const eventsReaderDebounceDelay = 1000; + const harvestApp = { + adapter: { + update: () => {} + }, + options: { + eventsReaderDebounceWait: eventsReaderDebounceDelay + } + }; + const fakeDoc = {ts: 1}; + const checkpointEventEmitter = checkpointWriter.checkpointEventEmitter; + + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + sinon.stub(harvestApp.adapter, 'update'); + harvestApp.adapter.update.returns(new Promise.resolve(fakeDoc)); + checkpointWriter.startWriterLoop(harvestApp); + checkpointWriter.setWriterLoopStopped(true); + checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc); + }); + + afterEach(() => { + harvestApp.adapter.update.restore(); + clock.restore(); + }); + + it('should write a checkpoint in a given interval', done => { + clock.tick(eventsReaderDebounceDelay); + expect(harvestApp.adapter.update.callCount).to.be.eql(1); + + checkpointEventEmitter.emit('newCheckpoint', 1, fakeDoc); + clock.tick(eventsReaderDebounceDelay); + expect(harvestApp.adapter.update.callCount).to.be.eql(2); + + done(); + }); + }); + + }); + +});