From f584fb085ac6d7638dc6fd441667cb87eb585a90 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Tue, 14 Aug 2018 18:26:41 -0700 Subject: [PATCH 1/4] let _claimPendingJobs have a valid job when updating leads to version conflict --- .../reporting/server/lib/esqueue/worker.js | 61 +++++++++++-------- 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/x-pack/plugins/reporting/server/lib/esqueue/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/worker.js index 20d487c9128c75..c19dffcd71a2be 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/worker.js @@ -22,6 +22,22 @@ function formatJobObject(job) { }; } +function getLogger(opts, id, logLevel) { + return (msg, err) => { + const logger = opts.logger || function () {}; + + const message = `${id} - ${msg}`; + const tags = ['worker', logLevel]; + + if (err) { + logger(`${message}: ${err.stack ? err.stack : err }`, tags); + return; + } + + logger(message, tags); + }; +} + export class Worker extends events.EventEmitter { constructor(queue, type, workerFn, opts) { if (typeof type !== 'string') throw new Error('type must be a string'); @@ -40,19 +56,8 @@ export class Worker extends events.EventEmitter { this.checkSize = opts.size || 10; this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE; - this.debug = (msg, err) => { - const logger = opts.logger || function () {}; - - const message = `${this.id} - ${msg}`; - const tags = ['worker', 'debug']; - - if (err) { - logger(`${message}: ${err.stack ? err.stack : err }`, tags); - return; - } - - logger(message, tags); - }; + this.debug = getLogger(opts, this.id, 'debug'); + this.warn = getLogger(opts, this.id, 'warn'); this._running = true; this.debug(`Created worker for job type ${this.jobtype}`); @@ -136,15 +141,18 @@ export class Worker extends events.EventEmitter { return updatedJob; }) .catch((err) => { - if (err.statusCode === 409) return true; - this.debug(`_claimJob failed on job ${job._id}`, err); + if (err.statusCode === 409) { + this.warn(`_claimJob got version conflict when updating job ${job._id}`, err); + return true; + } + this.warn(`_claimJob failed on job ${job._id}`, err); this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job)); return false; }); } _failJob(job, output = false) { - this.debug(`Failing job ${job._id}`); + this.warn(`Failing job ${job._id}`); const completedTime = moment().toISOString(); const docOutput = this._formatOutput(output); @@ -170,7 +178,7 @@ export class Worker extends events.EventEmitter { .then(() => true) .catch((err) => { if (err.statusCode === 409) return true; - this.debug(`_failJob failed to update job ${job._id}`, err); + this.warn(`_failJob failed to update job ${job._id}`, err); this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job)); return false; }); @@ -215,7 +223,7 @@ export class Worker extends events.EventEmitter { if (isResolved) return; cancellationToken.cancel(); - this.debug(`Timeout processing job ${job._id}`); + this.warn(`Timeout processing job ${job._id}`); reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, { timeout: job._source.timeout, jobId: job._id, @@ -253,7 +261,7 @@ export class Worker extends events.EventEmitter { }) .catch((err) => { if (err.statusCode === 409) return false; - this.debug(`Failure saving job output ${job._id}`, err); + this.warn(`Failure saving job output ${job._id}`, err); this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job)); }); }, (jobErr) => { @@ -265,7 +273,7 @@ export class Worker extends events.EventEmitter { // job execution failed if (jobErr.name === 'WorkerTimeoutError') { - this.debug(`Timeout on job ${job._id}`); + this.warn(`Timeout on job ${job._id}`); this.emit(constants.EVENT_WORKER_JOB_TIMEOUT, this._formatErrorParams(jobErr, job)); return; @@ -278,7 +286,7 @@ export class Worker extends events.EventEmitter { } } - this.debug(`Failure occurred on job ${job._id}`, jobErr); + this.warn(`Failure occurred on job ${job._id}`, jobErr); this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job)); return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false); }); @@ -316,6 +324,11 @@ export class Worker extends events.EventEmitter { return this._claimJob(job) .then((claimResult) => { + // could not update the job, use the previous object + if (claimResult === true) { + claimed = true; + return claimedJob; + } if (claimResult !== false) { claimed = true; return claimResult; @@ -325,14 +338,14 @@ export class Worker extends events.EventEmitter { }, Promise.resolve()) .then((claimedJob) => { if (!claimedJob) { - this.debug(`All ${jobs.length} jobs already claimed`); + this.debug(`All ${jobs.length} jobs already claimed`); // FIXME this means _claimJob failed, and an error was emitted return; } this.debug(`Claimed job ${claimedJob._id}`); return this._performJob(claimedJob); }) .catch((err) => { - this.debug('Error claiming jobs', err); + this.warn('Error claiming jobs', err); }); } @@ -384,7 +397,7 @@ export class Worker extends events.EventEmitter { // ignore missing indices errors if (err && err.status === 404) return []; - this.debug('job querying failed', err); + this.warn('job querying failed', err); this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err)); throw err; }); From 044902b1bbf10648533413f298c1dfe7d56ecff4 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 17 Aug 2018 17:34:58 -0700 Subject: [PATCH 2/4] change _claimJob to reject the promise instead of resolve to true/false --- .../server/lib/esqueue/__tests__/worker.js | 12 ++++++--- .../reporting/server/lib/esqueue/worker.js | 25 ++++++++----------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js index 188bdf92ed651d..5920f0f40bb156 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js @@ -494,18 +494,22 @@ describe('Worker class', function () { expect(msg).to.equal(false); }); - it('should return true on version errors', function () { + it('should reject the promise on version errors', function () { mockQueue.client.update.restore(); sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 })); return worker._claimJob(job) - .then((res) => expect(res).to.equal(true)); + .catch(err => { + expect(err).to.eql({ statusCode: 409 }); + }); }); - it('should return false on other errors', function () { + it('should reject the promise on other errors', function () { mockQueue.client.update.restore(); sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 })); return worker._claimJob(job) - .then((res) => expect(res).to.equal(false)); + .catch(err => { + expect(err).to.eql({ statusCode: 401 }); + }); }); it('should emit on other errors', function (done) { diff --git a/x-pack/plugins/reporting/server/lib/esqueue/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/worker.js index c19dffcd71a2be..500b3defaa8880 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/worker.js @@ -141,13 +141,8 @@ export class Worker extends events.EventEmitter { return updatedJob; }) .catch((err) => { - if (err.statusCode === 409) { - this.warn(`_claimJob got version conflict when updating job ${job._id}`, err); - return true; - } - this.warn(`_claimJob failed on job ${job._id}`, err); this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job)); - return false; + return Promise.reject(err); }); } @@ -324,21 +319,21 @@ export class Worker extends events.EventEmitter { return this._claimJob(job) .then((claimResult) => { - // could not update the job, use the previous object - if (claimResult === true) { - claimed = true; - return claimedJob; - } - if (claimResult !== false) { - claimed = true; - return claimResult; + claimed = true; + return claimResult; + }) + .catch((err) => { + if (err.statusCode === 409) { + this.warn(`_claimPendingJobs encountered a version conflict on updating pending job ${job._id}`, err); + return; // continue reducing and looking for a different job to claim } + return Promise.reject(err); }); }); }, Promise.resolve()) .then((claimedJob) => { if (!claimedJob) { - this.debug(`All ${jobs.length} jobs already claimed`); // FIXME this means _claimJob failed, and an error was emitted + this.debug(`Found no claimable jobs out of ${jobs.length} total`); return; } this.debug(`Claimed job ${claimedJob._id}`); From c759016c4b5791f2cd3874ec060f4e71e3b2a524 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 17 Aug 2018 19:32:03 -0700 Subject: [PATCH 3/4] add _claimPendingJobs tests --- .../server/lib/esqueue/__tests__/worker.js | 64 ++++++++++++++++++- .../reporting/server/lib/esqueue/worker.js | 5 +- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js index 5920f0f40bb156..d453b1b2502e0f 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js @@ -511,12 +511,60 @@ describe('Worker class', function () { expect(err).to.eql({ statusCode: 401 }); }); }); + }); + + describe('find a pending job to claim', function () { + let updateSpy; + let jobs; + + beforeEach(function () { + anchorMoment = moment(anchor); + clock = sinon.useFakeTimers(anchorMoment.valueOf()); + + updateSpy = sinon.spy(mockQueue.client, 'update'); + + jobs = [{ + _index: 'myIndex', + _type: 'test', + _id: 12345, + _version: 3, + found: true, + _source: { + jobtype: 'jobtype', + created_by: false, + payload: { id: 'sample-job-1', now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)' }, + priority: 10, + timeout: 10000, + created_at: '2016-04-25T21:13:04.738Z', + attempts: 0, + max_attempts: 3, + status: 'pending', + }, + }]; + }); + + afterEach(() => { + clock.restore(); + }); - it('should emit on other errors', function (done) { + it('should use version on update', function () { + mockQueue.client.update.restore(); + return worker._claimPendingJobs(jobs) + .then(() => { + const [ job ] = jobs; + const query = updateSpy.firstCall.args[0]; + expect(query).to.have.property('index', job._index); + expect(query).to.have.property('type', job._type); + expect(query).to.have.property('id', job._id); + expect(query).to.have.property('version', job._version); + }); + }); + + it('should emit for errors from claiming job', function (done) { mockQueue.client.update.restore(); sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 })); - worker.on(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) { + worker.once(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) { try { expect(err).to.have.property('error'); expect(err).to.have.property('job'); @@ -527,7 +575,17 @@ describe('Worker class', function () { done(e); } }); - worker._claimJob(job); + + worker._claimPendingJobs(jobs); + }); + + it('should reject the promise if an error claiming the job', function () { + mockQueue.client.update.restore(); + sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 })); + return worker._claimPendingJobs(jobs) + .catch(err => { + expect(err).to.eql({ statusCode: 409 }); + }); }); }); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/worker.js index 500b3defaa8880..7f073657173653 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/worker.js @@ -139,10 +139,6 @@ export class Worker extends events.EventEmitter { ...doc }; return updatedJob; - }) - .catch((err) => { - this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job)); - return Promise.reject(err); }); } @@ -327,6 +323,7 @@ export class Worker extends events.EventEmitter { this.warn(`_claimPendingJobs encountered a version conflict on updating pending job ${job._id}`, err); return; // continue reducing and looking for a different job to claim } + this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job)); return Promise.reject(err); }); }); From 653c5b9d03f7f4c02cbbd1e067e65ced4b8a13af Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Thu, 23 Aug 2018 17:29:04 -0700 Subject: [PATCH 4/4] fix tests --- .../server/lib/esqueue/__tests__/worker.js | 79 +++++++++---------- .../reporting/server/lib/esqueue/worker.js | 1 + 2 files changed, 37 insertions(+), 43 deletions(-) diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js index d453b1b2502e0f..57698e81ff4370 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js @@ -7,7 +7,7 @@ import expect from 'expect.js'; import sinon from 'sinon'; import moment from 'moment'; -import { noop, random, get, find } from 'lodash'; +import { noop, random, get, find, identity } from 'lodash'; import { ClientMock } from './fixtures/elasticsearch'; import { QueueMock } from './fixtures/queue'; import { Worker } from '../worker'; @@ -514,54 +514,34 @@ describe('Worker class', function () { }); describe('find a pending job to claim', function () { - let updateSpy; - let jobs; + const getMockJobs = (status = 'pending') => ([{ + _index: 'myIndex', + _type: 'test', + _id: 12345, + _version: 3, + found: true, + _source: { + jobtype: 'jobtype', + created_by: false, + payload: { id: 'sample-job-1', now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)' }, + priority: 10, + timeout: 10000, + created_at: '2016-04-25T21:13:04.738Z', + attempts: 0, + max_attempts: 3, + status + }, + }]); beforeEach(function () { - anchorMoment = moment(anchor); - clock = sinon.useFakeTimers(anchorMoment.valueOf()); - - updateSpy = sinon.spy(mockQueue.client, 'update'); - - jobs = [{ - _index: 'myIndex', - _type: 'test', - _id: 12345, - _version: 3, - found: true, - _source: { - jobtype: 'jobtype', - created_by: false, - payload: { id: 'sample-job-1', now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)' }, - priority: 10, - timeout: 10000, - created_at: '2016-04-25T21:13:04.738Z', - attempts: 0, - max_attempts: 3, - status: 'pending', - }, - }]; + worker = new Worker(mockQueue, 'test', noop, defaultWorkerOptions); }); afterEach(() => { - clock.restore(); - }); - - it('should use version on update', function () { mockQueue.client.update.restore(); - return worker._claimPendingJobs(jobs) - .then(() => { - const [ job ] = jobs; - const query = updateSpy.firstCall.args[0]; - expect(query).to.have.property('index', job._index); - expect(query).to.have.property('type', job._type); - expect(query).to.have.property('id', job._id); - expect(query).to.have.property('version', job._version); - }); }); it('should emit for errors from claiming job', function (done) { - mockQueue.client.update.restore(); sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 })); worker.once(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) { @@ -576,17 +556,30 @@ describe('Worker class', function () { } }); - worker._claimPendingJobs(jobs); + worker._claimPendingJobs(getMockJobs()); }); it('should reject the promise if an error claiming the job', function () { - mockQueue.client.update.restore(); sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 })); - return worker._claimPendingJobs(jobs) + return worker._claimPendingJobs(getMockJobs()) .catch(err => { expect(err).to.eql({ statusCode: 409 }); }); }); + + it('should get the pending job', function () { + sinon.stub(mockQueue.client, 'update').returns(Promise.resolve({ test: 'cool' })); + sinon.stub(worker, '_performJob').callsFake(identity); + return worker._claimPendingJobs(getMockJobs()) + .then(claimedJob => { + expect(claimedJob._index).to.be('myIndex'); + expect(claimedJob._type).to.be('test'); + expect(claimedJob._source.jobtype).to.be('jobtype'); + expect(claimedJob._source.status).to.be('processing'); + expect(claimedJob.test).to.be('cool'); + worker._performJob.restore(); + }); + }); }); describe('failing a job', function () { diff --git a/x-pack/plugins/reporting/server/lib/esqueue/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/worker.js index 7f073657173653..02f9c054c2e3bd 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/worker.js @@ -338,6 +338,7 @@ export class Worker extends events.EventEmitter { }) .catch((err) => { this.warn('Error claiming jobs', err); + return Promise.reject(err); }); }