Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup action task params objects after successful execution #55227

Merged
merged 15 commits into from
Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/plugins/actions/server/lib/action_executor.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { ActionExecutorContract } from './action_executor';
const createActionExecutorMock = () => {
const mocked: jest.Mocked<ActionExecutorContract> = {
initialize: jest.fn(),
execute: jest.fn(),
execute: jest.fn().mockResolvedValue({ status: 'ok', actionId: '' }),
};
return mocked;
};
Expand Down
55 changes: 55 additions & 0 deletions x-pack/plugins/actions/server/lib/task_runner_factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@ const actionExecutorInitializerParams = {
};
const taskRunnerFactoryInitializerParams = {
spaceIdToNamespace,
logger: loggingServiceMock.create().get(),
encryptedSavedObjectsPlugin: mockedEncryptedSavedObjectsPlugin,
getBasePath: jest.fn().mockReturnValue(undefined),
getScopedSavedObjectsClient: jest.fn().mockReturnValue(services.savedObjectsClient),
};

beforeEach(() => {
jest.resetAllMocks();
actionExecutorInitializerParams.getServices.mockReturnValue(services);
taskRunnerFactoryInitializerParams.getScopedSavedObjectsClient.mockReturnValue(
services.savedObjectsClient
);
});

test(`throws an error if factory isn't initialized`, () => {
Expand Down Expand Up @@ -135,6 +140,56 @@ test('executes the task by calling the executor with proper parameters', async (
});
});

test('cleans up action_task_params object', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,
});

mockedActionExecutor.execute.mockResolvedValueOnce({ status: 'ok', actionId: '2' });
spaceIdToNamespace.mockReturnValueOnce('namespace-test');
mockedEncryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [],
});

await taskRunner.run();

expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3');
});

test('runs successfully when cleanup fails and logs the error', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,
});

mockedActionExecutor.execute.mockResolvedValueOnce({ status: 'ok', actionId: '2' });
spaceIdToNamespace.mockReturnValueOnce('namespace-test');
mockedEncryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [],
});
services.savedObjectsClient.delete.mockRejectedValueOnce(new Error('Fail'));

await taskRunner.run();

expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3');
expect(taskRunnerFactoryInitializerParams.logger.error).toHaveBeenCalledWith(
'Failed to cleanup action_task_params object [id="3"]: Fail'
);
});

test('throws an error with suggested retry logic when return status is error', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,
Expand Down
17 changes: 17 additions & 0 deletions x-pack/plugins/actions/server/lib/task_runner_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@

import { ActionExecutorContract } from './action_executor';
import { ExecutorError } from './executor_error';
import { Logger, CoreStart } from '../../../../../src/core/server';
import { RunContext } from '../../../task_manager/server';
import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../encrypted_saved_objects/server';
import { ActionTaskParams, GetBasePathFunction, SpaceIdToNamespaceFunction } from '../types';

export interface TaskRunnerContext {
logger: Logger;
encryptedSavedObjectsPlugin: EncryptedSavedObjectsStartContract;
spaceIdToNamespace: SpaceIdToNamespaceFunction;
getBasePath: GetBasePathFunction;
getScopedSavedObjectsClient: CoreStart['savedObjects']['getScopedClient'];
}

export class TaskRunnerFactory {
Expand All @@ -40,9 +43,11 @@ export class TaskRunnerFactory {

const { actionExecutor } = this;
const {
logger,
encryptedSavedObjectsPlugin,
spaceIdToNamespace,
getBasePath,
getScopedSavedObjectsClient,
} = this.taskRunnerContext!;

return {
Expand Down Expand Up @@ -85,6 +90,7 @@ export class TaskRunnerFactory {
actionId,
request: fakeRequest,
});

if (executorResult.status === 'error') {
// Task manager error handler only kicks in when an error thrown (at this time)
// So what we have to do is throw when the return status is `error`.
Expand All @@ -94,6 +100,17 @@ export class TaskRunnerFactory {
executorResult.retry == null ? false : executorResult.retry
);
}

// Cleanup action_task_params object now that we're done with it
try {
const savedObjectsClient = getScopedSavedObjectsClient(fakeRequest);
await savedObjectsClient.delete('action_task_params', actionTaskParamsId);
} catch (e) {
// Log error only, we shouldn't fail the task because of an error here (if ever there's retry logic)
logger.error(
`Failed to cleanup action_task_params object [id="${actionTaskParamsId}"]: ${e.message}`
);
}
},
};
}
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/actions/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,11 @@ export class ActionsPlugin implements Plugin<Promise<PluginSetupContract>, Plugi
});

