From e888adb1b97fa76ff59fd495df4f28156bd21b8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Mon, 27 Jan 2020 16:25:42 -0500 Subject: [PATCH] Cleanup action task params objects after successful execution (#55227) (#56058) * Cleanup action task params saved objects after use * Fix jest tests * Add integration test to ensure object gets cleaned up * Add unit tests * Fix comment * Re-use updated_at instead of creating createdAt * Consider null/undefined returned from executor as success as well Co-authored-by: Elastic Machine Co-authored-by: Elastic Machine --- .../server/lib/action_executor.mock.ts | 2 +- .../server/lib/task_runner_factory.test.ts | 55 +++++++++++++++++++ .../actions/server/lib/task_runner_factory.ts | 17 ++++++ x-pack/plugins/actions/server/plugin.ts | 2 + .../common/fixtures/plugins/alerts/index.ts | 7 ++- .../common/lib/task_manager_utils.ts | 33 +++++++++++ .../tests/alerting/alerts.ts | 2 + .../spaces_only/tests/alerting/alerts_base.ts | 5 ++ 8 files changed, 119 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/actions/server/lib/action_executor.mock.ts b/x-pack/plugins/actions/server/lib/action_executor.mock.ts index 73e5e96ab24edb..b4419cd761bbe4 100644 --- a/x-pack/plugins/actions/server/lib/action_executor.mock.ts +++ b/x-pack/plugins/actions/server/lib/action_executor.mock.ts @@ -9,7 +9,7 @@ import { ActionExecutorContract } from './action_executor'; const createActionExecutorMock = () => { const mocked: jest.Mocked = { initialize: jest.fn(), - execute: jest.fn(), + execute: jest.fn().mockResolvedValue({ status: 'ok', actionId: '' }), }; return mocked; }; diff --git a/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts b/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts index 2246193057d0e5..8890de2483290c 100644 --- a/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts +++ b/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts @@ -63,13 +63,18 @@ const actionExecutorInitializerParams = { }; const taskRunnerFactoryInitializerParams = { spaceIdToNamespace, + logger: loggingServiceMock.create().get(), encryptedSavedObjectsPlugin: mockedEncryptedSavedObjectsPlugin, getBasePath: jest.fn().mockReturnValue(undefined), + getScopedSavedObjectsClient: jest.fn().mockReturnValue(services.savedObjectsClient), }; beforeEach(() => { jest.resetAllMocks(); actionExecutorInitializerParams.getServices.mockReturnValue(services); + taskRunnerFactoryInitializerParams.getScopedSavedObjectsClient.mockReturnValue( + services.savedObjectsClient + ); }); test(`throws an error if factory isn't initialized`, () => { @@ -135,6 +140,56 @@ test('executes the task by calling the executor with proper parameters', async ( }); }); +test('cleans up action_task_params object', async () => { + const taskRunner = taskRunnerFactory.create({ + taskInstance: mockedTaskInstance, + }); + + mockedActionExecutor.execute.mockResolvedValueOnce({ status: 'ok', actionId: '2' }); + spaceIdToNamespace.mockReturnValueOnce('namespace-test'); + mockedEncryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '3', + type: 'action_task_params', + attributes: { + actionId: '2', + params: { baz: true }, + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + + await taskRunner.run(); + + expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3'); +}); + +test('runs successfully when cleanup fails and logs the error', async () => { + const taskRunner = taskRunnerFactory.create({ + taskInstance: mockedTaskInstance, + }); + + mockedActionExecutor.execute.mockResolvedValueOnce({ status: 'ok', actionId: '2' }); + spaceIdToNamespace.mockReturnValueOnce('namespace-test'); + mockedEncryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '3', + type: 'action_task_params', + attributes: { + actionId: '2', + params: { baz: true }, + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + services.savedObjectsClient.delete.mockRejectedValueOnce(new Error('Fail')); + + await taskRunner.run(); + + expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3'); + expect(taskRunnerFactoryInitializerParams.logger.error).toHaveBeenCalledWith( + 'Failed to cleanup action_task_params object [id="3"]: Fail' + ); +}); + test('throws an error with suggested retry logic when return status is error', async () => { const taskRunner = taskRunnerFactory.create({ taskInstance: mockedTaskInstance, diff --git a/x-pack/plugins/actions/server/lib/task_runner_factory.ts b/x-pack/plugins/actions/server/lib/task_runner_factory.ts index 59da7bdfab318e..c3e89e0c16efcc 100644 --- a/x-pack/plugins/actions/server/lib/task_runner_factory.ts +++ b/x-pack/plugins/actions/server/lib/task_runner_factory.ts @@ -6,14 +6,17 @@ import { ActionExecutorContract } from './action_executor'; import { ExecutorError } from './executor_error'; +import { Logger, CoreStart } from '../../../../../src/core/server'; import { RunContext } from '../../../task_manager/server'; import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../encrypted_saved_objects/server'; import { ActionTaskParams, GetBasePathFunction, SpaceIdToNamespaceFunction } from '../types'; export interface TaskRunnerContext { + logger: Logger; encryptedSavedObjectsPlugin: EncryptedSavedObjectsStartContract; spaceIdToNamespace: SpaceIdToNamespaceFunction; getBasePath: GetBasePathFunction; + getScopedSavedObjectsClient: CoreStart['savedObjects']['getScopedClient']; } export class TaskRunnerFactory { @@ -40,9 +43,11 @@ export class TaskRunnerFactory { const { actionExecutor } = this; const { + logger, encryptedSavedObjectsPlugin, spaceIdToNamespace, getBasePath, + getScopedSavedObjectsClient, } = this.taskRunnerContext!; return { @@ -85,6 +90,7 @@ export class TaskRunnerFactory { actionId, request: fakeRequest, }); + if (executorResult.status === 'error') { // Task manager error handler only kicks in when an error thrown (at this time) // So what we have to do is throw when the return status is `error`. @@ -94,6 +100,17 @@ export class TaskRunnerFactory { executorResult.retry == null ? false : executorResult.retry ); } + + // Cleanup action_task_params object now that we're done with it + try { + const savedObjectsClient = getScopedSavedObjectsClient(fakeRequest); + await savedObjectsClient.delete('action_task_params', actionTaskParamsId); + } catch (e) { + // Log error only, we shouldn't fail the task because of an error here (if ever there's retry logic) + logger.error( + `Failed to cleanup action_task_params object [id="${actionTaskParamsId}"]: ${e.message}` + ); + } }, }; } diff --git a/x-pack/plugins/actions/server/plugin.ts b/x-pack/plugins/actions/server/plugin.ts index 6412593488cf8f..cb0e3347541fd7 100644 --- a/x-pack/plugins/actions/server/plugin.ts +++ b/x-pack/plugins/actions/server/plugin.ts @@ -191,9 +191,11 @@ export class ActionsPlugin implements Plugin, Plugi }); taskRunnerFactory!.initialize({ + logger, encryptedSavedObjectsPlugin: plugins.encryptedSavedObjects, getBasePath: this.getBasePath, spaceIdToNamespace: this.spaceIdToNamespace, + getScopedSavedObjectsClient: core.savedObjects.getScopedClient, }); return { diff --git a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/index.ts b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/index.ts index 9d019352ff5707..6c2a22f2737fe9 100644 --- a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/index.ts +++ b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/index.ts @@ -62,7 +62,7 @@ export default function(kibana: any) { encrypted: schema.string(), }), }, - async executor({ config, secrets, params, services }: ActionTypeExecutorOptions) { + async executor({ config, secrets, params, services, actionId }: ActionTypeExecutorOptions) { await services.callCluster('index', { index: params.index, refresh: 'wait_for', @@ -74,6 +74,7 @@ export default function(kibana: any) { source: 'action:test.index-record', }, }); + return { status: 'ok', actionId }; }, }; const failingActionType: ActionType = { @@ -141,7 +142,7 @@ export default function(kibana: any) { reference: schema.string(), }), }, - async executor({ params, services }: ActionTypeExecutorOptions) { + async executor({ params, services, actionId }: ActionTypeExecutorOptions) { // Call cluster let callClusterSuccess = false; let callClusterError; @@ -186,8 +187,8 @@ export default function(kibana: any) { }, }); return { + actionId, status: 'ok', - actionId: '', }; }, }; diff --git a/x-pack/test/alerting_api_integration/common/lib/task_manager_utils.ts b/x-pack/test/alerting_api_integration/common/lib/task_manager_utils.ts index b72960b162e76f..3a1d035a023c22 100644 --- a/x-pack/test/alerting_api_integration/common/lib/task_manager_utils.ts +++ b/x-pack/test/alerting_api_integration/common/lib/task_manager_utils.ts @@ -43,4 +43,37 @@ export class TaskManagerUtils { } }); } + + async waitForActionTaskParamsToBeCleanedUp(createdAtFilter: Date): Promise { + return await this.retry.try(async () => { + const searchResult = await this.es.search({ + index: '.kibana', + body: { + query: { + bool: { + must: [ + { + term: { + type: 'action_task_params', + }, + }, + { + range: { + updated_at: { + gte: createdAtFilter, + }, + }, + }, + ], + }, + }, + }, + }); + if (searchResult.hits.total.value) { + throw new Error( + `Expected 0 action_task_params objects but received ${searchResult.hits.total.value}` + ); + } + }); + } } diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/alerts.ts b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/alerts.ts index 08e6c90a1044ca..386ba0adf5aabb 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/alerts.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/alerts.ts @@ -147,6 +147,8 @@ export default function alertTests({ getService }: FtrProviderContext) { reference, source: 'action:test.index-record', }); + + await taskManagerUtils.waitForActionTaskParamsToBeCleanedUp(testStart); break; default: throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`); diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts_base.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts_base.ts index d9a58851afb31a..3c60d2779720a8 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts_base.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts_base.ts @@ -16,6 +16,7 @@ import { ObjectRemover, AlertUtils, ensureDatetimeIsWithinRange, + TaskManagerUtils, } from '../../../common/lib'; // eslint-disable-next-line import/no-default-export @@ -24,6 +25,7 @@ export function alertTests({ getService }: FtrProviderContext, space: Space) { const es = getService('legacyEs'); const retry = getService('retry'); const esTestIndexTool = new ESTestIndexTool(es, retry); + const taskManagerUtils = new TaskManagerUtils(es, retry); function getAlertingTaskById(taskId: string) { return supertestWithoutAuth @@ -73,6 +75,7 @@ export function alertTests({ getService }: FtrProviderContext, space: Space) { }); it('should schedule task, run alert and schedule actions', async () => { + const testStart = new Date(); const reference = alertUtils.generateReference(); const response = await alertUtils.createAlwaysFiringAction({ reference }); const alertId = response.body.id; @@ -121,6 +124,8 @@ export function alertTests({ getService }: FtrProviderContext, space: Space) { reference, source: 'action:test.index-record', }); + + await taskManagerUtils.waitForActionTaskParamsToBeCleanedUp(testStart); }); it('should reschedule failing alerts using the alerting interval and not the Task Manager retry logic', async () => {