Skip to content

Commit

Permalink
[Code] Cancel clone/update job in the middle if disk space over the w…
Browse files Browse the repository at this point in the history
…atermark (#42890)

* [Code] interrupt clone if disk usage goes above watermark during clone

* minor refactor

* add unit tests

* minor change

* fix type check

* minor fix

* fix test
  • Loading branch information
mw-ding committed Aug 12, 2019
1 parent 85699b1 commit 0891db0
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 88 deletions.
4 changes: 3 additions & 1 deletion x-pack/legacy/plugins/code/model/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import { IndexRequest } from './search';
import { CancellationReason } from '../server/queue/cancellation_service';

export type RepositoryUri = string;

Expand Down Expand Up @@ -86,12 +87,13 @@ export enum FileTreeItemType {
export interface WorkerResult {
uri: string;
cancelled?: boolean;
cancelledReason?: CancellationReason;
}

// TODO(mengwei): create a AbstractGitWorkerResult since we now have an
// AbstractGitWorker now.
export interface CloneWorkerResult extends WorkerResult {
repo: Repository;
repo?: Repository;
}

export interface DeleteWorkerResult extends WorkerResult {
Expand Down
66 changes: 53 additions & 13 deletions x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import path from 'path';
import rimraf from 'rimraf';
import sinon from 'sinon';

import { Repository } from '../../model';
import { CloneWorkerResult, Repository } from '../../model';
import { DiskWatermarkService } from '../disk_watermark';
import { GitOperations } from '../git_operations';
import { EsClient, Esqueue } from '../lib/esqueue';
import { Logger } from '../log';
import { CloneWorker, IndexWorker } from '../queue';
import { CancellationSerivce } from '../queue/cancellation_service';
import { CancellationReason, CancellationSerivce } from '../queue/cancellation_service';
import { RepositoryServiceFactory } from '../repository_service_factory';
import { createTestServerOption, emptyAsyncFunc } from '../test_utils';
import { ConsoleLoggerFactory } from '../utils/console_logger_factory';
Expand Down Expand Up @@ -372,34 +372,34 @@ describe('clone_worker_tests', () => {
diskWatermarkService as DiskWatermarkService
);

const result1 = await cloneWorker.executeJob({
const result1 = (await cloneWorker.executeJob({
payload: {
url: 'file:///foo/bar.git',
},
options: {},
timestamp: 0,
});
})) as CloneWorkerResult;

assert.ok(result1.repo === null);
assert.ok(!result1.repo);
assert.ok(newInstanceSpy.notCalled);
assert.ok(cloneSpy.notCalled);
assert.ok(isLowWatermarkSpy.calledOnce);

const result2 = await cloneWorker.executeJob({
const result2 = (await cloneWorker.executeJob({
payload: {
url: '/foo/bar.git',
},
options: {},
timestamp: 0,
});
})) as CloneWorkerResult;

assert.ok(result2.repo === null);
assert.ok(!result2.repo);
assert.ok(newInstanceSpy.notCalled);
assert.ok(cloneSpy.notCalled);
assert.ok(isLowWatermarkSpy.calledTwice);
});

it('Execute clone job failed because of low disk watermark', async () => {
it('Execute clone job failed because of low available disk space', async () => {
// Setup RepositoryService
const cloneSpy = sinon.spy();
const repoService = {
Expand Down Expand Up @@ -428,34 +428,74 @@ describe('clone_worker_tests', () => {
const isLowWatermarkSpy = sinon.stub().resolves(true);
const diskWatermarkService: any = {
isLowWatermark: isLowWatermarkSpy,
diskWatermarkViolationMessage: sinon.stub().returns('No enough disk space'),
};

// Setup EsClient
const updateSpy = sinon.spy();
const esClient = {
update: emptyAsyncFunc,
};
esClient.update = updateSpy;

// Setup IndexWorker
const enqueueJobSpy = sinon.spy();
const indexWorker = {
enqueueJob: emptyAsyncFunc,
};
indexWorker.enqueueJob = enqueueJobSpy;

const cloneWorker = new CloneWorker(
esQueue as Esqueue,
log,
{} as EsClient,
esClient as EsClient,
serverOptions,
gitOps,
{} as IndexWorker,
(indexWorker as any) as IndexWorker,
(repoServiceFactory as any) as RepositoryServiceFactory,
cancellationService as CancellationSerivce,
diskWatermarkService as DiskWatermarkService
);

let res: CloneWorkerResult = { uri: 'github.com/Microsoft/TypeScript-Node-Starter' };
try {
await cloneWorker.executeJob({
res = (await cloneWorker.executeJob({
payload: {
url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git',
},
options: {},
timestamp: 0,
});
})) as CloneWorkerResult;
// This step should not be touched.
assert.ok(false);
} catch (error) {
assert.ok(isLowWatermarkSpy.calledOnce);
assert.ok(newInstanceSpy.notCalled);
assert.ok(cloneSpy.notCalled);
}

assert.ok(res.cancelled);
assert.ok(res.cancelledReason === CancellationReason.LOW_DISK_SPACE);

const onJobExecutionErrorSpy = sinon.spy();
cloneWorker.onJobExecutionError = onJobExecutionErrorSpy;

await cloneWorker.onJobCompleted(
{
payload: {
url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git',
},
options: {},
timestamp: 0,
},
res
);

assert.ok(onJobExecutionErrorSpy.calledOnce);
// Non of the follow up steps of a normal complete job should not be called
// because the job is going to be forwarded as execution error.
assert.ok(updateSpy.notCalled);
await delay(1000);
assert.ok(enqueueJobSpy.notCalled);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
*/

export class CancellationToken {
public on(callback: () => void): void;
public cancel(): void;
public on(callback: (reason: string) => void): void;
public cancel(reason: string): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ export class CancellationToken {
}

if (this.isCancelled) {
callback();
return;
}

this._callbacks.push(callback);
};

cancel = () => {
cancel = (reason) => {
this.isCancelled = true;
this._callbacks.forEach(callback => callback());
this._callbacks.forEach(callback => callback(reason));
};
}
34 changes: 23 additions & 11 deletions x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { Logger } from '../log';
import { RepositoryObjectClient } from '../search';
import { ServerOptions } from '../server_options';
import { AbstractWorker } from './abstract_worker';
import { CancellationReason } from './cancellation_service';
import { Job } from './job';

export abstract class AbstractGitWorker extends AbstractWorker {
Expand All @@ -36,23 +37,21 @@ export abstract class AbstractGitWorker extends AbstractWorker {
this.objectClient = new RepositoryObjectClient(client);
}

public async executeJob(_: Job): Promise<WorkerResult> {
public async executeJob(job: Job): Promise<WorkerResult> {
const uri = job.payload.uri;
if (await this.watermarkService.isLowWatermark()) {
const msg = this.watermarkService.diskWatermarkViolationMessage();
this.log.error(msg);
throw new Error(msg);
// Return job result as cancelled.
return {
uri,
cancelled: true,
cancelledReason: CancellationReason.LOW_DISK_SPACE,
};
}

return new Promise<WorkerResult>((resolve, reject) => {
resolve();
});
return { uri };
}

public async onJobCompleted(job: Job, res: CloneWorkerResult) {
if (res.cancelled) {
// Skip updating job progress if the job is done because of cancellation.
return;
}
await super.onJobCompleted(job, res);

// Update the default branch.
Expand Down Expand Up @@ -108,4 +107,17 @@ export abstract class AbstractGitWorker extends AbstractWorker {
// this.log.warn(err);
}
}

protected async onJobCancelled(job: Job, reason?: CancellationReason) {
if (reason && reason === CancellationReason.LOW_DISK_SPACE) {
// If the clone/update job is cancelled because of the disk watermark, manually
// trigger onJobExecutionError.
const msg = this.watermarkService.diskWatermarkViolationMessage();
this.log.error(
'Git clone/update job completed because of low disk space. Move forward as error.'
);
const error = new Error(msg);
await this.onJobExecutionError({ job, error });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { CancellationToken } from '../lib/esqueue';

import sinon from 'sinon';

import { CancellationSerivce } from './cancellation_service';
import { CancellationReason, CancellationSerivce } from './cancellation_service';

afterEach(() => {
sinon.restore();
Expand All @@ -30,9 +30,9 @@ test('Register and cancel cancellation token', async () => {
const promise = new Promise(resolve => {
promiseResolve = resolve;
});
await service.registerCancelableIndexJob(repoUri, token as CancellationToken, promise);
await service.registerCancelableIndexJob(repoUri, (token as any) as CancellationToken, promise);
// do not wait on the promise, or there will be a dead lock
const cancelPromise = service.cancelIndexJob(repoUri);
const cancelPromise = service.cancelIndexJob(repoUri, CancellationReason.NEW_JOB_OVERRIDEN);
// resolve the promise now
promiseResolve();

Expand All @@ -57,10 +57,10 @@ test('Register and cancel cancellation token while an exception is thrown from t
const promise = new Promise((resolve, reject) => {
promiseReject = reject;
});
await service.registerCancelableIndexJob(repoUri, token as CancellationToken, promise);
await service.registerCancelableIndexJob(repoUri, (token as any) as CancellationToken, promise);
// expect no exceptions are thrown when cancelling the job
// do not wait on the promise, or there will be a dead lock
const cancelPromise = service.cancelIndexJob(repoUri);
const cancelPromise = service.cancelIndexJob(repoUri, CancellationReason.NEW_JOB_OVERRIDEN);
// reject the promise now
promiseReject();

Expand Down
28 changes: 19 additions & 9 deletions x-pack/legacy/plugins/code/server/queue/cancellation_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ interface CancellableJob {
jobPromise: Promise<any>;
}

export enum CancellationReason {
REPOSITORY_DELETE = 'Cancel job because of deleting the entire repository',
LOW_DISK_SPACE = 'Cancel job because of low available disk space',
NEW_JOB_OVERRIDEN = 'Cancel job because of a new job of the same type has been registered',
}

export class CancellationSerivce {
private cloneCancellationMap: Map<RepositoryUri, CancellableJob>;
private updateCancellationMap: Map<RepositoryUri, CancellableJob>;
Expand All @@ -23,16 +29,16 @@ export class CancellationSerivce {
this.indexCancellationMap = new Map<RepositoryUri, CancellableJob>();
}

public async cancelCloneJob(repoUri: RepositoryUri) {
await this.cancelJob(this.cloneCancellationMap, repoUri);
public async cancelCloneJob(repoUri: RepositoryUri, reason: CancellationReason) {
await this.cancelJob(this.cloneCancellationMap, repoUri, reason);
}

public async cancelUpdateJob(repoUri: RepositoryUri) {
await this.cancelJob(this.updateCancellationMap, repoUri);
public async cancelUpdateJob(repoUri: RepositoryUri, reason: CancellationReason) {
await this.cancelJob(this.updateCancellationMap, repoUri, reason);
}

public async cancelIndexJob(repoUri: RepositoryUri) {
await this.cancelJob(this.indexCancellationMap, repoUri);
public async cancelIndexJob(repoUri: RepositoryUri, reason: CancellationReason) {
await this.cancelJob(this.indexCancellationMap, repoUri, reason);
}

public async registerCancelableCloneJob(
Expand Down Expand Up @@ -66,20 +72,24 @@ export class CancellationSerivce {
jobPromise: Promise<any>
) {
// Try to cancel the job first.
await this.cancelJob(jobMap, repoUri);
await this.cancelJob(jobMap, repoUri, CancellationReason.NEW_JOB_OVERRIDEN);
jobMap.set(repoUri, { token, jobPromise });
// remove the record from the cancellation service when the promise is fulfilled or rejected.
jobPromise.finally(() => {
jobMap.delete(repoUri);
});
}

private async cancelJob(jobMap: Map<RepositoryUri, CancellableJob>, repoUri: RepositoryUri) {
private async cancelJob(
jobMap: Map<RepositoryUri, CancellableJob>,
repoUri: RepositoryUri,
reason: CancellationReason
) {
const payload = jobMap.get(repoUri);
if (payload) {
const { token, jobPromise } = payload;
// 1. Use the cancellation token to pass cancel message to job
token.cancel();
token.cancel(reason);
// 2. waiting on the actual job promise to be resolved
try {
await jobPromise;
Expand Down

0 comments on commit 0891db0

Please sign in to comment.