Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

let _claimPendingJobs have a valid job when updating leads to version conflict #21980

Merged
Merged
61 changes: 37 additions & 24 deletions x-pack/plugins/reporting/server/lib/esqueue/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ function formatJobObject(job) {
};
}

/**
 * Creates a logging function bound to a worker id and a log level.
 *
 * @param {Object} opts - Worker options; `opts.logger`, when set, is called as
 *   `logger(message, tags)`. When it is not set, log calls do nothing.
 * @param {*} id - Identifier prepended to every message as `<id> - <msg>`.
 * @param {string} logLevel - Level placed in the tags array after 'worker'.
 * @returns {Function} `(msg, err) => undefined`. When `err` is truthy, its
 *   `stack` (or the value itself if it has no stack) is appended to the message.
 */
function getLogger(opts, id, logLevel) {
  const noop = function () {};

  return function log(msg, err) {
    // Read opts.logger on every call so a logger attached to opts after this
    // function was created is still used.
    const emit = opts.logger || noop;

    const prefixed = `${id} - ${msg}`;
    const text = err
      ? `${prefixed}: ${err.stack ? err.stack : err }`
      : prefixed;

    emit(text, ['worker', logLevel]);
  };
}

export class Worker extends events.EventEmitter {
constructor(queue, type, workerFn, opts) {
if (typeof type !== 'string') throw new Error('type must be a string');
Expand All @@ -40,19 +56,8 @@ export class Worker extends events.EventEmitter {
this.checkSize = opts.size || 10;
this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;

this.debug = (msg, err) => {
const logger = opts.logger || function () {};

const message = `${this.id} - ${msg}`;
const tags = ['worker', 'debug'];

if (err) {
logger(`${message}: ${err.stack ? err.stack : err }`, tags);
return;
}

logger(message, tags);
};
this.debug = getLogger(opts, this.id, 'debug');
this.warn = getLogger(opts, this.id, 'warn');

this._running = true;
this.debug(`Created worker for job type ${this.jobtype}`);
Expand Down Expand Up @@ -136,15 +141,18 @@ export class Worker extends events.EventEmitter {
return updatedJob;
})
.catch((err) => {
if (err.statusCode === 409) return true;
this.debug(`_claimJob failed on job ${job._id}`, err);
if (err.statusCode === 409) {
this.warn(`_claimJob got version conflict when updating job ${job._id}`, err);
return true;
}
this.warn(`_claimJob failed on job ${job._id}`, err);
this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job));
return false;
});
}

_failJob(job, output = false) {
this.debug(`Failing job ${job._id}`);
this.warn(`Failing job ${job._id}`);

const completedTime = moment().toISOString();
const docOutput = this._formatOutput(output);
Expand All @@ -170,7 +178,7 @@ export class Worker extends events.EventEmitter {
.then(() => true)
.catch((err) => {
if (err.statusCode === 409) return true;
this.debug(`_failJob failed to update job ${job._id}`, err);
this.warn(`_failJob failed to update job ${job._id}`, err);
this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job));
return false;
});
Expand Down Expand Up @@ -215,7 +223,7 @@ export class Worker extends events.EventEmitter {
if (isResolved) return;

cancellationToken.cancel();
this.debug(`Timeout processing job ${job._id}`);
this.warn(`Timeout processing job ${job._id}`);
reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, {
timeout: job._source.timeout,
jobId: job._id,
Expand Down Expand Up @@ -253,7 +261,7 @@ export class Worker extends events.EventEmitter {
})
.catch((err) => {
if (err.statusCode === 409) return false;
this.debug(`Failure saving job output ${job._id}`, err);
this.warn(`Failure saving job output ${job._id}`, err);
this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job));
});
}, (jobErr) => {
Expand All @@ -265,7 +273,7 @@ export class Worker extends events.EventEmitter {

// job execution failed
if (jobErr.name === 'WorkerTimeoutError') {
this.debug(`Timeout on job ${job._id}`);
this.warn(`Timeout on job ${job._id}`);
this.emit(constants.EVENT_WORKER_JOB_TIMEOUT, this._formatErrorParams(jobErr, job));
return;

Expand All @@ -278,7 +286,7 @@ export class Worker extends events.EventEmitter {
}
}

this.debug(`Failure occurred on job ${job._id}`, jobErr);
this.warn(`Failure occurred on job ${job._id}`, jobErr);
this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job));
return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false);
});
Expand Down Expand Up @@ -316,6 +324,11 @@ export class Worker extends events.EventEmitter {

return this._claimJob(job)
.then((claimResult) => {
// could not update the job, use the previous object
if (claimResult === true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to think what the right course of action is here. It got a conflict, which indicates to me that some other process updated the job prior to this one. That probably means that another Kibana / es_queue instance is running the job, right? I'm not sure we actually want to proceed here.

If our tests are modifying the job out from under us, we should fix the tests, because I think in the real world, we want to treat conflicts here as if a competing Kibana process is running the report. @stacey-gammon maybe you can shed some light on this. I'm not really familiar enough with es_queue to know if I'm right / wrong here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that failing the job in case of conflict is probably the right thing to do. My understanding is that a conflict means another Kibana process has claimed the job and will be responsible for that report generation.

Ideally, _claimJob would always return a valid job object, and if it can't, it should throw an error. We'd log the error (not a debug log) and trust that the job will run on another instance, or that the user will see some messaging that something went wrong and re-try. I'm not familiar enough to know if those are safe assumptions, though. It would really help to add a unit test around _processPendingJobs to hit all the scenarios.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> Ideally, _claimJob would always return a valid job object, and if it can't, it should throw an error. We'd log the error (not a debug log) and trust that the job will run on another instance, or that the user will see some messaging that something went wrong and re-try.

My thoughts as well, and this would make a natural flow when we add types.

> It would really help to add a unit test around _processPendingJobs to hit all the scenarios.

Is this something feasible to add to this PR? Then we could maybe confidently change the return value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as long as the reviewers are on board, I will make that change of return types in this PR and add the unit test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. On board. The whole true / false / object return type is a bit hard to follow, so this would be a nice improvement.

claimed = true;
return claimedJob;
}
if (claimResult !== false) {
claimed = true;
return claimResult;
Expand All @@ -325,14 +338,14 @@ export class Worker extends events.EventEmitter {
}, Promise.resolve())
.then((claimedJob) => {
if (!claimedJob) {
this.debug(`All ${jobs.length} jobs already claimed`);
this.debug(`All ${jobs.length} jobs already claimed`); // FIXME this means _claimJob failed, and an error was emitted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I'm not sure what the fix would be in this case. I think a debug message here is fine, as we've already logged errors / warnings prior to this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're probably right. I left this comment here so I would remember to bring it up. Looking at the code, if the claimedJob value is false, that means there was some error, but I don't see how that equates to "All jobs already claimed"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the correct debug message here is `Found 0 out of ${jobs.length} claimable jobs`.

return;
}
this.debug(`Claimed job ${claimedJob._id}`);
return this._performJob(claimedJob);
})
.catch((err) => {
this.debug('Error claiming jobs', err);
this.warn('Error claiming jobs', err);
});
}

Expand Down Expand Up @@ -384,7 +397,7 @@ export class Worker extends events.EventEmitter {
// ignore missing indices errors
if (err && err.status === 404) return [];

this.debug('job querying failed', err);
this.warn('job querying failed', err);
this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err));
throw err;
});
Expand Down