From 0f50053adf56f884a4fe029cd60ef59fd377d744 Mon Sep 17 00:00:00 2001 From: Tim Sullivan Date: Mon, 22 Oct 2018 11:26:26 -0700 Subject: [PATCH] Revert "Core task manager (#23555)" This reverts commit 8b7c513b97bf5bd5f983557e970a490d5f0f4998. --- src/es_archiver/lib/indices/kibana_index.js | 17 +- src/server/config/schema.js | 2 - src/ui/ui_exports/ui_export_types/index.js | 4 - .../ui_export_types/task_definitions.js | 28 - x-pack/index.js | 2 - x-pack/plugins/task_manager/README.md | 306 ----------- x-pack/plugins/task_manager/index.js | 41 -- .../task_manager/lib/fill_pool.test.ts | 47 -- x-pack/plugins/task_manager/lib/fill_pool.ts | 41 -- .../task_manager/lib/intervals.test.ts | 52 -- x-pack/plugins/task_manager/lib/intervals.ts | 53 -- x-pack/plugins/task_manager/lib/logger.ts | 44 -- .../task_manager/lib/middleware.test.ts | 157 ------ x-pack/plugins/task_manager/lib/middleware.ts | 46 -- .../lib/sanitize_task_definitions.test.ts | 172 ------ .../lib/sanitize_task_definitions.ts | 49 -- x-pack/plugins/task_manager/package.json | 9 - x-pack/plugins/task_manager/task.ts | 234 -------- .../plugins/task_manager/task_manager.test.ts | 148 ----- x-pack/plugins/task_manager/task_manager.ts | 191 ------- .../plugins/task_manager/task_poller.test.ts | 135 ----- x-pack/plugins/task_manager/task_poller.ts | 95 ---- x-pack/plugins/task_manager/task_pool.test.ts | 201 ------- x-pack/plugins/task_manager/task_pool.ts | 109 ---- .../plugins/task_manager/task_runner.test.ts | 281 ---------- x-pack/plugins/task_manager/task_runner.ts | 234 -------- .../plugins/task_manager/task_store.test.ts | 519 ------------------ x-pack/plugins/task_manager/task_store.ts | 359 ------------ .../plugins/task_manager/test_utils/index.ts | 52 -- x-pack/scripts/functional_tests.js | 1 - x-pack/test/plugin_api_integration/config.js | 46 -- .../plugins/task_manager/index.js | 106 ---- .../plugins/task_manager/init_routes.js | 60 -- .../plugins/task_manager/package.json | 4 - .../test_suites/task_manager/index.js | 11 - .../task_manager/task_manager_integration.js | 163 ------ 36 files changed, 2 insertions(+), 4017 deletions(-) delete mode 100644 src/ui/ui_exports/ui_export_types/task_definitions.js delete mode 100644 x-pack/plugins/task_manager/README.md delete mode 100644 x-pack/plugins/task_manager/index.js delete mode 100644 x-pack/plugins/task_manager/lib/fill_pool.test.ts delete mode 100644 x-pack/plugins/task_manager/lib/fill_pool.ts delete mode 100644 x-pack/plugins/task_manager/lib/intervals.test.ts delete mode 100644 x-pack/plugins/task_manager/lib/intervals.ts delete mode 100644 x-pack/plugins/task_manager/lib/logger.ts delete mode 100644 x-pack/plugins/task_manager/lib/middleware.test.ts delete mode 100644 x-pack/plugins/task_manager/lib/middleware.ts delete mode 100644 x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts delete mode 100644 x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts delete mode 100644 x-pack/plugins/task_manager/package.json delete mode 100644 x-pack/plugins/task_manager/task.ts delete mode 100644 x-pack/plugins/task_manager/task_manager.test.ts delete mode 100644 x-pack/plugins/task_manager/task_manager.ts delete mode 100644 x-pack/plugins/task_manager/task_poller.test.ts delete mode 100644 x-pack/plugins/task_manager/task_poller.ts delete mode 100644 x-pack/plugins/task_manager/task_pool.test.ts delete mode 100644 x-pack/plugins/task_manager/task_pool.ts delete mode 100644 x-pack/plugins/task_manager/task_runner.test.ts delete mode 100644 x-pack/plugins/task_manager/task_runner.ts delete mode 100644 x-pack/plugins/task_manager/task_store.test.ts delete mode 100644 x-pack/plugins/task_manager/task_store.ts delete mode 100644 x-pack/plugins/task_manager/test_utils/index.ts delete mode 100644 x-pack/test/plugin_api_integration/config.js delete mode 100644 x-pack/test/plugin_api_integration/plugins/task_manager/index.js delete mode 100644 x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js delete mode 100644 x-pack/test/plugin_api_integration/plugins/task_manager/package.json delete mode 100644 x-pack/test/plugin_api_integration/test_suites/task_manager/index.js delete mode 100644 x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js diff --git a/src/es_archiver/lib/indices/kibana_index.js b/src/es_archiver/lib/indices/kibana_index.js index 159b2ddf7c70e9..d8a08b04872e7a 100644 --- a/src/es_archiver/lib/indices/kibana_index.js +++ b/src/es_archiver/lib/indices/kibana_index.js @@ -45,7 +45,8 @@ const buildUiExports = _.once(async () => { * Deletes all indices that start with `.kibana` */ export async function deleteKibanaIndices({ client, stats }) { - const indexNames = await fetchKibanaIndices(client); + const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' }); + const indexNames = kibanaIndices.map(x => x.index); if (!indexNames.length) { return; } @@ -101,17 +102,3 @@ async function loadElasticVersion() { const packageJson = await readFile(path.join(__dirname, '../../../../package.json')); return JSON.parse(packageJson).version; } - -/** - * Migrations mean that the Kibana index will look something like: - * .kibana, .kibana_1, .kibana_323, etc. This finds all indices starting - * with .kibana, then filters out any that aren't actually Kibana's core - * index (e.g. we don't want to remove .kibana_task_manager or the like). - * - * @param {string} index - */ -async function fetchKibanaIndices(client) { - const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' }); - const isKibanaIndex = (index) => (/^\.kibana[_]{0,1}[0-9]*$/).test(index); - return kibanaIndices.map(x => x.index).filter(isKibanaIndex); -} diff --git a/src/server/config/schema.js b/src/server/config/schema.js index 0bfb9c64e1c18e..ae14c03e8cfcdf 100644 --- a/src/server/config/schema.js +++ b/src/server/config/schema.js @@ -244,11 +244,9 @@ export default () => Joi.object({ }), profile: Joi.boolean().default(false) }).default(), - status: Joi.object({ allowAnonymous: Joi.boolean().default(false) }).default(), - map: Joi.object({ includeElasticMapsService: Joi.boolean().default(true), tilemap: tilemapSchema, diff --git a/src/ui/ui_exports/ui_export_types/index.js b/src/ui/ui_exports/ui_export_types/index.js index 6677966ff5ad28..9eb295bee75467 100644 --- a/src/ui/ui_exports/ui_export_types/index.js +++ b/src/ui/ui_exports/ui_export_types/index.js @@ -29,10 +29,6 @@ export { validations, } from './saved_object'; -export { - taskDefinitions -} from './task_definitions'; - export { app, apps, diff --git a/src/ui/ui_exports/ui_export_types/task_definitions.js b/src/ui/ui_exports/ui_export_types/task_definitions.js deleted file mode 100644 index 279583e5167994..00000000000000 --- a/src/ui/ui_exports/ui_export_types/task_definitions.js +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import { mergeAtType } from './reduce'; -import { alias, wrap, uniqueKeys } from './modify_reduce'; - -// How plugins define tasks that the task manager can run. -export const taskDefinitions = wrap( - alias('taskDefinitions'), - uniqueKeys(), - mergeAtType, -); diff --git a/x-pack/index.js b/x-pack/index.js index c2948b779ab207..be1ee31f7c2b29 100644 --- a/x-pack/index.js +++ b/x-pack/index.js @@ -27,7 +27,6 @@ import { notifications } from './plugins/notifications'; import { kueryAutocomplete } from './plugins/kuery_autocomplete'; import { canvas } from './plugins/canvas'; import { infra } from './plugins/infra'; -import { taskManager } from './plugins/task_manager'; module.exports = function (kibana) { return [ @@ -54,6 +53,5 @@ module.exports = function (kibana) { notifications(kibana), kueryAutocomplete(kibana), infra(kibana), - taskManager(kibana), ]; }; diff --git a/x-pack/plugins/task_manager/README.md b/x-pack/plugins/task_manager/README.md deleted file mode 100644 index 8c6990f0e94cf7..00000000000000 --- a/x-pack/plugins/task_manager/README.md +++ /dev/null @@ -1,306 +0,0 @@ -# Kibana task manager - -The task manager is a generic system for running background tasks. It supports: - -- Single-run and recurring tasks -- Scheduling tasks to run after a specified datetime -- Basic retry logic -- Recovery of stalled tasks / timeouts -- Tracking task state across multiple runs -- Configuring the run-parameters for specific tasks -- Basic coordination to prevent the same task instance from running on more than one Kibana system at a time - -## Implementation details - -At a high-level, the task manager works like this: - -- Every `{poll_interval}` milliseconds, check the `{index}` for any tasks that need to be run: - - `runAt` is past - - `attempts` is less than the configured threshold -- Attempt to claim the task by using optimistic concurrency to set: - - status to `running` - - `runAt` to now + the timeout specified by the task -- Execute the task, if the previous claim succeeded -- If the task fails, increment the `attempts` count and reschedule it -- If the task succeeds: - - If it is recurring, store the result of the run, and reschedule - - If it is not recurring, remove it from the index - -## Pooling - -Each task manager instance runs tasks in a pool which ensures that at most N tasks are run at a time, where N is configurable. This prevents the system from running too many tasks at once in resource constrained environments. In addition to this, each individual task can also specify `numWorkers` which tells the system how many workers are consumed by a single running instance of a task. This effectively limits how many tasks of a given type can be run at once. - -For example, we may have a system with a `max_workers` of 10, but a super expensive task (such as `reporting`) which specifies a `numWorkers` of 10. In this case, `reporting` tasks will run one at a time. - -If a task specifies a higher `numWorkers` than the system supports, the system's `max_workers` setting will be substituted for it. - -## Config options - -The task_manager can be configured via `taskManager` config options (e.g. `taskManager.maxAttempts`): - -- `max_attempts` - How many times a failing task instance will be retried before it is never run again -- `poll_interval` - How often the background worker should check the task_manager index for more work -- `index` - The name of the index that the task_manager -- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10) -- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security. -- `override_num_workers`: An object of `taskType: number` that overrides the `num_workers` for tasks - - For example: `task_manager.override_num_workers.reporting: 2` would override the number of workers occupied by tasks of type `reporting` - - This allows sysadmins to tweak the operational performance of Kibana, allowing more or fewer tasks of a specific type to run simultaneously - -## Task definitions - -Plugins define tasks by calling the `registerTaskDefinitions` method on the `server.taskManager` object. - -A sample task can be found in the [x-pack/test/plugin_api_integration/plugins/task_manager](../../test/plugin_api_integration/plugins/task_manager/index.js) folder. - -```js -const { taskManager } = server; -taskManager.registerTaskDefinitions({ - // clusterMonitoring is the task type, and must be unique across the entire system - clusterMonitoring: { - // Human friendly name, used to represent this task in logs, UI, etc - title: 'Human friendly name', - - // Optional, human-friendly, more detailed description - description: 'Amazing!!', - - // Optional, how long, in minutes, the system should wait before - // a running instance of this task is considered to be timed out. - // This defaults to 5 minutes. - timeOut: '5m', - - // The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots, - // 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is - // overridden by the `override_num_workers` config value, if specified. - numWorkers: 2, - - // The createTaskRunner function / method returns an object that is responsible for - // performing the work of the task. context: { taskInstance, kbnServer }, is documented below. - createTaskRunner(context) { - return { - // Perform the work of the task. The return value should fit the TaskResult interface, documented - // below. Invalid return values will result in a logged warning. - async run() { - // Do some work - // Conditionally send some alerts - // Return some result or other... - }, - - // Optional, will be called if a running instance of this task times out, allowing the task - // to attempt to clean itself up. - async cancel() { - // Do whatever is required to cancel this task, such as killing any spawned processes - }, - }; - } - }, -}); -``` - -When Kibana attempts to claim and run a task instance, it looks its definition up, and executes its createTaskRunner's method, passing it a run context which looks like this: - -```js -{ - // An instance of the Kibana server object. - kbnServer, - - // The data associated with this instance of the task, with two properties being most notable: - // - // params: - // An object, specific to this task instance, used by the - // task to determine exactly what work should be performed. - // e.g. a cluster-monitoring task might have a `clusterName` - // property in here, but a movie-monitoring task might have - // a `directorName` property. - // - // state: - // The state returned from the previous run of this task instance. - // If this task instance has never succesfully run, this will - // be an empty object: {} - taskInstance, -} -``` - -## Task result - -The task runner's `run` method is expected to return a promise that resolves to undefined or to an object that looks like the following: -```js -{ - // Optional, if specified, this is used as the tasks' nextRun, overriding - // the default system scheduler. - runAt: "2020-07-24T17:34:35.272Z", - - // Optional, an error object, logged out as a warning. The pressence of this - // property indicates that the task did not succeed. - error: { message: 'Hrumph!' }, - - // Optional, this will be passed into the next run of the task, if - // this is a recurring task. - state: { - anything: 'goes here', - }, -} -``` - -Other return values will result in a warning, but the system should continue to work. - -## Task instances - -The task_manager module will store scheduled task instances in an index. This allows for recovery of failed tasks, coordination across Kibana clusters, persistence across Kibana reboots, etc. - -The data stored for a task instance looks something like this: - -```js -{ - // The type of task that will run this instance. - taskType: 'clusterMonitoring', - - // The next time this task instance should run. It is not guaranteed - // to run at this time, but it is guaranteed not to run earlier than - // this. - runAt: "2020-07-24T17:34:35.272Z", - - // Indicates that this is a recurring task. We currently only support - // 1 minute granularity. - interval: '5m', - - // How many times this task has been unsuccesfully attempted, - // this will be reset to 0 if the task ever succesfully completes. - // This is incremented if a task fails or times out. - attempts: 0, - - // Currently, this is either idle | running. It is used to - // coordinate which Kibana instance owns / is running a specific - // task instance. - status: 'idle', - - // The params specific to this task instance, which will be - // passed to the task when it runs, and will be used by the - // task to determine exactly what work should be performed. - // This is a JSON blob, and will be different per task type. - // e.g. a cluster-monitoring task might have a `clusterName` - // property in here, but a movie-monitoring task might have - // a `directorName` property. - params: '{ "task": "specific stuff here" }', - - // The result of the previous run of this task instance. This - // will be passed to the next run of the task, along with the - // params, and could be used by a task to do special logic If - // the task state changes (e.g. from green to red, or foo to bar) - // If there was no previous run (e.g. the instance has never succesfully - // completed, this will be an empty object.). This is a JSON blob, - // and will be different per task type. - state: '{ "status": "green" }', - - // An extension point for 3rd parties to build in security features on - // top of the task manager. For example, this might be the token of the user - // who scheduled this task. - userContext: 'the token of the user who scheduled this task', - - // An extension point for 3rd parties to build in security features on - // top of the task manager, and is expected to be the id of the user, if any, - // that scheduled this task. - user: '23lk3l42', - - // An application-specific designation, allowing different Kibana - // plugins / apps to query for only those tasks they care about. - scope: 'alerting', -} -``` - -## Programmatic access - -The task manager mixin exposes a taskManager object on the Kibana server which plugins can use to manage scheduled tasks. Each method takes an optional `scope` argument and ensures that only tasks with the specified scope(s) will be affected. - -```js -const { taskManager } = server; -// Schedules a task. All properties are as documented in the previous -// storage section, except that here, params is an object, not a JSON -// string. -const task = await taskManager.schedule({ - taskType, - runAt, - interval, - params, - scope: 'my-fanci-app', -}); - -// Removes the specified task -await manager.remove({ id: task.id }); - -// Fetches tasks, supports pagination, via the search-after API: -// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html -// If scope is not specified, all tasks are returned, otherwise only tasks -// with the given scope are returned. -const results = await manager.find({ scope: 'my-fanci-app', searchAfter: ['ids'] }); - -// results look something like this: -{ - searchAfter: ['233322'], - // Tasks is an array of task instances - tasks: [{ - id: '3242342', - taskType: 'reporting', - // etc - }] -} -``` - -More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time. - -## Middleware - -The task manager exposes a middleware layer that allows modifying tasks before they are scheduled / persisted to the task manager index, and modifying tasks / the run context before a task is run. - -For example: - -```js -// In your plugin's init -server.taskManager.addMiddleware({ - async beforeSave({ taskInstance, ...opts }) { - console.log(`About to save a task of type ${taskInstance.taskType}`); - - return { - ...opts, - taskInstance: { - ...taskInstance, - params: { - ...taskInstance.params, - example: 'Added to params!', - }, - }, - }; - }, - - async beforeRun({ taskInstance, ...opts }) { - console.log(`About to run ${taskInstance.taskType} ${taskInstance.id}`); - const { example, ...taskWithoutExampleProp } = taskInstance; - - return { - ...opts, - taskInstance: taskWithoutExampleProp, - }; - }, -}); -``` - -## Limitations in v1.0 - -In v1, the system only understands 1 minute increments (e.g. '1m', '7m'). Tasks which need something more robust will need to specify their own "runAt" in their run method's return value. - -There is only a rudimentary mechanism for coordinating tasks and handling expired tasks. Tasks are considered expired if their runAt has arrived, and their status is still 'running'. - -There is no task history. Each run overwrites the previous run's state. One-time tasks are removed from the index upon completion regardless of success / failure. - -The task manager's public API is create / delete / list. Updates aren't directly supported, and listing should be scoped so that users only see their own tasks. - -## Testing - -- `node scripts/jest --testPathPattern=task_manager --watch` - -Integration tests can be run like so: - -``` -node scripts/functional_tests_server.js --config test/plugin_functional/config.js -node scripts/functional_test_runner --config test/plugin_functional/config.js --grep task_manager -``` diff --git a/x-pack/plugins/task_manager/index.js b/x-pack/plugins/task_manager/index.js deleted file mode 100644 index 821a454122b03c..00000000000000 --- a/x-pack/plugins/task_manager/index.js +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { TaskManager } from './task_manager'; - -export function taskManager(kibana) { - return new kibana.Plugin({ - id: 'task_manager', - require: ['kibana', 'elasticsearch', 'xpack_main'], - configPrefix: 'xpack.task_manager', - config(Joi) { - return Joi.object({ - enabled: Joi.boolean().default(true), - max_attempts: Joi.number() - .description('The maximum number of times a task will be attempted before being abandoned as failed') - .default(3), - poll_interval: Joi.number() - .description('How often, in milliseconds, the task manager will look for more work.') - .default(3000), - index: Joi.string() - .description('The name of the index used to store task information.') - .default('.kibana_task_manager'), - max_workers: Joi.number() - .description('The maximum number of tasks that this Kibana instance will run simultaneously.') - .default(10), - override_num_workers: Joi.object() - .pattern(/.*/, Joi.number().greater(0)) - .description('Customize the number of workers occupied by specific tasks (e.g. override_num_workers.reporting: 2)') - .default({}) - }).default(); - }, - preInit(server) { - const config = server.config(); - const taskManager = new TaskManager(this.kbnServer, server, config); - server.decorate('server', 'taskManager', taskManager); - }, - }); -} diff --git a/x-pack/plugins/task_manager/lib/fill_pool.test.ts b/x-pack/plugins/task_manager/lib/fill_pool.test.ts deleted file mode 100644 index 850b9da8943043..00000000000000 --- a/x-pack/plugins/task_manager/lib/fill_pool.test.ts +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import _ from 'lodash'; -import sinon from 'sinon'; -import { fillPool } from './fill_pool'; - -describe('fillPool', () => { - test('stops filling when there are no more tasks in the store', async () => { - const tasks = [[1, 2, 3], [4, 5]]; - let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; - const run = sinon.spy(() => true); - const converter = _.identity; - - await fillPool(run, fetchAvailableTasks, converter); - - expect(_.flattenDeep(run.args)).toEqual([1, 2, 3, 4, 5]); - }); - - test('stops filling when the pool has no more capacity', async () => { - const tasks = [[1, 2, 3], [4, 5]]; - let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; - const run = sinon.spy(() => false); - const converter = _.identity; - - await fillPool(run, fetchAvailableTasks, converter); - - expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]); - }); - - test('calls the converter on the records prior to running', async () => { - const tasks = [[1, 2, 3], [4, 5]]; - let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; - const run = sinon.spy(() => false); - const converter = (x: number) => x.toString(); - - await fillPool(run, fetchAvailableTasks, converter); - - expect(_.flattenDeep(run.args)).toEqual(['1', '2', '3']); - }); -}); diff --git a/x-pack/plugins/task_manager/lib/fill_pool.ts b/x-pack/plugins/task_manager/lib/fill_pool.ts deleted file mode 100644 index a266517c783bb5..00000000000000 --- a/x-pack/plugins/task_manager/lib/fill_pool.ts +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -type BatchRun = (tasks: T[]) => Promise; -type Fetcher = () => Promise; -type Converter = (t: T1) => T2; - -/** - * Given a function that runs a batch of tasks (e.g. taskPool.run), a function - * that fetches task records (e.g. store.fetchAvailableTasks), and a function - * that converts task records to the appropriate task runner, this function - * fills the pool with work. - * - * This is annoyingly general in order to simplify testing. - * - * @param run - a function that runs a batch of tasks (e.g. taskPool.run) - * @param fetchAvailableTasks - a function that fetches task records (e.g. store.fetchAvailableTasks) - * @param converter - a function that converts task records to the appropriate task runner - */ -export async function fillPool( - run: BatchRun, - fetchAvailableTasks: Fetcher, - converter: Converter -) { - while (true) { - const instances = await fetchAvailableTasks(); - - if (!instances.length) { - return; - } - - const tasks = instances.map(converter); - - if (!(await run(tasks))) { - return; - } - } -} diff --git a/x-pack/plugins/task_manager/lib/intervals.test.ts b/x-pack/plugins/task_manager/lib/intervals.test.ts deleted file mode 100644 index 5546be2aab8e9b..00000000000000 --- a/x-pack/plugins/task_manager/lib/intervals.test.ts +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import _ from 'lodash'; -import { assertValidInterval, intervalFromNow, minutesFromNow } from './intervals'; - -describe('taskIntervals', () => { - describe('assertValidInterval', () => { - test('it accepts intervals in the form `Nm`', () => { - expect(() => assertValidInterval(`${_.random(1000)}m`)).not.toThrow(); - }); - - test('it rejects intervals are not of the form `Nm`', () => { - expect(() => assertValidInterval(`5m 2s`)).toThrow( - /Invalid interval "5m 2s"\. Intervals must be of the form {numbrer}m. Example: 5m/ - ); - expect(() => assertValidInterval(`hello`)).toThrow( - /Invalid interval "hello"\. Intervals must be of the form {numbrer}m. Example: 5m/ - ); - }); - }); - - describe('intervalFromNow', () => { - test('it returns the current date plus n minutes', () => { - const mins = _.random(1, 100); - const expected = Date.now() + mins * 60 * 1000; - const nextRun = intervalFromNow(`${mins}m`)!.getTime(); - expect(Math.abs(nextRun - expected)).toBeLessThan(100); - }); - - test('it rejects intervals are not of the form `Nm`', () => { - expect(() => intervalFromNow(`5m 2s`)).toThrow( - /Invalid interval "5m 2s"\. Intervals must be of the form {numbrer}m. Example: 5m/ - ); - expect(() => intervalFromNow(`hello`)).toThrow( - /Invalid interval "hello"\. Intervals must be of the form {numbrer}m. Example: 5m/ - ); - }); - }); - - describe('minutesFromNow', () => { - test('it returns the current date plus a number of minutes', () => { - const mins = _.random(1, 100); - const expected = Date.now() + mins * 60 * 1000; - const nextRun = minutesFromNow(mins).getTime(); - expect(Math.abs(nextRun - expected)).toBeLessThan(100); - }); - }); -}); diff --git a/x-pack/plugins/task_manager/lib/intervals.ts b/x-pack/plugins/task_manager/lib/intervals.ts deleted file mode 100644 index a4f8a14af0767e..00000000000000 --- a/x-pack/plugins/task_manager/lib/intervals.ts +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -/** - * Returns a date that is the specified interval from now. Currently, - * only minute-intervals are supported. - * - * @param {string} interval - An interval of the form `Nm` such as `5m` - */ -export function intervalFromNow(interval?: string): Date | undefined { - if (interval === undefined) { - return; - } - - assertValidInterval(interval); - - return minutesFromNow(parseInterval(interval)); -} - -/** - * Returns a date that is mins minutes from now. - * - * @param mins The number of mintues from now - */ -export function minutesFromNow(mins: number): Date { - const now = new Date(); - - now.setMinutes(now.getMinutes() + mins); - - return now; -} - -/** - * Verifies that the specified interval matches our expected format. - * - * @param {string} interval - An interval such as `5m` - */ -export function assertValidInterval(interval: string) { - if (/^[0-9]+m$/.test(interval)) { - return interval; - } - - throw new Error( - `Invalid interval "${interval}". Intervals must be of the form {numbrer}m. Example: 5m.` - ); -} - -function parseInterval(interval: string) { - return parseInt(interval, 10); -} diff --git a/x-pack/plugins/task_manager/lib/logger.ts b/x-pack/plugins/task_manager/lib/logger.ts deleted file mode 100644 index 932acaa15de23a..00000000000000 --- a/x-pack/plugins/task_manager/lib/logger.ts +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -export type LogFn = (prefix: string[], msg: string) => void; - -type SimpleLogFn = (msg: string) => void; - -export interface Logger { - error: SimpleLogFn; - warning: SimpleLogFn; - debug: SimpleLogFn; - info: SimpleLogFn; -} - -export class TaskManagerLogger implements Logger { - private write: LogFn; - - constructor(log: LogFn) { - this.write = log; - } - - public error(msg: string) { - this.log('error', msg); - } - - public warning(msg: string) { - this.log('warning', msg); - } - - public debug(msg: string) { - this.log('debug', msg); - } - - public info(msg: string) { - this.log('info', msg); - } - - private log(type: string, msg: string) { - this.write([type, 'task_manager'], msg); - } -} diff --git a/x-pack/plugins/task_manager/lib/middleware.test.ts b/x-pack/plugins/task_manager/lib/middleware.test.ts deleted file mode 100644 index 358e401d9b319a..00000000000000 --- a/x-pack/plugins/task_manager/lib/middleware.test.ts +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import moment from 'moment'; -import { ConcreteTaskInstance, RunContext, TaskInstance, TaskStatus } from '../task'; -import { addMiddlewareToChain } from './middleware'; - -interface BeforeSaveOpts { - taskInstance: TaskInstance; -} - -const getMockTaskInstance = () => ({ - taskType: 'nice_task', - params: { abc: 'def' }, -}); -const getMockConcreteTaskInstance = () => { - const concrete: { - id: string; - version: number; - attempts: number; - status: TaskStatus; - runAt: Date; - state: any; - taskType: string; - params: any; - } = { - id: 'hy8o99o83', - version: 1, - attempts: 0, - status: 'idle', - runAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()), - state: {}, - taskType: 'nice_task', - params: { abc: 'def' }, - }; - return concrete; -}; -const getMockRunContext = (runTask: ConcreteTaskInstance) => ({ - taskInstance: runTask, - kbnServer: {}, -}); - -const defaultBeforeSave = async (opts: BeforeSaveOpts) => { - return opts; -}; - -const defaultBeforeRun = async (opts: RunContext) => { - return opts; -}; - -describe('addMiddlewareToChain', () => { - it('chains the beforeSave functions', () => { - const m1 = { - beforeSave: async (opts: BeforeSaveOpts) => { - Object.assign(opts.taskInstance.params, { m1: true }); - return opts; - }, - beforeRun: defaultBeforeRun, - }; - const m2 = { - beforeSave: async (opts: BeforeSaveOpts) => { - Object.assign(opts.taskInstance.params, { m2: true }); - return opts; - }, - beforeRun: defaultBeforeRun, - }; - const m3 = { - beforeSave: async (opts: BeforeSaveOpts) => { - Object.assign(opts.taskInstance.params, { m3: true }); - return opts; - }, - beforeRun: defaultBeforeRun, - }; - - let middlewareChain; - middlewareChain = addMiddlewareToChain(m1, m2); - middlewareChain = addMiddlewareToChain(middlewareChain, m3); - - middlewareChain.beforeSave({ taskInstance: getMockTaskInstance() }).then(saveOpts => { - expect(saveOpts).toMatchInlineSnapshot(` -Object { - "taskInstance": Object { - "params": Object { - "abc": "def", - "m1": true, - "m2": true, - "m3": true, - }, - "taskType": "nice_task", - }, -} -`); - }); - }); - - it('chains the beforeRun functions', () => { - const m1 = { - beforeSave: defaultBeforeSave, - beforeRun: async (opts: RunContext) => { - return { - ...opts, - m1: true, - }; - }, - }; - const m2 = { - beforeSave: defaultBeforeSave, - beforeRun: async (opts: RunContext) => { - return { - ...opts, - m2: true, - }; - }, - }; - const m3 = { - beforeSave: defaultBeforeSave, - beforeRun: async (opts: RunContext) => { - return { - ...opts, - m3: true, - }; - }, - }; - - let middlewareChain; - middlewareChain = addMiddlewareToChain(m1, m2); - middlewareChain = addMiddlewareToChain(middlewareChain, m3); - - middlewareChain - .beforeRun(getMockRunContext(getMockConcreteTaskInstance())) - .then(contextOpts => { - expect(contextOpts).toMatchInlineSnapshot(` -Object { - "kbnServer": Object {}, - "m1": true, - "m2": true, - "m3": true, - "taskInstance": Object { - "attempts": 0, - "id": "hy8o99o83", - "params": Object { - "abc": "def", - }, - "runAt": 2018-09-18T05:33:09.588Z, - "state": Object {}, - "status": "idle", - "taskType": "nice_task", - "version": 1, - }, -} -`); - }); - }); -}); diff --git a/x-pack/plugins/task_manager/lib/middleware.ts b/x-pack/plugins/task_manager/lib/middleware.ts deleted file mode 100644 index d81b76fda7e50e..00000000000000 --- a/x-pack/plugins/task_manager/lib/middleware.ts +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { RunContext, TaskInstance } from '../task'; - -/* - * BeforeSaveMiddlewareParams is nearly identical to RunContext, but - * taskInstance is before save (no _id property) - * - * taskInstance property is guaranteed to exist. The params can optionally - * include fields from an "options" object passed as the 2nd parameter to - * taskManager.schedule() - */ -export interface BeforeSaveMiddlewareParams { - taskInstance: TaskInstance; -} - -export type BeforeSaveFunction = ( - params: BeforeSaveMiddlewareParams -) => Promise; - -export type BeforeRunFunction = (params: RunContext) => Promise; - -export interface Middleware { - beforeSave: BeforeSaveFunction; - beforeRun: BeforeRunFunction; -} - -export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Middleware) { - const beforeSave = middleware.beforeSave - ? (params: BeforeSaveMiddlewareParams) => - middleware.beforeSave(params).then(prevMiddleware.beforeSave) - : prevMiddleware.beforeSave; - - const beforeRun = middleware.beforeRun - ? (params: RunContext) => middleware.beforeRun(params).then(prevMiddleware.beforeRun) - : prevMiddleware.beforeRun; - - return { - beforeSave, - beforeRun, - }; -} diff --git a/x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts b/x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts deleted file mode 100644 index 6e9120861b2121..00000000000000 --- a/x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { get } from 'lodash'; -import { RunContext } from '../task'; -import { sanitizeTaskDefinitions } from './sanitize_task_definitions'; - -interface Opts { - numTasks: number; - numWorkers?: number; -} - -const getMockTaskDefinitions = (opts: Opts) => { - const { numTasks, numWorkers } = opts; - const tasks: any = {}; - - for (let i = 0; i < numTasks; i++) { - const type = `test_task_type_${i}`; - tasks[type] = { - type, - title: 'Test', - description: 'one super cool task', - numWorkers: numWorkers ? numWorkers : 1, - createTaskRunner(context: RunContext) { - const incre = get(context, 'taskInstance.state.incre', -1); - return { - run: () => ({ - state: { - incre: incre + 1, - }, - runAt: Date.now(), - }), - }; - }, - }; - } - return tasks; -}; - -describe('sanitizeTaskDefinitions', () => { - it('provides tasks with defaults if there are no overrides', () => { - const maxWorkers = 10; - const overrideNumWorkers = {}; - const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 }); - const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); - - expect(result).toMatchInlineSnapshot(` -Object { - "test_task_type_0": Object { - "createTaskRunner": [Function], - "description": "one super cool task", - "numWorkers": 1, - "timeOut": "5m", - "title": "Test", - "type": "test_task_type_0", - }, - "test_task_type_1": Object { - "createTaskRunner": [Function], - "description": "one super cool task", - "numWorkers": 1, - "timeOut": "5m", - "title": "Test", - "type": "test_task_type_1", - }, - "test_task_type_2": Object { - "createTaskRunner": [Function], - "description": "one super cool task", - "numWorkers": 1, - "timeOut": "5m", - "title": "Test", - "type": "test_task_type_2", - }, -} -`); - }); - - it('scales down task definitions workers if larger than max workers', () => { - const maxWorkers = 2; - const overrideNumWorkers = {}; - const taskDefinitions = getMockTaskDefinitions({ numTasks: 2, numWorkers: 5 }); - const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); - - expect(result).toMatchInlineSnapshot(` -Object { - "test_task_type_0": Object { - "createTaskRunner": [Function], - "description": "one super cool task", - "numWorkers": 2, - "timeOut": "5m", - "title": "Test", - "type": "test_task_type_0", - }, - "test_task_type_1": Object { - "createTaskRunner": [Function], - "description": "one super cool task", - "numWorkers": 2, - "timeOut": "5m", - "title": "Test", - "type": "test_task_type_1", - }, -} -`); - }); - - it('incorporates overrideNumWorkers to give certain type an override of number of workers', () => { - const overrideNumWorkers = { - test_task_type_0: 5, - test_task_type_1: 2, - }; - const maxWorkers = 5; - const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 }); - const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); - - expect(result).toMatchInlineSnapshot(` -Object { - "test_task_type_0": Object { - "createTaskRunner": [Function], - "description": "one super cool task", - "numWorkers": 5, - "timeOut": "5m", - "title": "Test", - "type": "test_task_type_0", - }, - "test_task_type_1": Object { - "createTaskRunner": [Function], - "description": "one super cool task", - "numWorkers": 2, - "timeOut": "5m", - "title": "Test", - "type": "test_task_type_1", - }, - "test_task_type_2": Object { - "createTaskRunner": [Function], - "description": "one super cool task", - "numWorkers": 1, - "timeOut": "5m", - "title": "Test", - "type": "test_task_type_2", - }, -} -`); - }); - - it('throws a validation exception for invalid task definition', () => { - const runsanitize = () => { - const maxWorkers = 10; - const overrideNumWorkers = {}; - const taskDefinitions = { - some_kind_of_task: { - fail: 'extremely', // cause a validation failure - type: 'breaky_task', - title: 'Test XYZ', - description: `Actually this won't work`, - createTaskRunner() { - return { - async run() { - return {}; - }, - }; - }, - }, - }; - - return sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); - }; - - expect(runsanitize).toThrowError(); - }); -}); diff --git a/x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts b/x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts deleted file mode 100644 index 07d7cdd0df4078..00000000000000 --- a/x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import Joi from 'joi'; -import { - SanitizedTaskDefinition, - TaskDefinition, - TaskDictionary, - validateTaskDefinition, -} from '../task'; - -/** - * Sanitizes the system's task definitions. Task definitions have optional properties, and - * this ensures they all are given a reasonable default. This also overrides certain task - * definition properties with kibana.yml overrides (such as the `override_num_workers` config - * value). - * - * @param maxWorkers - The maxiumum numer of workers allowed to run at once - * @param taskDefinitions - The Kibana task definitions dictionary - * @param overrideNumWorkers - The kibana.yml overrides numWorkers per task type. - */ -export function sanitizeTaskDefinitions( - taskDefinitions: TaskDictionary = {}, - maxWorkers: number, - overrideNumWorkers: { [taskType: string]: number } -): TaskDictionary { - return Object.keys(taskDefinitions).reduce( - (acc, type) => { - const rawDefinition = taskDefinitions[type]; - rawDefinition.type = type; - const definition = Joi.attempt(rawDefinition, validateTaskDefinition) as TaskDefinition; - const numWorkers = Math.min( - maxWorkers, - overrideNumWorkers[definition.type] || definition.numWorkers || 1 - ); - - acc[type] = { - ...definition, - numWorkers, - }; - - return acc; - }, - {} as TaskDictionary - ); -} diff --git a/x-pack/plugins/task_manager/package.json b/x-pack/plugins/task_manager/package.json deleted file mode 100644 index 43b706b730bb77..00000000000000 --- a/x-pack/plugins/task_manager/package.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "name": "task_manager", - "version": "kibana", - "config": { - "@elastic/eslint-import-resolver-kibana": { - "projectRoot": false - } - } -} \ No newline at end of file diff --git a/x-pack/plugins/task_manager/task.ts b/x-pack/plugins/task_manager/task.ts deleted file mode 100644 index 66f7305d3585eb..00000000000000 --- a/x-pack/plugins/task_manager/task.ts +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import Joi from 'joi'; - -/* - * Type definitions and validations for tasks. - */ - -/** - * A loosely typed definition of the elasticjs wrapper. It's beyond the scope - * of this work to try to make a comprehensive type definition of this. - */ -export type ElasticJs = (action: string, args: any) => Promise; - -/** - * The run context is passed into a task's run function as its sole argument. - */ -export interface RunContext { - /** - * The Kibana server object. This gives tasks full-access to the server object, - * including the various ES options client functions - */ - kbnServer: object; - - /** - * The document describing the task instance, its params, state, id, etc. - */ - taskInstance: ConcreteTaskInstance; -} - -/** - * The return value of a task's run function should be a promise of RunResult. - */ -export interface RunResult { - /** - * Specifies the next run date / time for this task. If unspecified, this is - * treated as a single-run task, and will not be rescheduled after - * completion. - */ - runAt?: Date; - - /** - * If specified, indicates that the task failed to accomplish its work. This is - * logged out as a warning, and the task will be reattempted after a delay. - */ - error?: object; - - /** - * The state which will be passed to the next run of this task (if this is a - * recurring task). See the RunContext type definition for more details. - */ - state?: object; -} - -export const validateRunResult = Joi.object({ - runAt: Joi.date().optional(), - error: Joi.object().optional(), - state: Joi.object().optional(), -}).optional(); - -export type RunFunction = () => Promise; - -export type CancelFunction = () => Promise; - -export interface CancellableTask { - run: RunFunction; - cancel?: CancelFunction; -} - -export type TaskRunCreatorFunction = (context: RunContext) => CancellableTask; - -/** - * Defines a task which can be scheduled and run by the Kibana - * task manager. - */ -export interface TaskDefinition { - /** - * A unique identifier for the type of task being defined. - */ - type: string; - - /** - * A brief, human-friendly title for this task. - */ - title: string; - - /** - * An optional more detailed description of what this task does. - */ - description?: string; - - /** - * How long, in minutes, the system should wait for the task to complete - * before it is considered to be timed out. (e.g. '5m', the default). If - * the task takes longer than this, Kibana will send it a kill command and - * the task will be re-attempted. - */ - timeOut?: string; - - /** - * The numer of workers / slots a running instance of this task occupies. - * This defaults to 1. - */ - numWorkers?: number; - - /** - * Creates an object that has a run function which performs the task's work, - * and an optional cancel function which cancels the task. - */ - createTaskRunner: TaskRunCreatorFunction; -} - -/** - * A task definition with all of its properties set to a valid value. - */ -export interface SanitizedTaskDefinition extends TaskDefinition { - numWorkers: number; -} - -export const validateTaskDefinition = Joi.object({ - type: Joi.string().required(), - title: Joi.string().optional(), - description: Joi.string().optional(), - timeOut: Joi.string().default('5m'), - numWorkers: Joi.number().default(1), - createTaskRunner: Joi.func().required(), -}).default(); - -/** - * A dictionary mapping task types to their definitions. - */ -export interface TaskDictionary { - [taskType: string]: T; -} - -export type TaskStatus = 'idle' | 'running'; - -/* - * A task instance represents all of the data required to store, fetch, - * and execute a task. - */ -export interface TaskInstance { - /** - * Optional ID that can be passed by the caller. When ID is undefined, ES - * will auto-generate a unique id. Otherwise, ID will be used to either - * create a new document, or update existing document - */ - id?: string; - - /** - * The task definition type whose run function will execute this instance. - */ - taskType: string; - - /** - * The date and time that this task is scheduled to be run. It is not - * guaranteed to run at this time, but it is guaranteed not to run earlier - * than this. Defaults to immediately. - */ - runAt?: Date; - - /** - * An interval in minutes (e.g. '5m'). If specified, this is a recurring task. - */ - interval?: string; - - /** - * A task-specific set of parameters, used by the task's run function to tailor - * its work. This is generally user-input, such as { sms: '333-444-2222' }. - */ - params: object; - - /** - * The state passed into the task's run function, and returned by the previous - * run. If there was no previous run, or if the previous run did not return - * any state, this will be the empy object: {} - */ - state?: object; - - /** - * The id of the user who scheduled this task. - */ - user?: string; - - /** - * Used to group tasks for querying. So, reporting might schedule tasks with a scope of 'reporting', - * and then query such tasks to provide a glimpse at only reporting tasks, rather than at all tasks. - */ - scope?: string | string[]; -} - -/** - * A task instance that has an id and is ready for storage. - */ -export interface ConcreteTaskInstance extends TaskInstance { - /** - * The id of the Elastic document that stores this instance's data. This can - * be passed by the caller when scheduling the task. - */ - id: string; - - /** - * The version of the Elaticsearch document. - */ - version: number; - - /** - * The number of unsuccessful attempts since the last successful run. This - * will be zeroed out after a successful run. - */ - attempts: number; - - /** - * Indicates whether or not the task is currently running. - */ - status: TaskStatus; - - /** - * The date and time that this task is scheduled to be run. It is not guaranteed - * to run at this time, but it is guaranteed not to run earlier than this. - */ - runAt: Date; - - /** - * The state passed into the task's run function, and returned by the previous - * run. If there was no previous run, or if the previous run did not return - * any state, this will be the empy object: {} - */ - state: object; -} diff --git a/x-pack/plugins/task_manager/task_manager.test.ts b/x-pack/plugins/task_manager/task_manager.test.ts deleted file mode 100644 index d836f81fb0af51..00000000000000 --- a/x-pack/plugins/task_manager/task_manager.test.ts +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import _ from 'lodash'; -import sinon from 'sinon'; -import { bindToElasticSearchStatus, TaskManager } from './task_manager'; - -describe('TaskManager', () => { - let clock: sinon.SinonFakeTimers; - const defaultConfig = { - task_manager: { - max_workers: 10, - override_num_workers: {}, - index: 'foo', - max_attempts: 9, - poll_interval: 6000000, - }, - }; - - beforeEach(() => { - clock = sinon.useFakeTimers(); - }); - - afterEach(() => clock.restore()); - - test('starts / stops the poller when es goes green / red', async () => { - const handlers: any = {}; - const es = { - status: { - on: (color: string, handler: any) => (handlers[color] = () => Promise.resolve(handler())), - }, - }; - const start = sinon.spy(async () => undefined); - const stop = sinon.spy(async () => undefined); - const init = sinon.spy(async () => undefined); - - bindToElasticSearchStatus(es, { info: _.noop, debug: _.noop }, { stop, start }, { init }); - - await handlers.green(); - sinon.assert.calledOnce(init); - sinon.assert.calledOnce(start); - sinon.assert.notCalled(stop); - - await handlers.red(); - sinon.assert.calledOnce(init); - sinon.assert.calledOnce(start); - sinon.assert.calledOnce(stop); - - await handlers.green(); - sinon.assert.calledTwice(init); - sinon.assert.calledTwice(start); - sinon.assert.calledOnce(stop); - }); - - test('disallows schedule before init', async () => { - const { opts } = testOpts(); - const client = new TaskManager(opts.kbnServer, opts.server, opts.config); - const task = { - taskType: 'foo', - params: {}, - }; - await expect(client.schedule(task)).rejects.toThrow(/The task manager is initializing/i); - }); - - test('disallows fetch before init', async () => { - const { opts } = testOpts(); - const client = new TaskManager(opts.kbnServer, opts.server, opts.config); - await expect(client.fetch({})).rejects.toThrow(/The task manager is initializing/i); - }); - - test('disallows remove before init', async () => { - const { opts } = testOpts(); - const client = new TaskManager(opts.kbnServer, opts.server, opts.config); - await expect(client.remove('23')).rejects.toThrow(/The task manager is initializing/i); - }); - - test('allows middleware registration before init', () => { - const { opts } = testOpts(); - const client = new TaskManager(opts.kbnServer, opts.server, opts.config); - const middleware = { - beforeSave: async (saveOpts: any) => saveOpts, - beforeRun: async (runOpts: any) => runOpts, - }; - expect(() => client.addMiddleware(middleware)).not.toThrow(); - }); - - test('disallows middleware registration after init', async () => { - const { $test, opts } = testOpts(); - const client = new TaskManager(opts.kbnServer, opts.server, opts.config); - const middleware = { - beforeSave: async (saveOpts: any) => saveOpts, - beforeRun: async (runOpts: any) => runOpts, - }; - - $test.afterPluginsInit(); - - expect(() => client.addMiddleware(middleware)).toThrow( - /Cannot add middleware after the task manager is initialized/i - ); - }); - - function testOpts() { - const $test = { - events: {} as any, - afterPluginsInit: _.noop, - }; - - const opts = { - config: { - get: (path: string) => _.get(defaultConfig, path), - }, - kbnServer: { - uiExports: { - taskDefinitions: {}, - }, - afterPluginsInit(callback: any) { - $test.afterPluginsInit = callback; - }, - }, - server: { - log: sinon.spy(), - decorate(...args: any[]) { - _.set(opts, args.slice(0, -1), _.last(args)); - }, - plugins: { - elasticsearch: { - getCluster() { - return { callWithInternalUser: _.noop }; - }, - status: { - on(eventName: string, callback: () => any) { - $test.events[eventName] = callback; - }, - }, - }, - }, - }, - }; - - return { - $test, - opts, - }; - } -}); diff --git a/x-pack/plugins/task_manager/task_manager.ts b/x-pack/plugins/task_manager/task_manager.ts deleted file mode 100644 index 7f55085a2804ae..00000000000000 --- a/x-pack/plugins/task_manager/task_manager.ts +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { fillPool } from './lib/fill_pool'; -import { TaskManagerLogger } from './lib/logger'; -import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware'; -import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions'; -import { ConcreteTaskInstance, RunContext, TaskInstance } from './task'; -import { SanitizedTaskDefinition, TaskDefinition, TaskDictionary } from './task'; -import { TaskPoller } from './task_poller'; -import { TaskPool } from './task_pool'; -import { TaskManagerRunner } from './task_runner'; -import { FetchOpts, TaskStore } from './task_store'; - -/* - * The TaskManager is the public interface into the task manager system. This glues together - * all of the disparate modules in one integration point. The task manager operates in two different ways: - * - * - pre-init, it allows middleware registration, but disallows task manipulation - * - post-init, it disallows middleware registration, but allows task manipulation - * - * Due to its complexity, this is mostly tested by integration tests (see readme). - */ - -/** - * The public interface into the task manager system. - */ -export class TaskManager { - private isInitialized = false; - private maxWorkers: number; - private overrideNumWorkers: { [taskType: string]: number }; - private definitions: TaskDictionary; - private store?: TaskStore; - private poller?: TaskPoller; - private middleware = { - beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts, - beforeRun: async (runOpts: RunContext) => runOpts, - }; - - /** - * Initializes the task manager, preventing any further addition of middleware, - * enabling the task manipulation methods, and beginning the background polling - * mechanism. - */ - public constructor(kbnServer: any, server: any, config: any) { - this.maxWorkers = config.get('xpack.task_manager.max_workers'); - this.overrideNumWorkers = config.get('xpack.task_manager.override_num_workers'); - this.definitions = {}; - - const logger = new TaskManagerLogger((...args: any[]) => server.log(...args)); - - kbnServer.afterPluginsInit(() => { - const store = new TaskStore({ - callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser, - index: config.get('xpack.task_manager.index'), - maxAttempts: config.get('xpack.task_manager.max_attempts'), - supportedTypes: Object.keys(this.definitions), - }); - const pool = new TaskPool({ - logger, - maxWorkers: this.maxWorkers, - }); - const createRunner = (instance: ConcreteTaskInstance) => - new TaskManagerRunner({ - logger, - kbnServer, - instance, - store, - definition: this.definitions[instance.taskType], - beforeRun: this.middleware.beforeRun, - }); - const poller = new TaskPoller({ - logger, - pollInterval: config.get('xpack.task_manager.poll_interval'), - work() { - return fillPool(pool.run, store.fetchAvailableTasks, createRunner); - }, - }); - - bindToElasticSearchStatus(server.plugins.elasticsearch, logger, poller, store); - - this.store = store; - this.poller = poller; - this.isInitialized = true; - }); - } - - /** - * Method for allowing consumers to register task definitions into the system. - * @param taskDefinitions - The Kibana task definitions dictionary - */ - public registerTaskDefinitions(taskDefinitions: TaskDictionary) { - this.assertUninitialized('register task definitions'); - const duplicate = Object.keys(taskDefinitions).find(k => !!this.definitions[k]); - if (duplicate) { - throw new Error(`Task ${duplicate} is already defined!`); - } - - const sanitized = sanitizeTaskDefinitions( - taskDefinitions, - this.maxWorkers, - this.overrideNumWorkers - ); - Object.assign(this.definitions, sanitized); - } - - /** - * Adds middleware to the task manager, such as adding security layers, loggers, etc. - * - * @param {Middleware} middleware - The middlware being added. - */ - public addMiddleware(middleware: Middleware) { - this.assertUninitialized('add middleware'); - const prevMiddleWare = this.middleware; - this.middleware = addMiddlewareToChain(prevMiddleWare, middleware); - } - - /** - * Schedules a task. - * - * @param task - The task being scheduled. - */ - public async schedule(taskInstance: TaskInstance, options?: any) { - this.assertInitialized(); - const { taskInstance: modifiedTask } = await this.middleware.beforeSave({ - ...options, - taskInstance, - }); - const result = await this.store!.schedule(modifiedTask); - this.poller!.attemptWork(); - return result; - } - - /** - * Fetches a paginatable list of scheduled tasks. - * - * @param opts - The query options used to filter tasks - */ - public async fetch(opts: FetchOpts) { - this.assertInitialized(); - return this.store!.fetch(opts); - } - - /** - * Removes the specified task from the index. - * - * @param {string} id - * @returns {Promise} - */ - public async remove(id: string) { - this.assertInitialized(); - return this.store!.remove(id); - } - - private assertUninitialized(message: string) { - if (this.isInitialized) { - throw new Error(`Cannot ${message} after the task manager is initialized.`); - } - } - - private assertInitialized() { - if (!this.isInitialized) { - throw new Error('The task manager is initializing.'); - } - } -} - -// This is exported for test purposes. It is responsible for starting / stopping -// the poller based on the elasticsearch plugin status. -export function bindToElasticSearchStatus( - elasticsearch: any, - logger: { debug: (s: string) => any; info: (s: string) => any }, - poller: { stop: () => any; start: () => Promise }, - store: { init: () => Promise } -) { - elasticsearch.status.on('red', () => { - logger.debug('Lost connection to Elasticsearch, stopping the poller.'); - poller.stop(); - }); - - elasticsearch.status.on('green', async () => { - logger.debug('Initializing store'); - await store.init(); - logger.debug('Starting poller'); - await poller.start(); - logger.info('Connected to Elasticsearch, and watching for tasks'); - }); -} diff --git a/x-pack/plugins/task_manager/task_poller.test.ts b/x-pack/plugins/task_manager/task_poller.test.ts deleted file mode 100644 index 08bb21b5cdd5c2..00000000000000 --- a/x-pack/plugins/task_manager/task_poller.test.ts +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import _ from 'lodash'; -import sinon from 'sinon'; -import { TaskPoller } from './task_poller'; -import { mockLogger, resolvable, sleep } from './test_utils'; - -describe('TaskPoller', () => { - describe('interval tests', () => { - let clock: sinon.SinonFakeTimers; - - beforeEach(() => { - clock = sinon.useFakeTimers(); - }); - - afterEach(() => clock.restore()); - - test('runs the work function on an interval', async () => { - const pollInterval = _.random(10, 20); - const done = resolvable(); - const work = sinon.spy(() => { - done.resolve(); - return Promise.resolve(); - }); - const poller = new TaskPoller({ - pollInterval, - work, - logger: mockLogger(), - }); - - poller.start(); - - sinon.assert.calledOnce(work); - await done; - - clock.tick(pollInterval - 1); - sinon.assert.calledOnce(work); - clock.tick(1); - sinon.assert.calledTwice(work); - }); - }); - - test('logs, but does not crash if the work function fails', async () => { - let count = 0; - const logger = mockLogger(); - const doneWorking = resolvable(); - const poller = new TaskPoller({ - logger, - pollInterval: 1, - work: async () => { - ++count; - if (count === 1) { - throw new Error('Dang it!'); - } - if (count > 1) { - poller.stop(); - doneWorking.resolve(); - } - }, - }); - - poller.start(); - - await doneWorking; - - expect(count).toEqual(2); - sinon.assert.calledWithMatch(logger.error, /Dang it/i); - }); - - test('is stoppable', async () => { - const doneWorking = resolvable(); - const work = sinon.spy(async () => { - poller.stop(); - doneWorking.resolve(); - }); - - const poller = new TaskPoller({ - logger: mockLogger(), - pollInterval: 1, - work, - }); - - poller.start(); - await doneWorking; - await sleep(10); - - sinon.assert.calledOnce(work); - }); - - test('disregards duplicate calls to "start"', async () => { - const doneWorking = resolvable(); - const work = sinon.spy(async () => { - await doneWorking; - }); - const poller = new TaskPoller({ - pollInterval: 1, - logger: mockLogger(), - work, - }); - - poller.start(); - poller.start(); - poller.start(); - poller.start(); - - poller.stop(); - - doneWorking.resolve(); - - sinon.assert.calledOnce(work); - }); - - test('waits for work before polling', async () => { - const doneWorking = resolvable(); - const work = sinon.spy(async () => { - await sleep(10); - poller.stop(); - doneWorking.resolve(); - }); - const poller = new TaskPoller({ - pollInterval: 1, - logger: mockLogger(), - work, - }); - - poller.start(); - await doneWorking; - - sinon.assert.calledOnce(work); - }); -}); diff --git a/x-pack/plugins/task_manager/task_poller.ts b/x-pack/plugins/task_manager/task_poller.ts deleted file mode 100644 index 86be6461870811..00000000000000 --- a/x-pack/plugins/task_manager/task_poller.ts +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -/* - * This module contains the logic for polling the task manager index for new work. - */ - -import { Logger } from './lib/logger'; - -type WorkFn = () => Promise; - -interface Opts { - pollInterval: number; - logger: Logger; - work: WorkFn; -} - -/** - * Performs work on a scheduled interval, logging any errors. This waits for work to complete - * (or error) prior to attempting another run. - */ -export class TaskPoller { - private isStarted = false; - private isWorking = false; - private timeout: any; - private pollInterval: number; - private logger: Logger; - private work: WorkFn; - - /** - * Constructs a new TaskPoller. - * - * @param opts - * @prop {number} pollInterval - How often, in milliseconds, we will run the work function - * @prop {Logger} logger - The task manager logger - * @prop {WorkFn} work - An empty, asynchronous function that performs the desired work - */ - constructor(opts: Opts) { - this.pollInterval = opts.pollInterval; - this.logger = opts.logger; - this.work = opts.work; - } - - /** - * Starts the poller. If the poller is already running, this has no effect. - */ - public async start() { - if (this.isStarted) { - return; - } - this.isStarted = true; - - const poll = async () => { - await this.attemptWork(); - - if (this.isStarted) { - this.timeout = setTimeout(poll, this.pollInterval); - } - }; - - poll(); - } - - /** - * Stops the poller. - */ - public stop() { - this.isStarted = false; - clearTimeout(this.timeout); - this.timeout = undefined; - } - - /** - * Runs the work function. If the work function is currently running, - * this has no effect. - */ - public async attemptWork() { - if (this.isWorking) { - return; - } - - this.isWorking = true; - - try { - await this.work(); - } catch (error) { - this.logger.error(`Failed to poll for work ${error.stack}`); - } finally { - this.isWorking = false; - } - } -} diff --git a/x-pack/plugins/task_manager/task_pool.test.ts b/x-pack/plugins/task_manager/task_pool.test.ts deleted file mode 100644 index 66164266ae37d1..00000000000000 --- a/x-pack/plugins/task_manager/task_pool.test.ts +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import sinon from 'sinon'; -import { TaskPool } from './task_pool'; -import { mockLogger, resolvable, sleep } from './test_utils'; - -describe('TaskPool', () => { - test('occupiedWorkers are a sum of worker costs', async () => { - const pool = new TaskPool({ - maxWorkers: 200, - logger: mockLogger(), - }); - - const result = await pool.run([ - { ...mockTask(), numWorkers: 10 }, - { ...mockTask(), numWorkers: 20 }, - { ...mockTask(), numWorkers: 30 }, - ]); - - expect(result).toBeTruthy(); - expect(pool.occupiedWorkers).toEqual(60); - }); - - test('availableWorkers are a function of total_capacity - occupiedWorkers', async () => { - const pool = new TaskPool({ - maxWorkers: 100, - logger: mockLogger(), - }); - - const result = await pool.run([ - { ...mockTask(), numWorkers: 20 }, - { ...mockTask(), numWorkers: 30 }, - { ...mockTask(), numWorkers: 40 }, - ]); - - expect(result).toBeTruthy(); - expect(pool.availableWorkers).toEqual(10); - }); - - test('does not run tasks that are beyond its available capacity', async () => { - const pool = new TaskPool({ - maxWorkers: 10, - logger: mockLogger(), - }); - - const shouldRun = mockRun(); - const shouldNotRun = mockRun(); - - const result = await pool.run([ - { ...mockTask(), numWorkers: 9, run: shouldRun }, - { ...mockTask(), numWorkers: 9, run: shouldNotRun }, - ]); - - expect(result).toBeFalsy(); - expect(pool.availableWorkers).toEqual(1); - sinon.assert.calledOnce(shouldRun); - sinon.assert.notCalled(shouldNotRun); - }); - - test('clears up capacity when a task completes', async () => { - const pool = new TaskPool({ - maxWorkers: 10, - logger: mockLogger(), - }); - - const firstWork = resolvable(); - const firstRun = sinon.spy(async () => { - await sleep(0); - firstWork.resolve(); - }); - const secondWork = resolvable(); - const secondRun = sinon.spy(async () => { - await sleep(0); - secondWork.resolve(); - }); - - const result = await pool.run([ - { ...mockTask(), numWorkers: 9, run: firstRun }, - { ...mockTask(), numWorkers: 2, run: secondRun }, - ]); - - expect(result).toBeFalsy(); - expect(pool.occupiedWorkers).toEqual(9); - expect(pool.availableWorkers).toEqual(1); - - await firstWork; - sinon.assert.calledOnce(firstRun); - sinon.assert.notCalled(secondRun); - - await pool.run([{ ...mockTask(), numWorkers: 2, run: secondRun }]); - - expect(pool.occupiedWorkers).toEqual(2); - expect(pool.availableWorkers).toEqual(8); - - await secondWork; - - expect(pool.occupiedWorkers).toEqual(0); - expect(pool.availableWorkers).toEqual(10); - sinon.assert.calledOnce(secondRun); - }); - - test('run cancels expired tasks prior to running new tasks', async () => { - const pool = new TaskPool({ - maxWorkers: 10, - logger: mockLogger(), - }); - - const expired = resolvable(); - const shouldRun = sinon.spy(() => Promise.resolve()); - const shouldNotRun = sinon.spy(() => Promise.resolve()); - const result = await pool.run([ - { - ...mockTask(), - numWorkers: 9, - async run() { - this.isExpired = true; - expired.resolve(); - await sleep(10); - return {}; - }, - cancel: shouldRun, - }, - { - ...mockTask(), - numWorkers: 1, - async run() { - await sleep(10); - return {}; - }, - cancel: shouldNotRun, - }, - ]); - - expect(result).toBeTruthy(); - expect(pool.occupiedWorkers).toEqual(10); - expect(pool.availableWorkers).toEqual(0); - - await expired; - - expect(await pool.run([{ ...mockTask(), numWorkers: 7 }])).toBeTruthy(); - sinon.assert.calledOnce(shouldRun); - sinon.assert.notCalled(shouldNotRun); - - expect(pool.occupiedWorkers).toEqual(8); - expect(pool.availableWorkers).toEqual(2); - }); - - test('logs if cancellation errors', async () => { - const logger = mockLogger(); - const pool = new TaskPool({ - logger, - maxWorkers: 20, - }); - - const cancelled = resolvable(); - const result = await pool.run([ - { - ...mockTask(), - numWorkers: 7, - async run() { - this.isExpired = true; - await sleep(10); - return {}; - }, - async cancel() { - cancelled.resolve(); - throw new Error('Dern!'); - }, - toString: () => '"shooooo!"', - }, - ]); - - expect(result).toBeTruthy(); - await pool.run([]); - - expect(pool.occupiedWorkers).toEqual(0); - - // Allow the task to cancel... - await cancelled; - - sinon.assert.calledWithMatch(logger.error, /Failed to cancel task "shooooo!"/); - }); - - function mockRun() { - return sinon.spy(async () => sleep(0)); - } - - function mockTask() { - return { - numWorkers: 1, - isExpired: false, - cancel: async () => undefined, - claimOwnership: async () => true, - run: mockRun(), - }; - } -}); diff --git a/x-pack/plugins/task_manager/task_pool.ts b/x-pack/plugins/task_manager/task_pool.ts deleted file mode 100644 index 7ac62219fc7091..00000000000000 --- a/x-pack/plugins/task_manager/task_pool.ts +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -/* - * This module contains the logic that ensures we don't run too many - * tasks at once in a given Kibana instance. - */ - -import { Logger } from './lib/logger'; -import { TaskRunner } from './task_runner'; - -interface Opts { - maxWorkers: number; - logger: Logger; -} - -/** - * Runs tasks in batches, taking costs into account. - */ -export class TaskPool { - private maxWorkers: number; - private running = new Set(); - private logger: Logger; - - /** - * Creates an instance of TaskPool. - * - * @param {Opts} opts - * @prop {number} maxWorkers - The total number of workers / work slots available - * (e.g. maxWorkers is 4, then 2 tasks of cost 2 can run at a time, or 4 tasks of cost 1) - * @prop {Logger} logger - The task manager logger. - */ - constructor(opts: Opts) { - this.maxWorkers = opts.maxWorkers; - this.logger = opts.logger; - } - - /** - * Gets how many workers are currently in use. - */ - get occupiedWorkers() { - let total = 0; - - this.running.forEach(({ numWorkers }) => (total += numWorkers)); - - return total; - } - - /** - * Gets how many workers are currently available. - */ - get availableWorkers() { - return this.maxWorkers - this.occupiedWorkers; - } - - /** - * Attempts to run the specified list of tasks. Returns true if it was able - * to start every task in the list, false if there was not enough capacity - * to run every task. - * - * @param {TaskRunner[]} tasks - * @returns {Promise} - */ - public run = (tasks: TaskRunner[]) => { - this.cancelExpiredTasks(); - return this.attemptToRun(tasks); - }; - - private async attemptToRun(tasks: TaskRunner[]) { - for (const task of tasks) { - if (this.availableWorkers < task.numWorkers) { - return false; - } - - if (await task.claimOwnership()) { - this.running.add(task); - task - .run() - .catch(error => { - this.logger.warning(`Task ${task} failed in attempt to run: ${error.stack}`); - }) - .then(() => this.running.delete(task)); - } - } - - return true; - } - - private cancelExpiredTasks() { - for (const task of this.running) { - if (task.isExpired) { - this.cancelTask(task); - } - } - } - - private async cancelTask(task: TaskRunner) { - try { - this.logger.debug(`Cancelling expired task ${task}.`); - this.running.delete(task); - await task.cancel(); - } catch (error) { - this.logger.error(`Failed to cancel task ${task}: ${error.stack}`); - } - } -} diff --git a/x-pack/plugins/task_manager/task_runner.test.ts b/x-pack/plugins/task_manager/task_runner.test.ts deleted file mode 100644 index 0ea607b8811170..00000000000000 --- a/x-pack/plugins/task_manager/task_runner.test.ts +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import _ from 'lodash'; -import sinon from 'sinon'; -import { minutesFromNow } from './lib/intervals'; -import { ConcreteTaskInstance, TaskDefinition } from './task'; -import { TaskManagerRunner } from './task_runner'; - -describe('TaskManagerRunner', () => { - test('provides details about the task that is running', () => { - const { runner } = testOpts({ - instance: { - id: 'foo', - taskType: 'bar', - }, - }); - - expect(runner.id).toEqual('foo'); - expect(runner.taskType).toEqual('bar'); - expect(runner.toString()).toEqual('bar "foo"'); - }); - - test('warns if the task returns an unexpected result', async () => { - await allowsReturnType(undefined); - await allowsReturnType({}); - await allowsReturnType({ - runAt: new Date(), - }); - await allowsReturnType({ - error: new Error('Dang it!'), - }); - await allowsReturnType({ - state: { shazm: true }, - }); - await disallowsReturnType('hm....'); - await disallowsReturnType({ - whatIsThis: '?!!?', - }); - }); - - test('queues a reattempt if the task fails', async () => { - const initialAttempts = _.random(0, 2); - const id = Date.now().toString(); - const { runner, store } = testOpts({ - instance: { - id, - attempts: initialAttempts, - params: { a: 'b' }, - state: { hey: 'there' }, - }, - definition: { - createTaskRunner: () => ({ - async run() { - throw new Error('Dangit!'); - }, - }), - }, - }); - - await runner.run(); - - sinon.assert.calledOnce(store.update); - const instance = store.update.args[0][0]; - - expect(instance.id).toEqual(id); - expect(instance.attempts).toEqual(initialAttempts + 1); - expect(instance.runAt.getTime()).toBeGreaterThan(Date.now()); - expect(instance.params).toEqual({ a: 'b' }); - expect(instance.state).toEqual({ hey: 'there' }); - }); - - test('reschedules tasks that have an interval', async () => { - const { runner, store } = testOpts({ - instance: { - interval: '10m', - }, - }); - - await runner.run(); - - sinon.assert.calledOnce(store.update); - const instance = store.update.args[0][0]; - - expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime()); - expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime()); - }); - - test('reschedules tasks that return a runAt', async () => { - const runAt = minutesFromNow(_.random(1, 10)); - const { runner, store } = testOpts({ - definition: { - createTaskRunner: () => ({ - async run() { - return { runAt }; - }, - }), - }, - }); - - await runner.run(); - - sinon.assert.calledOnce(store.update); - sinon.assert.calledWithMatch(store.update, { runAt }); - }); - - test('tasks that return runAt override interval', async () => { - const runAt = minutesFromNow(_.random(5)); - const { runner, store } = testOpts({ - instance: { - interval: '20m', - }, - definition: { - createTaskRunner: () => ({ - async run() { - return { runAt }; - }, - }), - }, - }); - - await runner.run(); - - sinon.assert.calledOnce(store.update); - sinon.assert.calledWithMatch(store.update, { runAt }); - }); - - test('removes non-recurring tasks after they complete', async () => { - const id = _.random(1, 20).toString(); - const { runner, store } = testOpts({ - instance: { - id, - interval: undefined, - }, - definition: { - createTaskRunner: () => ({ - async run() { - return undefined; - }, - }), - }, - }); - - await runner.run(); - - sinon.assert.calledOnce(store.remove); - sinon.assert.calledWith(store.remove, id); - }); - - test('cancel cancels the task runner, if it is cancellable', async () => { - let wasCancelled = false; - const { runner, logger } = testOpts({ - definition: { - createTaskRunner: () => ({ - async run() { - await new Promise(r => setTimeout(r, 1000)); - }, - async cancel() { - wasCancelled = true; - }, - }), - }, - }); - - const promise = runner.run(); - await new Promise(r => setTimeout(r, 1)); - await runner.cancel(); - await promise; - - expect(wasCancelled).toBeTruthy(); - sinon.assert.neverCalledWithMatch(logger.warning, /not cancellable/); - }); - - test('warns if cancel is called on a non-cancellable task', async () => { - const { runner, logger } = testOpts({ - definition: { - createTaskRunner: () => ({ - run: async () => undefined, - }), - }, - }); - - const promise = runner.run(); - await runner.cancel(); - await promise; - - sinon.assert.calledWithMatch(logger.warning, /not cancellable/); - }); - - interface TestOpts { - instance?: Partial; - definition?: Partial; - } - - function testOpts(opts: TestOpts) { - const callCluster = sinon.stub(); - const createTaskRunner = sinon.stub(); - const logger = { - error: sinon.stub(), - debug: sinon.stub(), - info: sinon.stub(), - warning: sinon.stub(), - }; - const store = { - update: sinon.stub(), - remove: sinon.stub(), - }; - const runner = new TaskManagerRunner({ - kbnServer: sinon.stub(), - beforeRun: context => Promise.resolve(context), - logger, - store, - instance: Object.assign( - { - id: 'foo', - taskType: 'bar', - version: 32, - runAt: new Date(), - attempts: 0, - params: {}, - scope: 'reporting', - state: {}, - status: 'idle', - user: 'example', - }, - opts.instance || {} - ), - definition: Object.assign( - { - type: 'bar', - title: 'Bar!', - createTaskRunner, - }, - opts.definition || {} - ), - }); - - return { - callCluster, - createTaskRunner, - runner, - logger, - store, - }; - } - - async function testReturn(result: any, shouldBeValid: boolean) { - const { runner, logger } = testOpts({ - definition: { - createTaskRunner: () => ({ - run: async () => result, - }), - }, - }); - - await runner.run(); - - try { - if (shouldBeValid) { - sinon.assert.notCalled(logger.warning); - } else { - sinon.assert.calledWith(logger.warning, sinon.match(/invalid task result/i)); - } - } catch (err) { - sinon.assert.fail( - `Expected result ${JSON.stringify(result)} to be ${shouldBeValid ? 'valid' : 'invalid'}` - ); - } - } - - function allowsReturnType(result: any) { - return testReturn(result, true); - } - - function disallowsReturnType(result: any) { - return testReturn(result, false); - } -}); diff --git a/x-pack/plugins/task_manager/task_runner.ts b/x-pack/plugins/task_manager/task_runner.ts deleted file mode 100644 index 78b21c7bc15c5a..00000000000000 --- a/x-pack/plugins/task_manager/task_runner.ts +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -/* - * This module contains the core logic for running an individual task. - * It handles the full lifecycle of a task run, including error handling, - * rescheduling, middleware application, etc. - */ - -import Joi from 'joi'; -import { intervalFromNow, minutesFromNow } from './lib/intervals'; -import { Logger } from './lib/logger'; -import { BeforeRunFunction } from './lib/middleware'; -import { - CancelFunction, - CancellableTask, - ConcreteTaskInstance, - RunResult, - TaskDefinition, - validateRunResult, -} from './task'; -import { RemoveResult } from './task_store'; - -export interface TaskRunner { - numWorkers: number; - isExpired: boolean; - cancel: CancelFunction; - claimOwnership: () => Promise; - run: () => Promise; - toString?: () => string; -} - -interface Updatable { - update(doc: ConcreteTaskInstance): Promise; - remove(id: string): Promise; -} - -interface Opts { - logger: Logger; - definition: TaskDefinition; - instance: ConcreteTaskInstance; - store: Updatable; - kbnServer: any; - beforeRun: BeforeRunFunction; -} - -/** - * Runs a background task, ensures that errors are properly handled, - * allows for cancellation. - * - * @export - * @class TaskManagerRunner - * @implements {TaskRunner} - */ -export class TaskManagerRunner implements TaskRunner { - private task?: CancellableTask; - private instance: ConcreteTaskInstance; - private definition: TaskDefinition; - private logger: Logger; - private store: Updatable; - private kbnServer: any; - private beforeRun: BeforeRunFunction; - - /** - * Creates an instance of TaskManagerRunner. - * @param {Opts} opts - * @prop {Logger} logger - The task manager logger - * @prop {TaskDefinition} definition - The definition of the task being run - * @prop {ConcreteTaskInstance} instance - The record describing this particular task instance - * @prop {Updatable} store - The store used to read / write tasks instance info - * @prop {kbnServer} kbnServer - An async function that provides the task's run context - * @prop {BeforeRunFunction} beforeRun - A function that adjusts the run context prior to running the task - * @memberof TaskManagerRunner - */ - constructor(opts: Opts) { - this.instance = sanitizeInstance(opts.instance); - this.definition = opts.definition; - this.logger = opts.logger; - this.store = opts.store; - this.kbnServer = opts.kbnServer; - this.beforeRun = opts.beforeRun; - } - - /** - * Gets how many workers are occupied by this task instance. - */ - public get numWorkers() { - return this.definition.numWorkers || 1; - } - - /** - * Gets the id of this task instance. - */ - public get id() { - return this.instance.id; - } - - /** - * Gets the task type of this task instance. - */ - public get taskType() { - return this.instance.taskType; - } - - /** - * Gets whether or not this task has run longer than its expiration setting allows. - */ - public get isExpired() { - return this.instance.runAt < new Date(); - } - - /** - * Returns a log-friendly representation of this task. - */ - public toString() { - return `${this.instance.taskType} "${this.instance.id}"`; - } - - /** - * Runs the task, handling the task result, errors, etc, rescheduling if need - * be. NOTE: the time of applying the middleware's beforeRun is incorporated - * into the total timeout time the task in configured with. We may decide to - * start the timer after beforeRun resolves - * - * @returns {Promise} - */ - public async run(): Promise { - try { - this.logger.debug(`Running task ${this}`); - const modifiedContext = await this.beforeRun({ - kbnServer: this.kbnServer, - taskInstance: this.instance, - }); - const task = this.definition.createTaskRunner(modifiedContext); - this.task = task; - return this.processResult(this.validateResult(await this.task.run())); - } catch (error) { - this.logger.warning(`Task ${this} failed ${error.stack}`); - this.logger.debug(`Task ${JSON.stringify(this.instance)} failed ${error.stack}`); - - return this.processResult({ error }); - } - } - - /** - * Attempts to claim exclusive rights to run the task. If the attempt fails - * with a 409 (http conflict), we assume another Kibana instance beat us to the punch. - * - * @returns {Promise} - */ - public async claimOwnership(): Promise { - const VERSION_CONFLICT_STATUS = 409; - - try { - this.instance = await this.store.update({ - ...this.instance, - status: 'running', - runAt: intervalFromNow(this.definition.timeOut)!, - }); - - return true; - } catch (error) { - if (error.statusCode !== VERSION_CONFLICT_STATUS) { - throw error; - } - } - - return false; - } - - /** - * Attempts to cancel the task. - * - * @returns {Promise} - */ - public async cancel() { - const { task } = this; - if (task && task.cancel) { - this.task = undefined; - return task.cancel(); - } - - this.logger.warning(`The task ${this} is not cancellable.`); - } - - private validateResult(result?: RunResult | void): RunResult { - const { error } = Joi.validate(result, validateRunResult); - - if (error) { - this.logger.warning(`Invalid task result for ${this}: ${error.message}`); - } - - return result || {}; - } - - private async processResult(result: RunResult): Promise { - const runAt = result.runAt || intervalFromNow(this.instance.interval); - const state = result.state || this.instance.state || {}; - - if (runAt || result.error) { - await this.store.update({ - ...this.instance, - runAt: runAt || minutesFromNow((this.instance.attempts + 1) * 5), - state, - attempts: result.error ? this.instance.attempts + 1 : 0, - }); - } else { - try { - await this.store.remove(this.instance.id); - } catch (err) { - if (err.statusCode === 404) { - this.logger.warning( - `Task cleanup of ${this} failed in processing. Was remove called twice?` - ); - } else { - throw err; - } - } - } - - return result; - } -} - -function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance { - return { - ...instance, - params: instance.params || {}, - state: instance.state || {}, - }; -} diff --git a/x-pack/plugins/task_manager/task_store.test.ts b/x-pack/plugins/task_manager/task_store.test.ts deleted file mode 100644 index f3675f545b0fea..00000000000000 --- a/x-pack/plugins/task_manager/task_store.test.ts +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import _ from 'lodash'; -import sinon from 'sinon'; -import { TaskInstance, TaskStatus } from './task'; -import { FetchOpts, TaskStore } from './task_store'; - -describe('TaskStore', () => { - describe('init', () => { - test('creates the task manager index', async () => { - const callCluster = sinon.spy(); - const store = new TaskStore({ - callCluster, - index: 'tasky', - maxAttempts: 2, - supportedTypes: ['a', 'b', 'c'], - }); - - await store.init(); - - sinon.assert.calledOnce(callCluster); - - callCluster.calledOnceWith('indices.create', { - index: 'tasky', - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - }, - }); - }); - - test('patches the task manager index mappings if the index already exists', async () => { - const callCluster = sinon.spy((path: string) => { - if (path === 'indices.create') { - return Promise.reject({ body: { error: { type: 'resource_already_exists_exception' } } }); - } - }); - const store = new TaskStore({ - callCluster, - index: 'taskalicious', - maxAttempts: 2, - supportedTypes: ['a', 'b', 'c'], - }); - - await store.init(); - - sinon.assert.calledTwice(callCluster); - - callCluster.calledOnceWith('indices.putMapping', { - index: 'taskalicious', - }); - }); - }); - - describe('schedule', () => { - async function testSchedule(task: TaskInstance) { - const callCluster = sinon.spy(() => - Promise.resolve({ - _id: 'testid', - _version: 3344, - }) - ); - const store = new TaskStore({ - callCluster, - index: 'tasky', - maxAttempts: 2, - supportedTypes: ['report', 'dernstraight', 'yawn'], - }); - - const result = await store.schedule(task); - - sinon.assert.calledOnce(callCluster); - - return { result, callCluster, arg: callCluster.args[0][1] }; - } - - test('serializes the params and state', async () => { - const task = { - params: { hello: 'world' }, - state: { foo: 'bar' }, - taskType: 'report', - }; - const { callCluster, arg } = await testSchedule(task); - - sinon.assert.calledOnce(callCluster); - sinon.assert.calledWith(callCluster, 'index'); - - expect(arg).toMatchObject({ - index: 'tasky', - type: '_doc', - body: { - task: { - params: JSON.stringify(task.params), - state: JSON.stringify(task.state), - }, - }, - }); - }); - - test('retiurns a concrete task instance', async () => { - const task = { - params: { hello: 'world' }, - state: { foo: 'bar' }, - taskType: 'report', - }; - const { result } = await testSchedule(task); - - expect(result).toMatchObject({ - ...task, - version: 3344, - id: 'testid', - }); - }); - - test('sets runAt to now if not specified', async () => { - const now = Date.now(); - const { arg } = await testSchedule({ taskType: 'dernstraight', params: {} }); - expect(arg.body.task.runAt.getTime()).toBeGreaterThanOrEqual(now); - }); - - test('ensures params and state are not null', async () => { - const { arg } = await testSchedule({ taskType: 'yawn' } as any); - expect(arg.body.task.params).toEqual('{}'); - expect(arg.body.task.state).toEqual('{}'); - }); - - test('errors if the task type is unknown', async () => { - await expect(testSchedule({ taskType: 'nope', params: {} })).rejects.toThrow( - /Unsupported task type "nope"/i - ); - }); - }); - - describe('fetch', () => { - async function testFetch(opts?: FetchOpts, hits: any[] = []) { - const callCluster = sinon.spy(async () => ({ hits: { hits } })); - const store = new TaskStore({ - callCluster, - index: 'tasky', - maxAttempts: 2, - supportedTypes: ['a', 'b', 'c'], - }); - - const result = await store.fetch(opts); - - sinon.assert.calledOnce(callCluster); - sinon.assert.calledWith(callCluster, 'search'); - - return { - result, - args: callCluster.args[0][1], - }; - } - - test('empty call filters by type, sorts by runAt and id', async () => { - const { args } = await testFetch(); - expect(args).toMatchObject({ - type: '_doc', - index: 'tasky', - body: { - sort: [{ 'task.runAt': 'asc' }, { _id: 'desc' }], - query: { term: { type: 'task' } }, - }, - }); - }); - - test('allows custom queries', async () => { - const { args } = await testFetch({ - query: { - term: { 'task.taskType': 'bar' }, - }, - }); - - expect(args).toMatchObject({ - body: { - query: { - bool: { - must: [{ term: { type: 'task' } }, { term: { 'task.taskType': 'bar' } }], - }, - }, - }, - }); - }); - - test('sorts by id if custom sort does not have an id sort in it', async () => { - const { args } = await testFetch({ - sort: [{ 'task.taskType': 'desc' }], - }); - - expect(args).toMatchObject({ - body: { - sort: [{ 'task.taskType': 'desc' }, { _id: 'desc' }], - }, - }); - }); - - test('allows custom sort by id', async () => { - const { args } = await testFetch({ - sort: [{ _id: 'asc' }], - }); - - expect(args).toMatchObject({ - body: { - sort: [{ _id: 'asc' }], - }, - }); - }); - - test('allows specifying pagination', async () => { - const now = new Date(); - const searchAfter = [now, '143243sdafa32']; - const { args } = await testFetch({ - searchAfter, - }); - - expect(args).toMatchObject({ - body: { - search_after: searchAfter, - }, - }); - }); - - test('returns paginated tasks', async () => { - const runAt = new Date(); - const { result } = await testFetch(undefined, [ - { - _id: 'aaa', - _source: { - type: 'task', - task: { - runAt, - taskType: 'foo', - interval: undefined, - attempts: 0, - status: 'idle', - params: '{ "hello": "world" }', - state: '{ "baby": "Henhen" }', - user: 'jimbo', - scope: 'reporting', - }, - }, - sort: ['a', 1], - }, - { - _id: 'bbb', - _source: { - type: 'task', - task: { - runAt, - taskType: 'bar', - interval: '5m', - attempts: 2, - status: 'running', - params: '{ "shazm": 1 }', - state: '{ "henry": "The 8th" }', - user: 'dabo', - scope: ['reporting', 'ceo'], - }, - }, - sort: ['b', 2], - }, - ]); - - expect(result).toEqual({ - docs: [ - { - attempts: 0, - id: 'aaa', - interval: undefined, - params: { hello: 'world' }, - runAt, - scope: 'reporting', - state: { baby: 'Henhen' }, - status: 'idle', - taskType: 'foo', - user: 'jimbo', - version: undefined, - }, - { - attempts: 2, - id: 'bbb', - interval: '5m', - params: { shazm: 1 }, - runAt, - scope: ['reporting', 'ceo'], - state: { henry: 'The 8th' }, - status: 'running', - taskType: 'bar', - user: 'dabo', - version: undefined, - }, - ], - searchAfter: ['b', 2], - }); - }); - }); - - describe('fetchAvailableTasks', () => { - async function testFetchAvailableTasks({ opts = {}, hits = [] }: any = {}) { - const callCluster = sinon.spy(async () => ({ hits: { hits } })); - const store = new TaskStore({ - callCluster, - supportedTypes: ['a', 'b', 'c'], - index: 'tasky', - maxAttempts: 2, - ...opts, - }); - - const result = await store.fetchAvailableTasks(); - - sinon.assert.calledOnce(callCluster); - sinon.assert.calledWith(callCluster, 'search'); - - return { - result, - args: callCluster.args[0][1], - }; - } - - test('it filters tasks by supported types, maxAttempts, and runAt', async () => { - const maxAttempts = _.random(2, 43); - const index = `index_${_.random(1, 234)}`; - const { args } = await testFetchAvailableTasks({ - opts: { - index, - maxAttempts, - supportedTypes: ['foo', 'bar'], - }, - }); - expect(args).toMatchObject({ - body: { - query: { - bool: { - must: [ - { term: { type: 'task' } }, - { - bool: { - must: [ - { terms: { 'task.taskType': ['foo', 'bar'] } }, - { range: { 'task.attempts': { lte: maxAttempts } } }, - { range: { 'task.runAt': { lte: 'now' } } }, - ], - }, - }, - ], - }, - }, - size: 10, - sort: { 'task.runAt': { order: 'asc' } }, - version: true, - }, - index, - type: '_doc', - }); - }); - - test('it returns task objects', async () => { - const runAt = new Date(); - const { result } = await testFetchAvailableTasks({ - hits: [ - { - _id: 'aaa', - _source: { - type: 'task', - task: { - runAt, - taskType: 'foo', - interval: undefined, - attempts: 0, - status: 'idle', - params: '{ "hello": "world" }', - state: '{ "baby": "Henhen" }', - user: 'jimbo', - scope: 'reporting', - }, - }, - sort: ['a', 1], - }, - { - _id: 'bbb', - _source: { - type: 'task', - task: { - runAt, - taskType: 'bar', - interval: '5m', - attempts: 2, - status: 'running', - params: '{ "shazm": 1 }', - state: '{ "henry": "The 8th" }', - user: 'dabo', - scope: ['reporting', 'ceo'], - }, - }, - sort: ['b', 2], - }, - ], - }); - expect(result).toMatchObject([ - { - attempts: 0, - id: 'aaa', - interval: undefined, - params: { hello: 'world' }, - runAt, - scope: 'reporting', - state: { baby: 'Henhen' }, - status: 'idle', - taskType: 'foo', - user: 'jimbo', - version: undefined, - }, - { - attempts: 2, - id: 'bbb', - interval: '5m', - params: { shazm: 1 }, - runAt, - scope: ['reporting', 'ceo'], - state: { henry: 'The 8th' }, - status: 'running', - taskType: 'bar', - user: 'dabo', - version: undefined, - }, - ]); - }); - }); - - describe('update', () => { - test('refreshes the index, handles versioning', async () => { - const runAt = new Date(); - const task = { - runAt, - id: 'task:324242', - params: { hello: 'world' }, - state: { foo: 'bar' }, - taskType: 'report', - version: 2, - attempts: 3, - status: 'idle' as TaskStatus, - }; - - const callCluster = sinon.spy(async () => ({ _version: task.version + 1 })); - const store = new TaskStore({ - callCluster, - index: 'tasky', - maxAttempts: 2, - supportedTypes: ['a', 'b', 'c'], - }); - - const result = await store.update(task); - - sinon.assert.calledOnce(callCluster); - sinon.assert.calledWith(callCluster, 'update'); - - expect(callCluster.args[0][1]).toMatchObject({ - id: task.id, - index: 'tasky', - type: '_doc', - version: 2, - refresh: true, - body: { - doc: { - task: { - ...['id', 'version'].reduce((acc, prop) => _.omit(acc, prop), task), - params: JSON.stringify(task.params), - state: JSON.stringify(task.state), - }, - }, - }, - }); - - expect(result).toEqual({ ...task, version: 3 }); - }); - }); - - describe('remove', () => { - test('removes the task with the specified id', async () => { - const id = `id-${_.random(1, 20)}`; - const callCluster = sinon.spy(() => - Promise.resolve({ - _index: 'myindex', - _id: id, - _version: 32, - result: 'deleted', - }) - ); - const store = new TaskStore({ - callCluster, - index: 'myindex', - maxAttempts: 2, - supportedTypes: ['a'], - }); - const result = await store.remove(id); - - sinon.assert.calledOnce(callCluster); - sinon.assert.calledWith(callCluster, 'delete'); - - expect(result).toEqual({ - id, - index: 'myindex', - version: 32, - result: 'deleted', - }); - - expect(callCluster.args[0][1]).toMatchObject({ - id, - index: 'myindex', - type: '_doc', - refresh: true, - }); - }); - }); -}); diff --git a/x-pack/plugins/task_manager/task_store.ts b/x-pack/plugins/task_manager/task_store.ts deleted file mode 100644 index 53393521afab57..00000000000000 --- a/x-pack/plugins/task_manager/task_store.ts +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -/* - * This module contains helpers for managing the task manager storage layer. - */ - -import { ConcreteTaskInstance, ElasticJs, TaskInstance, TaskStatus } from './task'; - -const DOC_TYPE = '_doc'; - -export interface StoreOpts { - callCluster: ElasticJs; - index: string; - maxAttempts: number; - supportedTypes: string[]; -} - -export interface FetchOpts { - searchAfter?: any[]; - sort?: object[]; - query?: object; -} - -export interface FetchResult { - searchAfter: any[]; - docs: ConcreteTaskInstance[]; -} - -export interface RemoveResult { - index: string; - id: string; - version: string; - result: string; -} - -// Internal, the raw document, as stored in the Kibana index. -export interface RawTaskDoc { - _id: string; - _index: string; - _type: string; - _version: number; - _source: { - type: string; - task: { - taskType: string; - runAt: Date; - interval?: string; - attempts: number; - status: TaskStatus; - params: string; - state: string; - user?: string; - scope?: string | string[]; - }; - }; -} - -/** - * Wraps an elasticsearch connection and provides a task manager-specific - * interface into the index. - */ -export class TaskStore { - private callCluster: ElasticJs; - private index: string; - private maxAttempts: number; - private supportedTypes: string[]; - - /** - * Constructs a new TaskStore. - * @param {StoreOpts} opts - * @prop {CallCluster} callCluster - The elastic search connection - * @prop {string} index - The name of the task manager index - * @prop {number} maxAttempts - The maximum number of attempts before a task will be abandoned - * @prop {string[]} supportedTypes - The task types supported by this store - */ - constructor(opts: StoreOpts) { - this.callCluster = opts.callCluster; - this.index = opts.index; - this.maxAttempts = opts.maxAttempts; - this.supportedTypes = opts.supportedTypes; - } - - /** - * Initializes the store, ensuring the task manager index is created and up to date. - */ - public async init() { - const properties = { - type: { type: 'keyword' }, - task: { - properties: { - taskType: { type: 'keyword' }, - runAt: { type: 'date' }, - interval: { type: 'text' }, - attempts: { type: 'integer' }, - status: { type: 'keyword' }, - params: { type: 'text' }, - state: { type: 'text' }, - user: { type: 'keyword' }, - scope: { type: 'keyword' }, - }, - }, - }; - - try { - await this.callCluster('indices.create', { - index: this.index, - body: { - mappings: { - _doc: { - dynamic: 'strict', - properties, - }, - }, - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - }, - }, - }); - } catch (err) { - if ( - !err.body || - !err.body.error || - err.body.error.type !== 'resource_already_exists_exception' - ) { - throw err; - } - return this.callCluster('indices.putMapping', { - index: this.index, - type: DOC_TYPE, - body: { - properties, - }, - }); - } - } - - /** - * Schedules a task. - * - * @param task - The task being scheduled. - */ - public async schedule(taskInstance: TaskInstance): Promise { - if (!this.supportedTypes.includes(taskInstance.taskType)) { - throw new Error(`Unsupported task type "${taskInstance.taskType}".`); - } - - const { id, ...body } = rawSource(taskInstance); - const result = await this.callCluster('index', { - id, - body, - index: this.index, - type: DOC_TYPE, - refresh: true, - }); - - const { task } = body; - return { - ...taskInstance, - id: result._id, - version: result._version, - attempts: 0, - status: task.status, - runAt: task.runAt, - state: taskInstance.state || {}, - }; - } - - /** - * Fetches a paginatable list of scheduled tasks. - * - * @param opts - The query options used to filter tasks - */ - public async fetch(opts: FetchOpts = {}): Promise { - const sort = paginatableSort(opts.sort); - return this.search({ - sort, - search_after: opts.searchAfter, - query: opts.query, - }); - } - - /** - * Fetches tasks from the index, which are ready to be run. - * - runAt is now or past - * - id is not currently running in this instance of Kibana - * - has a type that is in our task definitions - * - * @param {TaskQuery} query - * @prop {string[]} types - Task types to be queried - * @prop {number} size - The number of task instances to retrieve - * @returns {Promise} - */ - public fetchAvailableTasks = async () => { - const { docs } = await this.search({ - query: { - bool: { - must: [ - { terms: { 'task.taskType': this.supportedTypes } }, - { range: { 'task.attempts': { lte: this.maxAttempts } } }, - { range: { 'task.runAt': { lte: 'now' } } }, - ], - }, - }, - size: 10, - sort: { 'task.runAt': { order: 'asc' } }, - version: true, - }); - - return docs; - }; - - /** - * Updates the specified doc in the index, returning the doc - * with its version up to date. - * - * @param {TaskDoc} doc - * @returns {Promise} - */ - public async update(doc: ConcreteTaskInstance): Promise { - const rawDoc = taskDocToRaw(doc, this.index); - - const { _version } = await this.callCluster('update', { - body: { - doc: rawDoc._source, - }, - id: doc.id, - index: this.index, - type: DOC_TYPE, - version: doc.version, - // The refresh is important so that if we immediately look for work, - // we don't pick up this task. - refresh: true, - }); - - return { - ...doc, - version: _version, - }; - } - - /** - * Removes the specified task from the index. - * - * @param {string} id - * @returns {Promise} - */ - public async remove(id: string): Promise { - const result = await this.callCluster('delete', { - id, - index: this.index, - type: DOC_TYPE, - // The refresh is important so that if we immediately look for work, - // we don't pick up this task. - refresh: true, - }); - - return { - index: result._index, - id: result._id, - version: result._version, - result: result.result, - }; - } - - private async search(opts: any = {}): Promise { - const originalQuery = opts.query; - const queryOnlyTasks = { term: { type: 'task' } }; - const query = originalQuery - ? { bool: { must: [queryOnlyTasks, originalQuery] } } - : queryOnlyTasks; - - const result = await this.callCluster('search', { - type: DOC_TYPE, - index: this.index, - body: { - ...opts, - query, - }, - }); - - const rawDocs = result.hits.hits; - - return { - docs: (rawDocs as RawTaskDoc[]).map(rawToTaskDoc), - searchAfter: (rawDocs.length && rawDocs[rawDocs.length - 1].sort) || [], - }; - } -} - -function paginatableSort(sort: any[] = []) { - const sortById = { _id: 'desc' }; - - if (!sort.length) { - return [{ 'task.runAt': 'asc' }, sortById]; - } - - if (sort.find(({ _id }) => !!_id)) { - return sort; - } - - return [...sort, sortById]; -} - -function rawSource(doc: ConcreteTaskInstance | TaskInstance) { - const { id, ...taskFields } = doc; - const source = { - ...taskFields, - params: JSON.stringify(doc.params || {}), - state: JSON.stringify(doc.state || {}), - attempts: (doc as ConcreteTaskInstance).attempts || 0, - runAt: doc.runAt || new Date(), - status: (doc as ConcreteTaskInstance).status || 'idle', - }; - - delete (source as any).id; - delete (source as any).version; - delete (source as any).type; - - return { - id, - type: 'task', - task: source, - }; -} - -function taskDocToRaw(doc: ConcreteTaskInstance, index: string): RawTaskDoc { - const { type, task } = rawSource(doc); - - return { - _id: doc.id, - _index: index, - _source: { type, task }, - _type: DOC_TYPE, - _version: doc.version, - }; -} - -function rawToTaskDoc(doc: RawTaskDoc): ConcreteTaskInstance { - return { - ...doc._source.task, - id: doc._id, - version: doc._version, - params: parseJSONField(doc._source.task.params, 'params', doc), - state: parseJSONField(doc._source.task.state, 'state', doc), - }; -} - -function parseJSONField(json: string, fieldName: string, doc: RawTaskDoc) { - try { - return json ? JSON.parse(json) : {}; - } catch (error) { - throw new Error(`Task "${doc._id}"'s ${fieldName} field has invalid JSON: ${json}`); - } -} diff --git a/x-pack/plugins/task_manager/test_utils/index.ts b/x-pack/plugins/task_manager/test_utils/index.ts deleted file mode 100644 index 6a427de41b311d..00000000000000 --- a/x-pack/plugins/task_manager/test_utils/index.ts +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -/* - * A handful of helper functions for testing the task manager. - */ - -import sinon from 'sinon'; - -// Caching this here to avoid setTimeout mocking affecting our tests. -const nativeTimeout = setTimeout; - -/** - * Creates a mock task manager Logger. - */ -export function mockLogger() { - return { - info: sinon.stub(), - debug: sinon.stub(), - warning: sinon.stub(), - error: sinon.stub(), - }; -} - -interface Resolvable { - resolve: () => void; -} - -/** - * Creates a promise which can be resolved externally, useful for - * coordinating async tests. - */ -export function resolvable(): PromiseLike & Resolvable { - let resolve: () => void; - const result = new Promise(r => (resolve = r)) as any; - - result.resolve = () => nativeTimeout(resolve, 0); - - return result; -} - -/** - * A simple helper for waiting a specified number of milliseconds. - * - * @param {number} ms - */ -export async function sleep(ms: number) { - return new Promise(r => nativeTimeout(r, ms)); -} diff --git a/x-pack/scripts/functional_tests.js b/x-pack/scripts/functional_tests.js index 5016ae12a6b035..65fa2787c09afc 100644 --- a/x-pack/scripts/functional_tests.js +++ b/x-pack/scripts/functional_tests.js @@ -12,7 +12,6 @@ require('@kbn/test').runTestsCli([ require.resolve('../test/reporting/configs/phantom_functional.js'), require.resolve('../test/functional/config.js'), require.resolve('../test/api_integration/config.js'), - require.resolve('../test/plugin_api_integration/config.js'), require.resolve('../test/saml_api_integration/config.js'), require.resolve('../test/spaces_api_integration/spaces_only/config'), require.resolve('../test/spaces_api_integration/security_and_spaces/config'), diff --git a/x-pack/test/plugin_api_integration/config.js b/x-pack/test/plugin_api_integration/config.js deleted file mode 100644 index e4f7743c444437..00000000000000 --- a/x-pack/test/plugin_api_integration/config.js +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import path from 'path'; -import fs from 'fs'; - -export default async function ({ readConfigFile }) { - const integrationConfig = await readConfigFile(require.resolve('../api_integration/config')); - const kibanaFunctionalConfig = await readConfigFile(require.resolve('../../../test/functional/config.js')); - - // Find all folders in ./plugins since we treat all them as plugin folder - const allFiles = fs.readdirSync(path.resolve(__dirname, 'plugins')); - const plugins = allFiles.filter(file => fs.statSync(path.resolve(__dirname, 'plugins', file)).isDirectory()); - - return { - testFiles: [ - require.resolve('./test_suites/task_manager'), - ], - services: { - retry: kibanaFunctionalConfig.get('services.retry'), - ...integrationConfig.get('services'), - }, - pageObjects: integrationConfig.get('pageObjects'), - servers: integrationConfig.get('servers'), - esTestCluster: integrationConfig.get('esTestCluster'), - apps: integrationConfig.get('apps'), - esArchiver: { - directory: path.resolve(__dirname, '../es_archives') - }, - screenshots: integrationConfig.get('screenshots'), - junit: { - reportName: 'Plugin Functional Tests', - }, - kbnTestServer: { - ...integrationConfig.get('kbnTestServer'), - serverArgs: [ - ...integrationConfig.get('kbnTestServer.serverArgs'), - ...plugins.map(pluginDir => `--plugin-path=${path.resolve(__dirname, 'plugins', pluginDir)}`), - ], - }, - }; -} - diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/index.js b/x-pack/test/plugin_api_integration/plugins/task_manager/index.js deleted file mode 100644 index cc064154c9bb00..00000000000000 --- a/x-pack/test/plugin_api_integration/plugins/task_manager/index.js +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { initRoutes } from './init_routes'; - -export default function (kibana) { - return new kibana.Plugin({ - name: 'sampleTask', - require: ['elasticsearch', 'task_manager'], - - config(Joi) { - return Joi.object({ - enabled: Joi.boolean().default(true), - }).default(); - }, - - init(server) { - const { taskManager } = server; - - taskManager.registerTaskDefinitions({ - sampleTask: { - title: 'Sample Task', - description: 'A sample task for testing the task_manager.', - timeOut: '1m', - numWorkers: 2, - - // This task allows tests to specify its behavior (whether it reschedules itself, whether it errors, etc) - // taskInstance.params has the following optional fields: - // nextRunMilliseconds: number - If specified, the run method will return a runAt that is now + nextRunMilliseconds - // failWith: string - If specified, the task will throw an error with the specified message - createTaskRunner: ({ kbnServer, taskInstance }) => ({ - async run() { - const { params, state } = taskInstance; - const prevState = state || { count: 0 }; - - if (params.failWith) { - throw new Error(params.failWith); - } - - const callCluster = kbnServer.server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; - await callCluster('index', { - index: '.task_manager_test_result', - type: '_doc', - body: { - type: 'task', - taskId: taskInstance.id, - params: JSON.stringify(params), - state: JSON.stringify(state), - ranAt: new Date(), - }, - refresh: true, - }); - - return { - state: { count: (prevState.count || 0) + 1 }, - runAt: millisecondsFromNow(params.nextRunMilliseconds), - }; - }, - }), - }, - }); - - taskManager.addMiddleware({ - async beforeSave({ taskInstance, ...opts }) { - const modifiedInstance = { - ...taskInstance, - params: { - originalParams: taskInstance.params, - superFly: 'My middleware param!', - }, - }; - - return { - ...opts, - taskInstance: modifiedInstance, - }; - }, - - async beforeRun({ taskInstance, ...opts }) { - return { - ...opts, - taskInstance: { - ...taskInstance, - params: taskInstance.params.originalParams, - }, - }; - }, - }); - - initRoutes(server); - }, - }); -} - -function millisecondsFromNow(ms) { - if (!ms) { - return; - } - - const dt = new Date(); - dt.setTime(dt.getTime() + ms); - return dt; -} diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js deleted file mode 100644 index 6206c9b6e39fed..00000000000000 --- a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import Joi from 'joi'; - -export function initRoutes(server) { - const { taskManager } = server; - - server.route({ - path: '/api/sample_tasks', - method: 'POST', - config: { - validate: { - payload: Joi.object({ - taskType: Joi.string().required(), - interval: Joi.string().optional(), - params: Joi.object().required(), - state: Joi.object().optional(), - id: Joi.string().optional(), - }), - }, - }, - async handler(request, reply) { - try { - const task = await taskManager.schedule(request.payload, { request }); - reply(task); - } catch (err) { - reply(err); - } - }, - }); - - server.route({ - path: '/api/sample_tasks', - method: 'GET', - async handler(_req, reply) { - try { - reply(taskManager.fetch()); - } catch (err) { - reply(err); - } - } - }); - - server.route({ - path: '/api/sample_tasks', - method: 'DELETE', - async handler(_req, reply) { - try { - const { docs: tasks } = await taskManager.fetch(); - reply(Promise.all(tasks.map((task) => taskManager.remove(task.id)))); - } catch (err) { - reply(err); - } - }, - }); -} diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/package.json b/x-pack/test/plugin_api_integration/plugins/task_manager/package.json deleted file mode 100644 index ede03a08a2721d..00000000000000 --- a/x-pack/test/plugin_api_integration/plugins/task_manager/package.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "name": "sample_task_plugin", - "version": "kibana" -} diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/index.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.js deleted file mode 100644 index 2c9f41da09de69..00000000000000 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/index.js +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -export default function ({ loadTestFile }) { - describe('task_manager', () => { - loadTestFile(require.resolve('./task_manager_integration')); - }); -} diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js deleted file mode 100644 index c6c531f83cc62f..00000000000000 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import _ from 'lodash'; -import expect from 'expect.js'; -import url from 'url'; -import supertestAsPromised from 'supertest-as-promised'; - -export default function ({ getService }) { - const es = getService('es'); - const retry = getService('retry'); - const config = getService('config'); - const testHistoryIndex = '.task_manager_test_result'; - const supertest = supertestAsPromised(url.format(config.get('servers.kibana'))); - - describe('scheduling and running tasks', () => { - beforeEach(() => supertest.delete('/api/sample_tasks') - .set('kbn-xsrf', 'xxx') - .expect(200)); - - beforeEach(async () => - (await es.indices.exists({ index: testHistoryIndex })) && es.deleteByQuery({ - index: testHistoryIndex, - q: 'type:task', - refresh: true, - })); - - function currentTasks() { - return supertest.get('/api/sample_tasks') - .expect(200) - .then((response) => response.body); - } - - function historyDocs() { - return es.search({ - index: testHistoryIndex, - type: '_doc', - q: 'type:task', - }).then(result => result.hits.hits); - } - - function scheduleTask(task) { - return supertest.post('/api/sample_tasks') - .set('kbn-xsrf', 'xxx') - .send(task) - .expect(200) - .then((response) => response.body); - } - - it('should support middleware', async () => { - const historyItem = _.random(1, 100); - - await scheduleTask({ - taskType: 'sampleTask', - interval: '30m', - params: { historyItem }, - }); - - await retry.try(async () => { - expect((await historyDocs()).length).to.eql(1); - - const [task] = (await currentTasks()).docs; - - expect(task.attempts).to.eql(0); - expect(task.state.count).to.eql(1); - - expect(task.params).to.eql({ - superFly: 'My middleware param!', - originalParams: { historyItem }, - }); - }); - }); - - it('should remove non-recurring tasks after they complete', async () => { - await scheduleTask({ - taskType: 'sampleTask', - params: { }, - }); - - await retry.try(async () => { - const history = await historyDocs(); - expect(history.length).to.eql(1); - expect((await currentTasks()).docs).to.eql([]); - }); - }); - - it('should use a given ID as the task document ID', async () => { - const result = await scheduleTask({ - id: 'test-task-for-sample-task-plugin-to-test-task-manager', - taskType: 'sampleTask', - params: { }, - }); - - expect(result.id).to.be('test-task-for-sample-task-plugin-to-test-task-manager'); - }); - - it('should reschedule if task errors', async () => { - const task = await scheduleTask({ - taskType: 'sampleTask', - params: { failWith: 'Dangit!!!!!' }, - }); - - await retry.try(async () => { - const [scheduledTask] = (await currentTasks()).docs; - expect(scheduledTask.id).to.eql(task.id); - expect(scheduledTask.attempts).to.be.greaterThan(0); - expect(Date.parse(scheduledTask.runAt)).to.be.greaterThan(Date.parse(task.runAt)); - }); - }); - - it('should reschedule if task returns runAt', async () => { - const nextRunMilliseconds = _.random(60000, 200000); - const count = _.random(1, 20); - - const originalTask = await scheduleTask({ - taskType: 'sampleTask', - params: { nextRunMilliseconds }, - state: { count }, - }); - - await retry.try(async () => { - expect((await historyDocs()).length).to.eql(1); - - const [task] = (await currentTasks()).docs; - expect(task.attempts).to.eql(0); - expect(task.state.count).to.eql(count + 1); - - expectReschedule(originalTask, task, nextRunMilliseconds); - }); - }); - - it('should reschedule if task has an interval', async () => { - const interval = _.random(5, 200); - const intervalMilliseconds = interval * 60000; - - const originalTask = await scheduleTask({ - taskType: 'sampleTask', - interval: `${interval}m`, - params: { }, - }); - - await retry.try(async () => { - expect((await historyDocs()).length).to.eql(1); - - const [task] = (await currentTasks()).docs; - expect(task.attempts).to.eql(0); - expect(task.state.count).to.eql(1); - - expectReschedule(originalTask, task, intervalMilliseconds); - }); - }); - - async function expectReschedule(originalTask, currentTask, expectedDiff) { - const originalRunAt = Date.parse(originalTask.runAt); - const buffer = 10000; - expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.greaterThan(expectedDiff - buffer); - expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.lessThan(expectedDiff + buffer); - } - }); -}