Skip to content

Commit

Permalink
fix: πŸ› subsribe to "abort" event in abort controller (#63199)
Browse files Browse the repository at this point in the history
* fix: πŸ› subsribe to "abort" event in abort controller

* test: πŸ’ add test for execution cancellation after completion

* feat: 🎸 use a more meaningful sentinel

* refactor: πŸ’‘ use toPromise() from data plugin

* chore: πŸ€– disable most new tests

* test: πŸ’ remove fake timers from abort tests

* test: πŸ’ remove new tests completely

There seems to be some async race condition in Jest test runner not
related to this PR, but for some reason Jest fails. Removing these tests
temporarily to check if this file cases Jest to fail.

* chore: πŸ€– try adding .catch clause to abortion promise

* chore: πŸ€– revert tests back and add .catch() comment
  • Loading branch information
streamich committed Apr 28, 2020
1 parent e09e6a2 commit fa219bc
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 19 deletions.
16 changes: 14 additions & 2 deletions src/plugins/data/common/utils/abort_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,31 @@ export class AbortError extends Error {
* Returns a `Promise` corresponding with when the given `AbortSignal` is aborted. Useful for
* situations when you might need to `Promise.race` multiple `AbortSignal`s, or an `AbortSignal`
* with any other expected errors (or completions).
*
* @param signal The `AbortSignal` to generate the `Promise` from
* @param shouldReject If `false`, the promise will be resolved, otherwise it will be rejected
*/
export function toPromise(signal: AbortSignal, shouldReject = false) {
return new Promise((resolve, reject) => {
export function toPromise(signal: AbortSignal, shouldReject?: false): Promise<undefined | Event>;
export function toPromise(signal: AbortSignal, shouldReject?: true): Promise<never>;
export function toPromise(signal: AbortSignal, shouldReject: boolean = false) {
const promise = new Promise((resolve, reject) => {
const action = shouldReject ? reject : resolve;
if (signal.aborted) action();
signal.addEventListener('abort', action);
});

/**
* Below is to make sure we don't have unhandled promise rejections. Otherwise
* Jest tests fail.
*/
promise.catch(() => {});

return promise;
}

/**
* Returns an `AbortSignal` that will be aborted when the first of the given signals aborts.
*
* @param signals
*/
export function getCombinedSignal(signals: AbortSignal[]) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { Execution } from './execution';
import { parseExpression } from '../ast';
import { createUnitTestExecutor } from '../test_helpers';

jest.useFakeTimers();

beforeEach(() => {
jest.clearAllTimers();
});

const createExecution = (
expression: string = 'foo bar=123',
context: Record<string, unknown> = {},
debug: boolean = false
) => {
const executor = createUnitTestExecutor();
const execution = new Execution({
executor,
ast: parseExpression(expression),
context,
debug,
});
return execution;
};

describe('Execution abortion tests', () => {
test('can abort an expression immediately', async () => {
const execution = createExecution('sleep 10');

execution.start();
execution.cancel();

const result = await execution.result;

expect(result).toMatchObject({
type: 'error',
error: {
message: 'The expression was aborted.',
name: 'AbortError',
},
});
});

test('can abort an expression which has function running mid flight', async () => {
const execution = createExecution('sleep 300');

execution.start();
jest.advanceTimersByTime(100);
execution.cancel();

const result = await execution.result;

expect(result).toMatchObject({
type: 'error',
error: {
message: 'The expression was aborted.',
name: 'AbortError',
},
});
});

test('cancelling execution after it completed has no effect', async () => {
jest.useRealTimers();

const execution = createExecution('sleep 1');

execution.start();

const result = await execution.result;

execution.cancel();

expect(result).toBe(null);

jest.useFakeTimers();
});
});
46 changes: 32 additions & 14 deletions src/plugins/expressions/common/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { Executor } from '../executor';
import { createExecutionContainer, ExecutionContainer } from './container';
import { createError } from '../util';
import { Defer, now } from '../../../kibana_utils/common';
import { AbortError } from '../../../data/common';
import { toPromise } from '../../../data/common/utils/abort_utils';
import { RequestAdapter, DataAdapter } from '../../../inspector/common';
import { isExpressionValueError, ExpressionValueError } from '../expression_types/specs/error';
import {
Expand All @@ -38,6 +38,12 @@ import { ArgumentType, ExpressionFunction } from '../expression_functions';
import { getByAlias } from '../util/get_by_alias';
import { ExecutionContract } from './execution_contract';

const createAbortErrorValue = () =>
createError({
message: 'The expression was aborted.',
name: 'AbortError',
});

export interface ExecutionParams<
ExtraContext extends Record<string, unknown> = Record<string, unknown>
> {
Expand Down Expand Up @@ -70,7 +76,7 @@ export class Execution<
/**
* Dynamic state of the execution.
*/
public readonly state: ExecutionContainer<Output>;
public readonly state: ExecutionContainer<Output | ExpressionValueError>;

/**
* Initial input of the execution.
Expand All @@ -91,6 +97,18 @@ export class Execution<
*/
private readonly abortController = new AbortController();

/**
* Promise that rejects if/when abort controller sends "abort" signal.
*/
private readonly abortRejection = toPromise(this.abortController.signal, true);

/**
* Races a given promise against the "abort" event of `abortController`.
*/
private race<T>(promise: Promise<T>): Promise<T> {
return Promise.race<T>([this.abortRejection, promise]);
}

/**
* Whether .start() method has been called.
*/
Expand All @@ -99,7 +117,7 @@ export class Execution<
/**
* Future that tracks result or error of this execution.
*/
private readonly firstResultFuture = new Defer<Output>();
private readonly firstResultFuture = new Defer<Output | ExpressionValueError>();

/**
* Contract is a public representation of `Execution` instances. Contract we
Expand All @@ -114,7 +132,7 @@ export class Execution<

public readonly expression: string;

public get result(): Promise<unknown> {
public get result(): Promise<Output | ExpressionValueError> {
return this.firstResultFuture.promise;
}

Expand All @@ -134,7 +152,7 @@ export class Execution<
this.expression = params.expression || formatExpression(params.ast!);
const ast = params.ast || parseExpression(this.expression);

this.state = createExecutionContainer<Output>({
this.state = createExecutionContainer<Output | ExpressionValueError>({
...executor.state.get(),
state: 'not-started',
ast,
Expand Down Expand Up @@ -173,7 +191,12 @@ export class Execution<
this.state.transitions.start();

const { resolve, reject } = this.firstResultFuture;
this.invokeChain(this.state.get().ast.chain, input).then(resolve, reject);
const chainPromise = this.invokeChain(this.state.get().ast.chain, input);

this.race(chainPromise).then(resolve, error => {
if (this.abortController.signal.aborted) resolve(createAbortErrorValue());
else reject(error);
});

this.firstResultFuture.promise.then(
result => {
Expand All @@ -189,11 +212,6 @@ export class Execution<
if (!chainArr.length) return input;

for (const link of chainArr) {
// if execution was aborted return error
if (this.context.abortSignal && this.context.abortSignal.aborted) {
return createError(new AbortError('The expression was aborted.'));
}

const { function: fnName, arguments: fnArgs } = link;
const fn = getByAlias(this.state.get().functions, fnName);

Expand All @@ -207,10 +225,10 @@ export class Execution<
try {
// `resolveArgs` returns an object because the arguments themselves might
// actually have a `then` function which would be treated as a `Promise`.
const { resolvedArgs } = await this.resolveArgs(fn, input, fnArgs);
const { resolvedArgs } = await this.race(this.resolveArgs(fn, input, fnArgs));
args = resolvedArgs;
timeStart = this.params.debug ? now() : 0;
const output = await this.invokeFunction(fn, input, resolvedArgs);
const output = await this.race(this.invokeFunction(fn, input, resolvedArgs));

if (this.params.debug) {
const timeEnd: number = now();
Expand Down Expand Up @@ -256,7 +274,7 @@ export class Execution<
args: Record<string, unknown>
): Promise<any> {
const normalizedInput = this.cast(input, fn.inputTypes);
const output = await fn.fn(normalizedInput, args, this.context);
const output = await this.race(fn.fn(normalizedInput, args, this.context));

// Validate that the function returned the type it said it would.
// This isn't required, but it keeps function developers honest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export type ExpressionValueError = ExpressionValueBoxed<
name?: string;
stack?: string;
};
info: unknown;
info?: unknown;
}
>;

Expand Down
6 changes: 4 additions & 2 deletions src/plugins/expressions/common/util/create_error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
* under the License.
*/

import { ExpressionValueError } from '../../public';

type ErrorLike = Partial<Pick<Error, 'name' | 'message' | 'stack'>>;

export const createError = (err: string | ErrorLike) => ({
export const createError = (err: string | ErrorLike): ExpressionValueError => ({
type: 'error',
error: {
stack:
Expand All @@ -28,7 +30,7 @@ export const createError = (err: string | ErrorLike) => ({
: typeof err === 'object'
? err.stack
: undefined,
message: typeof err === 'string' ? err : err.message,
message: typeof err === 'string' ? err : String(err.message),
name: typeof err === 'object' ? err.name || 'Error' : 'Error',
},
});

0 comments on commit fa219bc

Please sign in to comment.