Skip to content

Commit

Permalink
expose Task Manager via a Kibana Platform plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Dec 31, 2019
1 parent 2cbc508 commit f7c61c7
Show file tree
Hide file tree
Showing 16 changed files with 289 additions and 81 deletions.
4 changes: 2 additions & 2 deletions x-pack/legacy/plugins/actions/server/shim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export function shim(

const pluginsSetup: ActionsPluginsSetup = {
security: newPlatform.setup.plugins.security as SecurityPluginSetupContract | undefined,
task_manager: server.plugins.task_manager,
task_manager: newPlatform.setup.plugins.kibanaTaskManager as TaskManagerSetupContract,
xpack_main: server.plugins.xpack_main,
encryptedSavedObjects: newPlatform.setup.plugins
.encryptedSavedObjects as EncryptedSavedObjectsSetupContract,
Expand All @@ -146,7 +146,7 @@ export function shim(
spaces: () => server.plugins.spaces,
encryptedSavedObjects: newPlatform.start.plugins
.encryptedSavedObjects as EncryptedSavedObjectsStartContract,
task_manager: server.plugins.task_manager,
task_manager: newPlatform.start.plugins.kibanaTaskManager as TaskManagerStartContract,
};

return {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/legacy/plugins/alerting/server/shim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export function shim(

const pluginsSetup: AlertingPluginsSetup = {
security: newPlatform.setup.plugins.security as SecurityPluginSetupContract | undefined,
task_manager: server.plugins.task_manager,
task_manager: newPlatform.setup.plugins.kibanaTaskManager as TaskManagerSetupContract,
actions: server.plugins.actions.setup,
xpack_main: server.plugins.xpack_main,
encryptedSavedObjects: newPlatform.setup.plugins
Expand All @@ -137,7 +137,7 @@ export function shim(
spaces: () => server.plugins.spaces,
encryptedSavedObjects: newPlatform.start.plugins
.encryptedSavedObjects as EncryptedSavedObjectsStartContract,
task_manager: server.plugins.task_manager,
task_manager: newPlatform.start.plugins.kibanaTaskManager as TaskManagerStartContract,
};

return {
Expand Down
11 changes: 8 additions & 3 deletions x-pack/legacy/plugins/lens/server/usage/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import {
SearchResponse,
DeleteDocumentByQueryResponse,
} from 'elasticsearch';
import { TaskManagerPluginStartContract } from '../../../../../plugins/kibana_task_manager/server';
import { ESSearchResponse } from '../../../apm/typings/elasticsearch';
import { XPackMainPlugin } from '../../../xpack_main/server/xpack_main';
import { RunContext } from '../../../task_manager/server';
import { RunContext, TaskManager } from '../../../task_manager/server';
import { getVisualizationCounts } from './visualization_counts';

// This task is responsible for running daily and aggregating all the Lens click event objects
Expand Down Expand Up @@ -45,7 +46,10 @@ export function initializeLensTelemetry(core: CoreSetup, server: Server) {
}

function registerLensTelemetryTask(core: CoreSetup, server: Server) {
const taskManager = server.plugins.task_manager;
const taskManager = {
...server.newPlatform.setup.plugins.kibanaTaskManager,
...server.newPlatform.start.plugins.kibanaTaskManager,
} as TaskManager;

if (!taskManager) {
server.log(['debug', 'telemetry'], `Task manager is not available`);
Expand All @@ -63,7 +67,8 @@ function registerLensTelemetryTask(core: CoreSetup, server: Server) {
}

function scheduleTasks(server: Server) {
const taskManager = server.plugins.task_manager;
const taskManager = server.newPlatform.start.plugins
.kibanaTaskManager as TaskManagerPluginStartContract;
const { kbnServer } = (server.plugins.xpack_main as XPackMainPlugin & {
status: { plugin: { kbnServer: KbnServer } };
}).status.plugin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const TELEMETRY_TASK_TYPE = 'maps_telemetry';
export const TASK_ID = `Maps-${TELEMETRY_TASK_TYPE}`;

export function scheduleTask(server) {
const taskManager = server.plugins.task_manager;
const taskManager = server.newPlatform.start.plugins.kibanaTaskManager;

if (!taskManager) {
server.log(['debug', 'telemetry'], `Task manager is not available`);
Expand Down Expand Up @@ -42,7 +42,10 @@ export function scheduleTask(server) {
}

export function registerMapsTelemetryTask(server) {
const taskManager = server.plugins.task_manager;
const taskManager = {
...server.newPlatform.setup.plugins.kibanaTaskManager,
...server.newPlatform.start.plugins.kibanaTaskManager,
};

if (!taskManager) {
server.log(['debug', 'telemetry'], `Task manager is not available`);
Expand Down
2 changes: 1 addition & 1 deletion x-pack/legacy/plugins/oss_telemetry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const ossTelemetry: LegacyPluginInitializer = kibana => {
} as PluginInitializerContext);
plugin.setup(server.newPlatform.setup.core, {
usageCollection: server.newPlatform.setup.plugins.usageCollection,
taskManager: server.plugins.task_manager,
taskManager: server.newPlatform.setup.plugins.kibanaTaskManager,
__LEGACY: {
config: server.config(),
xpackMainStatus: ((server.plugins.xpack_main as unknown) as { status: any }).status
Expand Down
48 changes: 24 additions & 24 deletions x-pack/legacy/plugins/task_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

import { Root } from 'joi';
import { Legacy } from 'kibana';
import { Plugin, PluginSetupContract } from './plugin';
import { Plugin, PluginContract } from './plugin';
import { SavedObjectsSerializer, SavedObjectsSchema } from '../../../../../src/core/server';
import mappings from './mappings.json';
import { migrations } from './migrations';
import { TaskManagerPluginSetupContract } from '../../../../plugins/kibana_task_manager/server';
export { Middleware } from './lib/middleware';
export { TaskDictionary, TaskDefinition } from './task';

export { PluginSetupContract as TaskManager };
export { PluginContract as TaskManager };
export {
TaskInstance,
ConcreteTaskInstance,
Expand Down Expand Up @@ -55,32 +58,29 @@ export function taskManager(kibana: any) {
.default(10),
}).default();
},
init(server: Legacy.Server) {
const plugin = new Plugin({
logger: {
get: () => ({
info: (message: string) => server.log(['info', 'task_manager'], message),
debug: (message: string) => server.log(['debug', 'task_manager'], message),
warn: (message: string) => server.log(['warn', 'task_manager'], message),
error: (message: string) => server.log(['error', 'task_manager'], message),
}),
},
});
async init(server: Legacy.Server) {
const legacyPlugin = new Plugin();
const schema = new SavedObjectsSchema(this.kbnServer.uiExports.savedObjectSchemas);
const serializer = new SavedObjectsSerializer(schema);
const setupContract = plugin.setup(
{},
{

const {
savedObjects,
newPlatform: {
setup: {
plugins: { kibanaTaskManager },
},
},
plugins: { elasticsearch },
} = server;

await (kibanaTaskManager as TaskManagerPluginSetupContract).registerLegacyAPI({
legacyPlugin,
legacyDependencies: {
serializer,
config: server.config(),
elasticsearch: server.plugins.elasticsearch,
savedObjects: server.savedObjects,
}
);
this.kbnServer.afterPluginsInit(() => {
plugin.start();
savedObjects,
elasticsearch,
},
});
server.expose(setupContract);
},
uiExports: {
mappings,
Expand Down
48 changes: 15 additions & 33 deletions x-pack/legacy/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { CoreSetup } from 'kibana/server';
import { Logger } from './types';
import { TaskManager } from './task_manager';

export interface PluginSetupContract {
export interface PluginContract {
fetch: TaskManager['fetch'];
remove: TaskManager['remove'];
schedule: TaskManager['schedule'];
Expand All @@ -22,61 +23,42 @@ export interface LegacyDeps {
serializer: any;
elasticsearch: any;
savedObjects: any;
}

interface PluginInitializerContext {
logger: {
get: () => Logger;
};
logger: Logger;
}

export class Plugin {
private logger: Logger;
private taskManager?: TaskManager;

constructor(initializerContext: PluginInitializerContext) {
this.logger = initializerContext.logger.get();
}
constructor() {}

// TODO: Make asynchronous like new platform
public setup(
core: {},
{ config, serializer, elasticsearch, savedObjects }: LegacyDeps
): PluginSetupContract {
core: CoreSetup,
{ logger, config, serializer, elasticsearch, savedObjects }: LegacyDeps
): PluginContract {
const { callWithInternalUser } = elasticsearch.getCluster('admin');
const savedObjectsRepository = savedObjects.getSavedObjectsRepository(callWithInternalUser, [
'task',
]);

const taskManager = new TaskManager({
this.taskManager = new TaskManager({
taskManagerId: core.uuid.getInstanceUuid(),
config,
savedObjectsRepository,
serializer,
callWithInternalUser,
logger: this.logger,
logger,
});
this.taskManager = taskManager;

return {
fetch: (...args) => taskManager.fetch(...args),
remove: (...args) => taskManager.remove(...args),
schedule: (...args) => taskManager.schedule(...args),
runNow: (...args) => taskManager.runNow(...args),
ensureScheduled: (...args) => taskManager.ensureScheduled(...args),
addMiddleware: (...args) => taskManager.addMiddleware(...args),
registerTaskDefinitions: (...args) => taskManager.registerTaskDefinitions(...args),
};
return this.taskManager;
}

public start() {
if (this.taskManager) {
this.taskManager.start();
}
public start(): PluginContract {
this.taskManager!.start();
return this.taskManager!;
}

public stop() {
if (this.taskManager) {
this.taskManager.stop();
}
this.taskManager!.stop();
}
}
22 changes: 14 additions & 8 deletions x-pack/legacy/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import { performance } from 'perf_hooks';

import { pipe } from 'fp-ts/lib/pipeable';
import { Option, none, some, map as mapOptional } from 'fp-ts/lib/Option';

import { SavedObjectsClientContract, SavedObjectsSerializer } from '../../../../../src/core/server';
import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { TaskManagerConfig } from '../../../../plugins/kibana_task_manager/server';

import { Logger } from './types';
import {
Expand Down Expand Up @@ -56,10 +58,11 @@ const VERSION_CONFLICT_STATUS = 409;

export interface TaskManagerOpts {
logger: Logger;
config: any;
config: TaskManagerConfig;
callWithInternalUser: any;
savedObjectsRepository: SavedObjectsClientContract;
serializer: SavedObjectsSerializer;
taskManagerId: string;
}

interface RunNowResult {
Expand Down Expand Up @@ -102,15 +105,18 @@ export class TaskManager {
beforeMarkRunning: async (runOpts: RunContext) => runOpts,
};

private shouldAllowRegistrationAfterStart: boolean;

/**
* Initializes the task manager, preventing any further addition of middleware,
* enabling the task manipulation methods, and beginning the background polling
* mechanism.
*/
constructor(opts: TaskManagerOpts) {
this.logger = opts.logger;
this.shouldAllowRegistrationAfterStart = true;

const taskManagerId = opts.config.get('server.uuid');
const { taskManagerId } = opts;
if (!taskManagerId) {
this.logger.error(
`TaskManager is unable to start as there the Kibana UUID is invalid (value of the "server.uuid" configuration is ${taskManagerId})`
Expand All @@ -124,8 +130,8 @@ export class TaskManager {
serializer: opts.serializer,
savedObjectsRepository: opts.savedObjectsRepository,
callCluster: opts.callWithInternalUser,
index: opts.config.get('xpack.task_manager.index'),
maxAttempts: opts.config.get('xpack.task_manager.max_attempts'),
index: opts.config.index,
maxAttempts: opts.config.max_attempts,
definitions: this.definitions,
taskManagerId: `kibana:${taskManagerId}`,
});
Expand All @@ -134,12 +140,12 @@ export class TaskManager {

this.pool = new TaskPool({
logger: this.logger,
maxWorkers: opts.config.get('xpack.task_manager.max_workers'),
maxWorkers: opts.config.max_workers,
});

this.poller$ = createTaskPoller<string, FillPoolResult>({
pollInterval: opts.config.get('xpack.task_manager.poll_interval'),
bufferCapacity: opts.config.get('xpack.task_manager.request_capacity'),
pollInterval: opts.config.poll_interval,
bufferCapacity: opts.config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
Expand Down Expand Up @@ -345,7 +351,7 @@ export class TaskManager {
* @returns void
*/
private assertUninitialized(message: string) {
if (this.isStarted) {
if (!this.shouldAllowRegistrationAfterStart && this.isStarted) {
throw new Error(`Cannot ${message} after the task manager is initialized!`);
}
}
Expand Down
8 changes: 8 additions & 0 deletions x-pack/plugins/kibana_task_manager/kibana.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "kibanaTaskManager",
"server": true,
"version": "8.0.0",
"kibanaVersion": "kibana",
"configPath": ["xpack", "task_manager"],
"ui": false
}
33 changes: 33 additions & 0 deletions x-pack/plugins/kibana_task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { configSchema } from './config';

describe('config validation', () => {
test('task manager defaults', () => {
const config: Record<string, any> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"enabled": true,
"index": ".kibana_task_manager",
"max_attempts": 3,
"max_workers": 10,
"poll_interval": 3000,
"request_capacity": 1000,
}
`);
});

test('the ElastiSearch Tasks index cannot be used for task manager', () => {
const config: Record<string, any> = {
index: '.tasks',
};
expect(() => {
configSchema.validate(config);
}).toThrowErrorMatchingInlineSnapshot(
`"[index]: \\".tasks\\" is an invalid Kibana Task Manager index, as it is already in use by the ElasticSearch Tasks Manager"`
);
});
});
Loading

0 comments on commit f7c61c7

Please sign in to comment.