Skip to content

Commit

Permalink
Add config options to limit the amount of tasks executing at once
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisronline committed Apr 27, 2021
1 parent 1785f5d commit 3800f5c
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 94 deletions.
10 changes: 5 additions & 5 deletions x-pack/plugins/actions/server/actions_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ interface ConstructorOptions {
preconfiguredActions: PreConfiguredAction[];
actionExecutor: ActionExecutorContract;
executionEnqueuer: ExecutionEnqueuer;
ephemeralRunNow: (task: EphemeralTask) => void;
ephemeralRunNow: (tasks: EphemeralTask[]) => void;
request: KibanaRequest;
authorization: ActionsAuthorization;
auditLogger?: AuditLogger;
Expand All @@ -92,7 +92,7 @@ export class ActionsClient {
private readonly authorization: ActionsAuthorization;
private readonly executionEnqueuer: ExecutionEnqueuer;
private readonly ephemeralRunNow: (
task: EphemeralTask,
tasks: EphemeralTask[],
options?: Record<string, unknown>
) => void;
private readonly auditLogger?: AuditLogger;
Expand Down Expand Up @@ -493,8 +493,8 @@ export class ActionsClient {
return this.executionEnqueuer(this.unsecuredSavedObjectsClient, options);
}

public async executeEphemeralTask(
task: EphemeralTask,
public async executeEphemeralTasks(
tasks: EphemeralTask[],
options?: Record<string, unknown>
): Promise<void> {
// const { source } = options;
Expand All @@ -504,7 +504,7 @@ export class ActionsClient {
// ) {
// await this.authorization.ensureAuthorized('execute');
// }
return this.ephemeralRunNow(task, options);
return this.ephemeralRunNow(tasks, options);
}

public async listTypes(): Promise<ActionType[]> {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/actions/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
private readonly telemetryLogger: Logger;
private readonly preconfiguredActions: PreConfiguredAction[];
private readonly kibanaIndexConfig: { kibana: { index: string } };
private ephemeralRunNow?: (task: EphemeralTask, options?: Record<string, unknown>) => void;
private ephemeralRunNow?: (tasks: EphemeralTask[], options?: Record<string, unknown>) => void;

constructor(initContext: PluginInitializerContext) {
this.logger = initContext.logger.get('actions');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
RawAlert,
} from '../types';
import { NormalizedAlertType } from '../alert_type_registry';
import { EphemeralTask } from '../../../task_manager/server';

export interface CreateExecutionHandlerOptions<
Params extends AlertTypeParams,
Expand Down Expand Up @@ -143,7 +144,9 @@ export function createExecutionHandler<

const alertLabel = `${alertType.id}:${alertId}: '${alertName}'`;

const promises = [];
// const promises = [];

const tasks: EphemeralTask[] = [];

for (const action of actions) {
if (
Expand All @@ -157,50 +160,50 @@ export function createExecutionHandler<

// TODO would be nice to add the action name here, but it's not available
const actionLabel = `${action.actionTypeId}:${action.id}`;
const actionsClient = await actionsPlugin.getActionsClientWithRequest(request);
const promise = new Promise(async (resolve, reject) => {
try {
await actionsClient.executeEphemeralTask({
taskType: `actions:${action.actionTypeId}`,
params: {
...action.params,
taskParams: {
actionId: action.id,
apiKey,
},
},
state: {},
});

const namespace = spaceId === 'default' ? {} : { namespace: spaceId };

const event: IEvent = {
event: { action: EVENT_LOG_ACTIONS.executeAction },
kibana: {
alerting: {
instance_id: alertInstanceId,
action_group_id: actionGroup,
action_subgroup: actionSubgroup,
},
saved_objects: [
{ rel: SAVED_OBJECT_REL_PRIMARY, type: 'alert', id: alertId, ...namespace },
{ type: 'action', id: action.id, ...namespace },
],
},
};

event.message = `alert: ${alertLabel} instanceId: '${alertInstanceId}' scheduled ${
actionSubgroup
? `actionGroup(subgroup): '${actionGroup}(${actionSubgroup})'`
: `actionGroup: '${actionGroup}'`
} action: ${actionLabel}`;
eventLogger.logEvent(event);
resolve(true);
} catch (err) {
return reject(err);
}

// const promise = new Promise(async (resolve, reject) => {
// try {
tasks.push({
taskType: `actions:${action.actionTypeId}`,
params: {
...action.params,
taskParams: {
actionId: action.id,
apiKey,
},
},
state: {},
});
promises.push(promise);

const namespace = spaceId === 'default' ? {} : { namespace: spaceId };

const event: IEvent = {
event: { action: EVENT_LOG_ACTIONS.executeAction },
kibana: {
alerting: {
instance_id: alertInstanceId,
action_group_id: actionGroup,
action_subgroup: actionSubgroup,
},
saved_objects: [
{ rel: SAVED_OBJECT_REL_PRIMARY, type: 'alert', id: alertId, ...namespace },
{ type: 'action', id: action.id, ...namespace },
],
},
};

event.message = `alert: ${alertLabel} instanceId: '${alertInstanceId}' scheduled ${
actionSubgroup
? `actionGroup(subgroup): '${actionGroup}(${actionSubgroup})'`
: `actionGroup: '${actionGroup}'`
} action: ${actionLabel}`;
eventLogger.logEvent(event);
// resolve(true);
// } catch (err) {
// return reject(err);
// }
// });
// promises.push(promise);
// await actionsClient.executeEphemeralTask({
// taskType: `actions:${action.actionTypeId}`,
// params: {
Expand All @@ -214,6 +217,9 @@ export function createExecutionHandler<
// });
}

await Promise.all(promises);
const actionsClient = await actionsPlugin.getActionsClientWithRequest(request);
await actionsClient.executeEphemeralTasks(tasks);

// await Promise.all(promises);
};
}
7 changes: 7 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;
export const DEFAULT_MAX_EPHEMERAL_TASKS_PER_CYCLE = 10;

// Monitoring Constants
// ===================
Expand Down Expand Up @@ -109,6 +110,12 @@ export const configSchema = schema.object(
defaultValue: {},
}),
}),
ephemeral_tasks: schema.object({
max_per_cycle: schema.number({
defaultValue: DEFAULT_MAX_EPHEMERAL_TASKS_PER_CYCLE,
}),
enabled: schema.boolean({ defaultValue: false }),
}),
},
{
validate: (config) => {
Expand Down
45 changes: 22 additions & 23 deletions x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,31 +86,30 @@ export class EphemeralTaskLifecycle {
.subscribe(async (e) => {
let overallCapacity = this.getCapacity();
const capacityByType = new Map<string, number>();
const queue = [...this.ephemeralTaskQueue].filter((ephemeralTask) => {
if (overallCapacity > 0) {
if (!capacityByType.has(ephemeralTask.taskType)) {
capacityByType.set(ephemeralTask.taskType, this.getCapacity(ephemeralTask.taskType));
}
if (capacityByType.get(ephemeralTask.taskType)! > 0) {
overallCapacity--;
capacityByType.set(
ephemeralTask.taskType,
capacityByType.get(ephemeralTask.taskType)! - 1
);
return true;
}
}
});
const runQueue = this.config.ephemeral_tasks.enabled
? queue.slice(0, this.config.ephemeral_tasks.max_per_cycle)
: queue;
this.pool
.run(
[...this.ephemeralTaskQueue]
.filter((ephemeralTask) => {
if (overallCapacity > 0) {
if (!capacityByType.has(ephemeralTask.taskType)) {
capacityByType.set(
ephemeralTask.taskType,
this.getCapacity(ephemeralTask.taskType)
);
}
if (capacityByType.get(ephemeralTask.taskType)! > 0) {
overallCapacity--;
capacityByType.set(
ephemeralTask.taskType,
capacityByType.get(ephemeralTask.taskType)! - 1
);
return true;
}
}
})
.map((taskToRun) => {
this.ephemeralTaskQueue.delete(taskToRun);
return this.createTaskRunnerForTask(taskToRun);
})
runQueue.map((taskToRun) => {
this.ephemeralTaskQueue.delete(taskToRun);
return this.createTaskRunnerForTask(taskToRun);
})
)
.then((successTaskPoolRunResult) => {
this.logger.debug(
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ export class TaskManagerPlugin
schedule: (...args) => taskScheduling.schedule(...args),
ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args),
runNow: (...args) => taskScheduling.runNow(...args),
ephemeralRunNow: (task: EphemeralTask, options?: Record<string, unknown>) =>
taskScheduling.ephemeralRunNow(task),
ephemeralRunNow: (tasks: EphemeralTask[], options?: Record<string, unknown>) =>
taskScheduling.ephemeralRunNow(tasks),
};
}

Expand Down
44 changes: 26 additions & 18 deletions x-pack/plugins/task_manager/server/task_scheduling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,33 @@ export class TaskScheduling {
* @returns {Promise<ConcreteTaskInstance>}
*/
public async ephemeralRunNow(
task: EphemeralTask,
tasks: EphemeralTask[],
options?: Record<string, unknown>
): Promise<RunNowResult> {
const id = uuid.v4();
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
taskInstance: task,
});
return new Promise(async (resolve, reject) => {
this.awaitTaskRunResult(id).then(resolve).catch(reject);
this.ephemeralTaskLifecycle.attemptToRun({
id,
scheduledAt: new Date(),
runAt: new Date(),
status: TaskStatus.Idle,
ownerId: this.taskManagerId,
...modifiedTask,
});
});
): Promise<RunNowResult[]> {
return new Promise(async () =>
Promise.allSettled(
tasks.map(
async (task): Promise<RunNowResult> => {
const id = uuid.v4();
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
taskInstance: task,
});
return new Promise(async (resolve, reject) => {
this.awaitTaskRunResult(id).then(resolve).catch(reject);
this.ephemeralTaskLifecycle.attemptToRun({
id,
scheduledAt: new Date(),
runAt: new Date(),
status: TaskStatus.Idle,
ownerId: this.taskManagerId,
...modifiedTask,
});
});
}
)
)
);
}

/**
Expand Down

0 comments on commit 3800f5c

Please sign in to comment.