Skip to content

Commit

Permalink
[Task manager] Adds ensureScheduling api to allow safer rescheduling …
Browse files Browse the repository at this point in the history
…of existing tasks (elastic#50232)

Adds an ensureScheduling api to Task Manager which allow safer rescheduling of existing tasks by handling the case where a Task with a known ID is scheduled and clashes with an existing schedule of that same task.
  • Loading branch information
gmmorris committed Nov 27, 2019
1 parent 6d2c960 commit 3fd3cf7
Show file tree
Hide file tree
Showing 14 changed files with 442 additions and 16 deletions.
245 changes: 245 additions & 0 deletions x-pack/legacy/plugins/lens/server/usage/task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import moment from 'moment';
import KbnServer, { Server } from 'src/legacy/server/kbn_server';
import { CoreSetup } from 'src/core/server';
import { CallClusterOptions } from 'src/legacy/core_plugins/elasticsearch';
import {
SearchParams,
DeleteDocumentByQueryParams,
SearchResponse,
DeleteDocumentByQueryResponse,
} from 'elasticsearch';
import { ESSearchResponse } from '../../../apm/typings/elasticsearch';
import { XPackMainPlugin } from '../../../xpack_main/xpack_main';
import { RunContext } from '../../../task_manager';
import { getVisualizationCounts } from './visualization_counts';

// This task is responsible for running daily and aggregating all the Lens click event objects
// into daily rolled-up documents, which will be used in reporting click stats

const TELEMETRY_TASK_TYPE = 'lens_telemetry';

export const TASK_ID = `Lens-${TELEMETRY_TASK_TYPE}`;

type ClusterSearchType = (
endpoint: 'search',
params: SearchParams & {
rest_total_hits_as_int: boolean;
},
options?: CallClusterOptions
) => Promise<SearchResponse<unknown>>;
type ClusterDeleteType = (
endpoint: 'deleteByQuery',
params: DeleteDocumentByQueryParams,
options?: CallClusterOptions
) => Promise<DeleteDocumentByQueryResponse>;

export function initializeLensTelemetry(core: CoreSetup, { server }: { server: Server }) {
registerLensTelemetryTask(core, { server });
scheduleTasks(server);
}

function registerLensTelemetryTask(core: CoreSetup, { server }: { server: Server }) {
const taskManager = server.plugins.task_manager;

if (!taskManager) {
server.log(['debug', 'telemetry'], `Task manager is not available`);
return;
}

taskManager.registerTaskDefinitions({
[TELEMETRY_TASK_TYPE]: {
title: 'Lens telemetry fetch task',
type: TELEMETRY_TASK_TYPE,
timeout: '1m',
createTaskRunner: telemetryTaskRunner(server),
},
});
}

function scheduleTasks(server: Server) {
const taskManager = server.plugins.task_manager;
const { kbnServer } = (server.plugins.xpack_main as XPackMainPlugin & {
status: { plugin: { kbnServer: KbnServer } };
}).status.plugin;

if (!taskManager) {
server.log(['debug', 'telemetry'], `Task manager is not available`);
return;
}

kbnServer.afterPluginsInit(() => {
// The code block below can't await directly within "afterPluginsInit"
// callback due to circular dependency The server isn't "ready" until
// this code block finishes. Migrations wait for server to be ready before
// executing. Saved objects repository waits for migrations to finish before
// finishing the request. To avoid this, we'll await within a separate
// function block.
(async () => {
try {
await taskManager.ensureScheduled({
id: TASK_ID,
taskType: TELEMETRY_TASK_TYPE,
state: { byDate: {}, suggestionsByDate: {}, saved: {}, runs: 0 },
params: {},
});
} catch (e) {
server.log(['debug', 'telemetry'], `Error scheduling task, received ${e.message}`);
}
})();
});
}

export async function getDailyEvents(
kibanaIndex: string,
callCluster: ClusterSearchType & ClusterDeleteType
): Promise<{
byDate: Record<string, Record<string, number>>;
suggestionsByDate: Record<string, Record<string, number>>;
}> {
const aggs = {
daily: {
date_histogram: {
field: 'lens-ui-telemetry.date',
calendar_interval: '1d',
min_doc_count: 1,
},
aggs: {
groups: {
filters: {
filters: {
suggestionEvents: {
bool: {
filter: {
term: { 'lens-ui-telemetry.type': 'suggestion' },
},
},
},
regularEvents: {
bool: {
must_not: {
term: { 'lens-ui-telemetry.type': 'suggestion' },
},
},
},
},
},
aggs: {
names: {
terms: { field: 'lens-ui-telemetry.name', size: 100 },
aggs: {
sums: { sum: { field: 'lens-ui-telemetry.count' } },
},
},
},
},
},
},
};

const metrics: ESSearchResponse<
unknown,
{
body: { aggs: typeof aggs };
},
{ restTotalHitsAsInt: true }
> = await callCluster('search', {
index: kibanaIndex,
rest_total_hits_as_int: true,
body: {
query: {
bool: {
filter: [
{ term: { type: 'lens-ui-telemetry' } },
{ range: { 'lens-ui-telemetry.date': { gte: 'now-90d/d' } } },
],
},
},
aggs,
},
size: 0,
});

const byDateByType: Record<string, Record<string, number>> = {};
const suggestionsByDate: Record<string, Record<string, number>> = {};

metrics.aggregations!.daily.buckets.forEach(daily => {
const byType: Record<string, number> = byDateByType[daily.key] || {};
daily.groups.buckets.regularEvents.names.buckets.forEach(bucket => {
byType[bucket.key] = (bucket.sums.value || 0) + (byType[daily.key] || 0);
});
byDateByType[daily.key] = byType;

const suggestionsByType: Record<string, number> = suggestionsByDate[daily.key] || {};
daily.groups.buckets.suggestionEvents.names.buckets.forEach(bucket => {
suggestionsByType[bucket.key] =
(bucket.sums.value || 0) + (suggestionsByType[daily.key] || 0);
});
suggestionsByDate[daily.key] = suggestionsByType;
});

// Always delete old date because we don't report it
await callCluster('deleteByQuery', {
index: kibanaIndex,
waitForCompletion: true,
body: {
query: {
bool: {
filter: [
{ term: { type: 'lens-ui-telemetry' } },
{ range: { 'lens-ui-telemetry.date': { lt: 'now-90d/d' } } },
],
},
},
},
});

return {
byDate: byDateByType,
suggestionsByDate,
};
}

export function telemetryTaskRunner(server: Server) {
return ({ taskInstance }: RunContext) => {
const { state } = taskInstance;
const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser;

return {
async run() {
const kibanaIndex = server.config().get<string>('kibana.index');

return Promise.all([
getDailyEvents(kibanaIndex, callCluster),
getVisualizationCounts(callCluster, server.config()),
])
.then(([lensTelemetry, lensVisualizations]) => {
return {
state: {
runs: (state.runs || 0) + 1,
byDate: (lensTelemetry && lensTelemetry.byDate) || {},
suggestionsByDate: (lensTelemetry && lensTelemetry.suggestionsByDate) || {},
saved: lensVisualizations,
},
runAt: getNextMidnight(),
};
})
.catch(errMsg =>
server.log(['warning'], `Error executing lens telemetry task: ${errMsg}`)
);
},
};
};
}

function getNextMidnight() {
return moment()
.add(1, 'day')
.startOf('day')
.toDate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export function scheduleTask(server, taskManager) {
// function block.
(async () => {
try {
await taskManager.schedule({
await taskManager.ensureScheduled({
id: TASK_ID,
taskType: TELEMETRY_TASK_TYPE,
state: { stats: {}, runs: 0 },
Expand Down
2 changes: 1 addition & 1 deletion x-pack/legacy/plugins/oss_telemetry/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export interface HapiServer {
};
task_manager: {
registerTaskDefinitions: (opts: any) => void;
schedule: (opts: any) => Promise<void>;
ensureScheduled: (opts: any) => Promise<void>;
fetch: (
opts: any
) => Promise<{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export function scheduleTasks(server: HapiServer) {
// function block.
(async () => {
try {
await taskManager.schedule({
await taskManager.ensureScheduled({
id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}`,
taskType: VIS_TELEMETRY_TASK,
state: { stats: {}, runs: 0 },
Expand Down
2 changes: 1 addition & 1 deletion x-pack/legacy/plugins/oss_telemetry/test_utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export const getMockKbnServer = (
xpack_main: {},
task_manager: {
registerTaskDefinitions: (opts: any) => undefined,
schedule: (opts: any) => Promise.resolve(),
ensureScheduled: (opts: any) => Promise.resolve(),
fetch: mockTaskFetch,
},
},
Expand Down
11 changes: 11 additions & 0 deletions x-pack/legacy/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ The data stored for a task instance looks something like this:

The task manager mixin exposes a taskManager object on the Kibana server which plugins can use to manage scheduled tasks. Each method takes an optional `scope` argument and ensures that only tasks with the specified scope(s) will be affected.

### schedule
Using `schedule` you can instruct TaskManger to schedule an instance of a TaskType at some point in the future.

```js
const taskManager = server.plugins.task_manager;
// Schedules a task. All properties are as documented in the previous
Expand Down Expand Up @@ -248,6 +251,14 @@ const results = await manager.find({ scope: 'my-fanci-app', searchAfter: ['ids']
}
```

### ensureScheduling
When using the `schedule` api to schedule a Task you can provide a hard coded `id` on the Task. This tells TaskManager to use this `id` to identify the Task Instance rather than generate an `id` on its own.
The danger is that in such a situation, a Task with that same `id` might already have been scheduled at some earlier point, and this would result in an error. In some cases, this is the expected behavior, but often you only care about ensuring the task has been _scheduled_ and don't need it to be scheduled a fresh.

To achieve this you should use the `ensureScheduling` api which has the exact same behavior as `schedule`, except it allows the scheduling of a Task with an `id` that's already in assigned to another Task and it will assume that the existing Task is the one you wished to `schedule`, treating this as a successful operation.

### more options

More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time.

## Middleware
Expand Down
1 change: 1 addition & 0 deletions x-pack/legacy/plugins/task_manager/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ describe('Task Manager Plugin', () => {
expect(setupResult).toMatchInlineSnapshot(`
Object {
"addMiddleware": [Function],
"ensureScheduled": [Function],
"fetch": [Function],
"registerTaskDefinitions": [Function],
"remove": [Function],
Expand Down
2 changes: 2 additions & 0 deletions x-pack/legacy/plugins/task_manager/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface PluginSetupContract {
fetch: TaskManager['fetch'];
remove: TaskManager['remove'];
schedule: TaskManager['schedule'];
ensureScheduled: TaskManager['ensureScheduled'];
addMiddleware: TaskManager['addMiddleware'];
registerTaskDefinitions: TaskManager['registerTaskDefinitions'];
}
Expand Down Expand Up @@ -59,6 +60,7 @@ export class Plugin {
fetch: (...args) => taskManager.fetch(...args),
remove: (...args) => taskManager.remove(...args),
schedule: (...args) => taskManager.schedule(...args),
ensureScheduled: (...args) => taskManager.ensureScheduled(...args),
addMiddleware: (...args) => taskManager.addMiddleware(...args),
registerTaskDefinitions: (...args) => taskManager.registerTaskDefinitions(...args),
};
Expand Down
19 changes: 19 additions & 0 deletions x-pack/legacy/plugins/task_manager/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ import Joi from 'joi';
* Type definitions and validations for tasks.
*/

/**
* Require
* @desc Create a Subtype of type T `T` such that the property under key `P` becomes required
* @example
* type TaskInstance = {
* id?: string;
* name: string;
* };
*
* // This type is now defined as { id: string; name: string; }
* type TaskInstanceWithId = Require<TaskInstance, 'id'>;
*/
type Require<T extends object, P extends keyof T> = Omit<T, P> & Required<Pick<T, P>>;

/**
* A loosely typed definition of the elasticjs wrapper. It's beyond the scope
* of this work to try to make a comprehensive type definition of this.
Expand Down Expand Up @@ -227,6 +241,11 @@ export interface TaskInstance {
scope?: string[];
}

/**
* A task instance that has an id.
*/
export type TaskInstanceWithId = Require<TaskInstance, 'id'>;

/**
* A task instance that has an id and is ready for storage.
*/
Expand Down
1 change: 1 addition & 0 deletions x-pack/legacy/plugins/task_manager/task_manager.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const createTaskManagerMock = () => {
const mocked: jest.Mocked<TaskManager> = {
registerTaskDefinitions: jest.fn(),
addMiddleware: jest.fn(),
ensureScheduled: jest.fn(),
schedule: jest.fn(),
fetch: jest.fn(),
remove: jest.fn(),
Expand Down
Loading

0 comments on commit 3fd3cf7

Please sign in to comment.