-
Notifications
You must be signed in to change notification settings - Fork 8.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create task to cleanup action execution failures (#96971)
* Initial commit * Add tests and support for concurrency * Ability to disable functionality, use bulk APIs * Fix type check * Fix jest tests * Cleanup * Cleanup pt2 * Add unit tests * Fix type check * Fixes * Update test failures * Split schedule between cleanup and idle * Add functional tests * Add one more test * Cleanup repeated code * Remove duplicate actions plugin requirement Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
- Loading branch information
1 parent
82c425f
commit 0507ac5
Showing
32 changed files
with
1,247 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
126 changes: 126 additions & 0 deletions
126
x-pack/plugins/actions/server/cleanup_failed_executions/cleanup_tasks.test.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
import type { SavedObjectsFindResult, SavedObjectsSerializer } from 'kibana/server'; | ||
import { loggingSystemMock, elasticsearchServiceMock } from '../../../../../src/core/server/mocks'; | ||
import { spacesMock } from '../../../spaces/server/mocks'; | ||
import { CleanupTasksOpts, cleanupTasks } from './cleanup_tasks'; | ||
import { TaskInstance } from '../../../task_manager/server'; | ||
import { ApiResponse, estypes } from '@elastic/elasticsearch'; | ||
|
||
describe('cleanupTasks', () => { | ||
const logger = loggingSystemMock.create().get(); | ||
const esClient = elasticsearchServiceMock.createElasticsearchClient(); | ||
const spaces = spacesMock.createStart(); | ||
const savedObjectsSerializer = ({ | ||
generateRawId: jest | ||
.fn() | ||
.mockImplementation((namespace: string | undefined, type: string, id: string) => { | ||
const namespacePrefix = namespace ? `${namespace}:` : ''; | ||
return `${namespacePrefix}${type}:${id}`; | ||
}), | ||
} as unknown) as SavedObjectsSerializer; | ||
|
||
const cleanupTasksOpts: CleanupTasksOpts = { | ||
logger, | ||
esClient, | ||
spaces, | ||
savedObjectsSerializer, | ||
kibanaIndex: '.kibana', | ||
taskManagerIndex: '.kibana_task_manager', | ||
tasks: [], | ||
}; | ||
|
||
const taskSO: SavedObjectsFindResult<TaskInstance> = { | ||
id: '123', | ||
type: 'task', | ||
references: [], | ||
score: 0, | ||
attributes: { | ||
id: '123', | ||
taskType: 'foo', | ||
scheduledAt: new Date(), | ||
state: {}, | ||
runAt: new Date(), | ||
startedAt: new Date(), | ||
retryAt: new Date(), | ||
ownerId: '234', | ||
params: { spaceId: undefined, actionTaskParamsId: '123' }, | ||
schedule: { interval: '5m' }, | ||
}, | ||
}; | ||
|
||
beforeEach(() => { | ||
esClient.bulk.mockReset(); | ||
}); | ||
|
||
it('should skip cleanup when there are no tasks to cleanup', async () => { | ||
const result = await cleanupTasks(cleanupTasksOpts); | ||
expect(result).toEqual({ | ||
success: true, | ||
successCount: 0, | ||
failureCount: 0, | ||
}); | ||
expect(esClient.bulk).not.toHaveBeenCalled(); | ||
}); | ||
|
||
it('should delete action_task_params and task objects', async () => { | ||
esClient.bulk.mockResolvedValue(({ | ||
body: { items: [], errors: false, took: 1 }, | ||
} as unknown) as ApiResponse<estypes.BulkResponse, unknown>); | ||
const result = await cleanupTasks({ | ||
...cleanupTasksOpts, | ||
tasks: [taskSO], | ||
}); | ||
expect(esClient.bulk).toHaveBeenCalledWith({ | ||
body: [{ delete: { _index: cleanupTasksOpts.kibanaIndex, _id: 'action_task_params:123' } }], | ||
}); | ||
expect(esClient.bulk).toHaveBeenCalledWith({ | ||
body: [{ delete: { _index: cleanupTasksOpts.taskManagerIndex, _id: 'task:123' } }], | ||
}); | ||
expect(result).toEqual({ | ||
success: true, | ||
successCount: 1, | ||
failureCount: 0, | ||
}); | ||
}); | ||
|
||
it('should not delete the task if the action_task_params failed to delete', async () => { | ||
esClient.bulk.mockResolvedValue(({ | ||
body: { | ||
items: [ | ||
{ | ||
delete: { | ||
_index: cleanupTasksOpts.kibanaIndex, | ||
_id: 'action_task_params:123', | ||
status: 500, | ||
result: 'Failure', | ||
error: true, | ||
}, | ||
}, | ||
], | ||
errors: true, | ||
took: 1, | ||
}, | ||
} as unknown) as ApiResponse<estypes.BulkResponse, unknown>); | ||
const result = await cleanupTasks({ | ||
...cleanupTasksOpts, | ||
tasks: [taskSO], | ||
}); | ||
expect(esClient.bulk).toHaveBeenCalledWith({ | ||
body: [{ delete: { _index: cleanupTasksOpts.kibanaIndex, _id: 'action_task_params:123' } }], | ||
}); | ||
expect(esClient.bulk).not.toHaveBeenCalledWith({ | ||
body: [{ delete: { _index: cleanupTasksOpts.taskManagerIndex, _id: 'task:123' } }], | ||
}); | ||
expect(result).toEqual({ | ||
success: false, | ||
successCount: 0, | ||
failureCount: 1, | ||
}); | ||
}); | ||
}); |
109 changes: 109 additions & 0 deletions
109
x-pack/plugins/actions/server/cleanup_failed_executions/cleanup_tasks.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
import { | ||
Logger, | ||
ElasticsearchClient, | ||
SavedObjectsFindResult, | ||
SavedObjectsSerializer, | ||
} from 'kibana/server'; | ||
import { TaskInstance } from '../../../task_manager/server'; | ||
import { SpacesPluginStart } from '../../../spaces/server'; | ||
import { | ||
bulkDelete, | ||
extractBulkResponseDeleteFailures, | ||
getRawActionTaskParamsIdFromTask, | ||
} from './lib'; | ||
|
||
export interface CleanupTasksOpts { | ||
logger: Logger; | ||
esClient: ElasticsearchClient; | ||
tasks: Array<SavedObjectsFindResult<TaskInstance>>; | ||
spaces?: SpacesPluginStart; | ||
savedObjectsSerializer: SavedObjectsSerializer; | ||
kibanaIndex: string; | ||
taskManagerIndex: string; | ||
} | ||
|
||
export interface CleanupTasksResult { | ||
success: boolean; | ||
successCount: number; | ||
failureCount: number; | ||
} | ||
|
||
/** | ||
* Cleanup tasks | ||
* | ||
* This function receives action execution tasks that are in a failed state, removes | ||
* the linked "action_task_params" object first and then if successful, the task manager's task. | ||
*/ | ||
export async function cleanupTasks({ | ||
logger, | ||
esClient, | ||
tasks, | ||
spaces, | ||
savedObjectsSerializer, | ||
kibanaIndex, | ||
taskManagerIndex, | ||
}: CleanupTasksOpts): Promise<CleanupTasksResult> { | ||
const deserializedTasks = tasks.map((task) => ({ | ||
...task, | ||
attributes: { | ||
...task.attributes, | ||
params: | ||
typeof task.attributes.params === 'string' | ||
? JSON.parse(task.attributes.params) | ||
: task.attributes.params || {}, | ||
}, | ||
})); | ||
|
||
// Remove accumulated action task params | ||
const actionTaskParamIdsToDelete = deserializedTasks.map((task) => | ||
getRawActionTaskParamsIdFromTask({ task, spaces, savedObjectsSerializer }) | ||
); | ||
const actionTaskParamBulkDeleteResult = await bulkDelete( | ||
esClient, | ||
kibanaIndex, | ||
actionTaskParamIdsToDelete | ||
); | ||
const failedActionTaskParams = actionTaskParamBulkDeleteResult | ||
? extractBulkResponseDeleteFailures(actionTaskParamBulkDeleteResult) | ||
: []; | ||
if (failedActionTaskParams?.length) { | ||
logger.debug( | ||
`Failed to delete the following action_task_params [${JSON.stringify( | ||
failedActionTaskParams | ||
)}]` | ||
); | ||
} | ||
|
||
// Remove accumulated tasks | ||
const taskIdsToDelete = deserializedTasks | ||
.map((task) => { | ||
const rawId = getRawActionTaskParamsIdFromTask({ task, spaces, savedObjectsSerializer }); | ||
// Avoid removing tasks that failed to remove linked objects | ||
if (failedActionTaskParams?.find((item) => item._id === rawId)) { | ||
return null; | ||
} | ||
const rawTaskId = savedObjectsSerializer.generateRawId(undefined, 'task', task.id); | ||
return rawTaskId; | ||
}) | ||
.filter((id) => !!id) as string[]; | ||
const taskBulkDeleteResult = await bulkDelete(esClient, taskManagerIndex, taskIdsToDelete); | ||
const failedTasks = taskBulkDeleteResult | ||
? extractBulkResponseDeleteFailures(taskBulkDeleteResult) | ||
: []; | ||
if (failedTasks?.length) { | ||
logger.debug(`Failed to delete the following tasks [${JSON.stringify(failedTasks)}]`); | ||
} | ||
|
||
return { | ||
success: failedActionTaskParams?.length === 0 && failedTasks.length === 0, | ||
successCount: tasks.length - failedActionTaskParams.length - failedTasks.length, | ||
failureCount: failedActionTaskParams.length + failedTasks.length, | ||
}; | ||
} |
9 changes: 9 additions & 0 deletions
9
x-pack/plugins/actions/server/cleanup_failed_executions/constants.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
export const TASK_TYPE = 'cleanup_failed_action_executions'; | ||
export const TASK_ID = `Actions-${TASK_TYPE}`; |
55 changes: 55 additions & 0 deletions
55
x-pack/plugins/actions/server/cleanup_failed_executions/ensure_scheduled.test.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
import { schema } from '@kbn/config-schema'; | ||
import { ActionsConfig } from '../config'; | ||
import { ensureScheduled } from './ensure_scheduled'; | ||
import { taskManagerMock } from '../../../task_manager/server/mocks'; | ||
import { loggingSystemMock } from '../../../../../src/core/server/mocks'; | ||
|
||
describe('ensureScheduled', () => { | ||
const logger = loggingSystemMock.create().get(); | ||
const taskManager = taskManagerMock.createStart(); | ||
|
||
const config: ActionsConfig['cleanupFailedExecutionsTask'] = { | ||
enabled: true, | ||
cleanupInterval: schema.duration().validate('5m'), | ||
idleInterval: schema.duration().validate('1h'), | ||
pageSize: 100, | ||
}; | ||
|
||
beforeEach(() => jest.resetAllMocks()); | ||
|
||
it(`should call task manager's ensureScheduled function with proper params`, async () => { | ||
await ensureScheduled(taskManager, logger, config); | ||
expect(taskManager.ensureScheduled).toHaveBeenCalledTimes(1); | ||
expect(taskManager.ensureScheduled.mock.calls[0]).toMatchInlineSnapshot(` | ||
Array [ | ||
Object { | ||
"id": "Actions-cleanup_failed_action_executions", | ||
"params": Object {}, | ||
"schedule": Object { | ||
"interval": "5m", | ||
}, | ||
"state": Object { | ||
"runs": 0, | ||
"total_cleaned_up": 0, | ||
}, | ||
"taskType": "cleanup_failed_action_executions", | ||
}, | ||
] | ||
`); | ||
}); | ||
|
||
it('should log an error and not throw when ensureScheduled function throws', async () => { | ||
taskManager.ensureScheduled.mockRejectedValue(new Error('Fail')); | ||
await ensureScheduled(taskManager, logger, config); | ||
expect(logger.error).toHaveBeenCalledWith( | ||
'Error scheduling Actions-cleanup_failed_action_executions, received Fail' | ||
); | ||
}); | ||
}); |
34 changes: 34 additions & 0 deletions
34
x-pack/plugins/actions/server/cleanup_failed_executions/ensure_scheduled.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
import { Logger } from 'kibana/server'; | ||
import { TASK_ID, TASK_TYPE } from './constants'; | ||
import { ActionsConfig } from '../config'; | ||
import { TaskManagerStartContract, asInterval } from '../../../task_manager/server'; | ||
|
||
export async function ensureScheduled( | ||
taskManager: TaskManagerStartContract, | ||
logger: Logger, | ||
{ cleanupInterval }: ActionsConfig['cleanupFailedExecutionsTask'] | ||
) { | ||
try { | ||
await taskManager.ensureScheduled({ | ||
id: TASK_ID, | ||
taskType: TASK_TYPE, | ||
schedule: { | ||
interval: asInterval(cleanupInterval.asMilliseconds()), | ||
}, | ||
state: { | ||
runs: 0, | ||
total_cleaned_up: 0, | ||
}, | ||
params: {}, | ||
}); | ||
} catch (e) { | ||
logger.error(`Error scheduling ${TASK_ID}, received ${e.message}`); | ||
} | ||
} |
Oops, something went wrong.