taskRunnerFactory!.initialize({
logger,
encryptedSavedObjectsPlugin: plugins.encryptedSavedObjects,
getBasePath: this.getBasePath,
spaceIdToNamespace: this.spaceIdToNamespace,
getScopedSavedObjectsClient: core.savedObjects.getScopedClient,
});

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export default function(kibana: any) {
encrypted: schema.string(),
}),
},
async executor({ config, secrets, params, services }: ActionTypeExecutorOptions) {
async executor({ config, secrets, params, services, actionId }: ActionTypeExecutorOptions) {
await services.callCluster('index', {
index: params.index,
refresh: 'wait_for',
Expand All @@ -74,6 +74,7 @@ export default function(kibana: any) {
source: 'action:test.index-record',
},
});
return { status: 'ok', actionId };
},
};
const failingActionType: ActionType = {
Expand Down Expand Up @@ -141,7 +142,7 @@ export default function(kibana: any) {
reference: schema.string(),
}),
},
async executor({ params, services }: ActionTypeExecutorOptions) {
async executor({ params, services, actionId }: ActionTypeExecutorOptions) {
// Call cluster
let callClusterSuccess = false;
let callClusterError;
Expand Down Expand Up @@ -186,8 +187,8 @@ export default function(kibana: any) {
},
});
return {
actionId,
status: 'ok',
actionId: '',
};
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,37 @@ export class TaskManagerUtils {
}
});
}

async waitForActionTaskParamsToBeCleanedUp(createdAtFilter: Date): Promise<void> {
return await this.retry.try(async () => {
const searchResult = await this.es.search({
index: '.kibana',
body: {
query: {
bool: {
must: [
{
term: {
type: 'action_task_params',
},
},
{
range: {
updated_at: {
gte: createdAtFilter,
},
},
},
],
},
},
},
});
if (searchResult.hits.total.value) {
throw new Error(
`Expected 0 action_task_params objects but received ${searchResult.hits.total.value}`
);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ export default function alertTests({ getService }: FtrProviderContext) {
reference,
source: 'action:test.index-record',
});

await taskManagerUtils.waitForActionTaskParamsToBeCleanedUp(testStart);
break;
default:
throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
ObjectRemover,
AlertUtils,
ensureDatetimeIsWithinRange,
TaskManagerUtils,
} from '../../../common/lib';

// eslint-disable-next-line import/no-default-export
Expand All @@ -24,6 +25,7 @@ export function alertTests({ getService }: FtrProviderContext, space: Space) {
const es = getService('legacyEs');
const retry = getService('retry');
const esTestIndexTool = new ESTestIndexTool(es, retry);
const taskManagerUtils = new TaskManagerUtils(es, retry);

function getAlertingTaskById(taskId: string) {
return supertestWithoutAuth
Expand Down Expand Up @@ -73,6 +75,7 @@ export function alertTests({ getService }: FtrProviderContext, space: Space) {
});

it('should schedule task, run alert and schedule actions', async () => {
const testStart = new Date();
const reference = alertUtils.generateReference();
const response = await alertUtils.createAlwaysFiringAction({ reference });
const alertId = response.body.id;
Expand Down Expand Up @@ -121,6 +124,8 @@ export function alertTests({ getService }: FtrProviderContext, space: Space) {
reference,
source: 'action:test.index-record',
});

await taskManagerUtils.waitForActionTaskParamsToBeCleanedUp(testStart);
});

it('should reschedule failing alerts using the alerting interval and not the Task Manager retry logic', async () => {
Expand Down