Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Code] Cancel clone/update job in the middle if disk space over the watermark #42890

Merged
merged 7 commits into from
Aug 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion x-pack/legacy/plugins/code/model/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import { IndexRequest } from './search';
import { CancellationReason } from '../server/queue/cancellation_service';

export type RepositoryUri = string;

Expand Down Expand Up @@ -86,12 +87,13 @@ export enum FileTreeItemType {
export interface WorkerResult {
uri: string;
cancelled?: boolean;
cancelledReason?: CancellationReason;
}

// TODO(mengwei): create a AbstractGitWorkerResult since we now have an
// AbstractGitWorker now.
export interface CloneWorkerResult extends WorkerResult {
repo: Repository;
repo?: Repository;
}

export interface DeleteWorkerResult extends WorkerResult {
Expand Down
66 changes: 53 additions & 13 deletions x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import path from 'path';
import rimraf from 'rimraf';
import sinon from 'sinon';

import { Repository } from '../../model';
import { CloneWorkerResult, Repository } from '../../model';
import { DiskWatermarkService } from '../disk_watermark';
import { GitOperations } from '../git_operations';
import { EsClient, Esqueue } from '../lib/esqueue';
import { Logger } from '../log';
import { CloneWorker, IndexWorker } from '../queue';
import { CancellationSerivce } from '../queue/cancellation_service';
import { CancellationReason, CancellationSerivce } from '../queue/cancellation_service';
import { RepositoryServiceFactory } from '../repository_service_factory';
import { createTestServerOption, emptyAsyncFunc } from '../test_utils';
import { ConsoleLoggerFactory } from '../utils/console_logger_factory';
Expand Down Expand Up @@ -372,34 +372,34 @@ describe('clone_worker_tests', () => {
diskWatermarkService as DiskWatermarkService
);

const result1 = await cloneWorker.executeJob({
const result1 = (await cloneWorker.executeJob({
payload: {
url: 'file:///foo/bar.git',
},
options: {},
timestamp: 0,
});
})) as CloneWorkerResult;

assert.ok(result1.repo === null);
assert.ok(!result1.repo);
assert.ok(newInstanceSpy.notCalled);
assert.ok(cloneSpy.notCalled);
assert.ok(isLowWatermarkSpy.calledOnce);

const result2 = await cloneWorker.executeJob({
const result2 = (await cloneWorker.executeJob({
payload: {
url: '/foo/bar.git',
},
options: {},
timestamp: 0,
});
})) as CloneWorkerResult;

assert.ok(result2.repo === null);
assert.ok(!result2.repo);
assert.ok(newInstanceSpy.notCalled);
assert.ok(cloneSpy.notCalled);
assert.ok(isLowWatermarkSpy.calledTwice);
});

it('Execute clone job failed because of low disk watermark', async () => {
it('Execute clone job failed because of low available disk space', async () => {
// Setup RepositoryService
const cloneSpy = sinon.spy();
const repoService = {
Expand Down Expand Up @@ -428,34 +428,74 @@ describe('clone_worker_tests', () => {
const isLowWatermarkSpy = sinon.stub().resolves(true);
const diskWatermarkService: any = {
isLowWatermark: isLowWatermarkSpy,
diskWatermarkViolationMessage: sinon.stub().returns('No enough disk space'),
};

// Setup EsClient
const updateSpy = sinon.spy();
const esClient = {
update: emptyAsyncFunc,
};
esClient.update = updateSpy;

// Setup IndexWorker
const enqueueJobSpy = sinon.spy();
const indexWorker = {
enqueueJob: emptyAsyncFunc,
};
indexWorker.enqueueJob = enqueueJobSpy;

const cloneWorker = new CloneWorker(
esQueue as Esqueue,
log,
{} as EsClient,
esClient as EsClient,
serverOptions,
gitOps,
{} as IndexWorker,
(indexWorker as any) as IndexWorker,
(repoServiceFactory as any) as RepositoryServiceFactory,
cancellationService as CancellationSerivce,
diskWatermarkService as DiskWatermarkService
);

let res: CloneWorkerResult = { uri: 'github.com/Microsoft/TypeScript-Node-Starter' };
try {
await cloneWorker.executeJob({
res = (await cloneWorker.executeJob({
payload: {
url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git',
},
options: {},
timestamp: 0,
});
})) as CloneWorkerResult;
// This step should not be touched.
assert.ok(false);
} catch (error) {
assert.ok(isLowWatermarkSpy.calledOnce);
assert.ok(newInstanceSpy.notCalled);
assert.ok(cloneSpy.notCalled);
}

assert.ok(res.cancelled);
assert.ok(res.cancelledReason === CancellationReason.LOW_DISK_SPACE);

const onJobExecutionErrorSpy = sinon.spy();
cloneWorker.onJobExecutionError = onJobExecutionErrorSpy;

await cloneWorker.onJobCompleted(
{
payload: {
url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git',
},
options: {},
timestamp: 0,
},
res
);

assert.ok(onJobExecutionErrorSpy.calledOnce);
// Non of the follow up steps of a normal complete job should not be called
// because the job is going to be forwarded as execution error.
assert.ok(updateSpy.notCalled);
await delay(1000);
assert.ok(enqueueJobSpy.notCalled);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
*/

export class CancellationToken {
public on(callback: () => void): void;
public cancel(): void;
public on(callback: (reason: string) => void): void;
public cancel(reason: string): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ export class CancellationToken {
}

if (this.isCancelled) {
callback();
return;
}

this._callbacks.push(callback);
};

cancel = () => {
cancel = (reason) => {
this.isCancelled = true;
this._callbacks.forEach(callback => callback());
this._callbacks.forEach(callback => callback(reason));
};
}
34 changes: 23 additions & 11 deletions x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { Logger } from '../log';
import { RepositoryObjectClient } from '../search';
import { ServerOptions } from '../server_options';
import { AbstractWorker } from './abstract_worker';
import { CancellationReason } from './cancellation_service';
import { Job } from './job';

export abstract class AbstractGitWorker extends AbstractWorker {
Expand All @@ -36,23 +37,21 @@ export abstract class AbstractGitWorker extends AbstractWorker {
this.objectClient = new RepositoryObjectClient(client);
}

public async executeJob(_: Job): Promise<WorkerResult> {
public async executeJob(job: Job): Promise<WorkerResult> {
const uri = job.payload.uri;
if (await this.watermarkService.isLowWatermark()) {
const msg = this.watermarkService.diskWatermarkViolationMessage();
this.log.error(msg);
throw new Error(msg);
// Return job result as cancelled.
return {
uri,
cancelled: true,
cancelledReason: CancellationReason.LOW_DISK_SPACE,
};
}

return new Promise<WorkerResult>((resolve, reject) => {
resolve();
});
return { uri };
}

public async onJobCompleted(job: Job, res: CloneWorkerResult) {
if (res.cancelled) {
// Skip updating job progress if the job is done because of cancellation.
return;
}
await super.onJobCompleted(job, res);

// Update the default branch.
Expand Down Expand Up @@ -108,4 +107,17 @@ export abstract class AbstractGitWorker extends AbstractWorker {
// this.log.warn(err);
}
}

protected async onJobCancelled(job: Job, reason?: CancellationReason) {
if (reason && reason === CancellationReason.LOW_DISK_SPACE) {
// If the clone/update job is cancelled because of the disk watermark, manually
// trigger onJobExecutionError.
const msg = this.watermarkService.diskWatermarkViolationMessage();
this.log.error(
'Git clone/update job completed because of low disk space. Move forward as error.'
);
const error = new Error(msg);
await this.onJobExecutionError({ job, error });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { CancellationToken } from '../lib/esqueue';

import sinon from 'sinon';

import { CancellationSerivce } from './cancellation_service';
import { CancellationReason, CancellationSerivce } from './cancellation_service';

afterEach(() => {
sinon.restore();
Expand All @@ -30,9 +30,9 @@ test('Register and cancel cancellation token', async () => {
const promise = new Promise(resolve => {
promiseResolve = resolve;
});
await service.registerCancelableIndexJob(repoUri, token as CancellationToken, promise);
await service.registerCancelableIndexJob(repoUri, (token as any) as CancellationToken, promise);
// do not wait on the promise, or there will be a dead lock
const cancelPromise = service.cancelIndexJob(repoUri);
const cancelPromise = service.cancelIndexJob(repoUri, CancellationReason.NEW_JOB_OVERRIDEN);
// resolve the promise now
promiseResolve();

Expand All @@ -57,10 +57,10 @@ test('Register and cancel cancellation token while an exception is thrown from t
const promise = new Promise((resolve, reject) => {
promiseReject = reject;
});
await service.registerCancelableIndexJob(repoUri, token as CancellationToken, promise);
await service.registerCancelableIndexJob(repoUri, (token as any) as CancellationToken, promise);
// expect no exceptions are thrown when cancelling the job
// do not wait on the promise, or there will be a dead lock
const cancelPromise = service.cancelIndexJob(repoUri);
const cancelPromise = service.cancelIndexJob(repoUri, CancellationReason.NEW_JOB_OVERRIDEN);
// reject the promise now
promiseReject();

Expand Down
28 changes: 19 additions & 9 deletions x-pack/legacy/plugins/code/server/queue/cancellation_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ interface CancellableJob {
jobPromise: Promise<any>;
}

export enum CancellationReason {
REPOSITORY_DELETE = 'Cancel job because of deleting the entire repository',
LOW_DISK_SPACE = 'Cancel job because of low available disk space',
NEW_JOB_OVERRIDEN = 'Cancel job because of a new job of the same type has been registered',
}

export class CancellationSerivce {
private cloneCancellationMap: Map<RepositoryUri, CancellableJob>;
private updateCancellationMap: Map<RepositoryUri, CancellableJob>;
Expand All @@ -23,16 +29,16 @@ export class CancellationSerivce {
this.indexCancellationMap = new Map<RepositoryUri, CancellableJob>();
}

public async cancelCloneJob(repoUri: RepositoryUri) {
await this.cancelJob(this.cloneCancellationMap, repoUri);
public async cancelCloneJob(repoUri: RepositoryUri, reason: CancellationReason) {
await this.cancelJob(this.cloneCancellationMap, repoUri, reason);
}

public async cancelUpdateJob(repoUri: RepositoryUri) {
await this.cancelJob(this.updateCancellationMap, repoUri);
public async cancelUpdateJob(repoUri: RepositoryUri, reason: CancellationReason) {
await this.cancelJob(this.updateCancellationMap, repoUri, reason);
}

public async cancelIndexJob(repoUri: RepositoryUri) {
await this.cancelJob(this.indexCancellationMap, repoUri);
public async cancelIndexJob(repoUri: RepositoryUri, reason: CancellationReason) {
await this.cancelJob(this.indexCancellationMap, repoUri, reason);
}

public async registerCancelableCloneJob(
Expand Down Expand Up @@ -66,20 +72,24 @@ export class CancellationSerivce {
jobPromise: Promise<any>
) {
// Try to cancel the job first.
await this.cancelJob(jobMap, repoUri);
await this.cancelJob(jobMap, repoUri, CancellationReason.NEW_JOB_OVERRIDEN);
jobMap.set(repoUri, { token, jobPromise });
// remove the record from the cancellation service when the promise is fulfilled or rejected.
jobPromise.finally(() => {
jobMap.delete(repoUri);
});
}

private async cancelJob(jobMap: Map<RepositoryUri, CancellableJob>, repoUri: RepositoryUri) {
private async cancelJob(
jobMap: Map<RepositoryUri, CancellableJob>,
repoUri: RepositoryUri,
reason: CancellationReason
) {
const payload = jobMap.get(repoUri);
if (payload) {
const { token, jobPromise } = payload;
// 1. Use the cancellation token to pass cancel message to job
token.cancel();
token.cancel(reason);
// 2. waiting on the actual job promise to be resolved
try {
await jobPromise;
Expand Down