Skip to content

Commit

Permalink
refactored alert task runner
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Dec 20, 2019
1 parent 9513123 commit f2971df
Showing 1 changed file with 84 additions and 48 deletions.
132 changes: 84 additions & 48 deletions x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { Logger } from '../../../../../../src/core/server';
import { pick, mapValues, omit } from 'lodash';
import { Logger, SavedObject } from '../../../../../../src/core/server';
import { RunContext } from '../../../task_manager';
import { createExecutionHandler } from './create_execution_handler';
import { createAlertInstanceFactory } from './create_alert_instance_factory';
Expand All @@ -15,7 +15,6 @@ import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../..
import { PluginStartContract as ActionsPluginStartContract } from '../../../actions';
import {
AlertType,
AlertServices,
GetBasePathFunction,
GetServicesFunction,
RawAlert,
Expand All @@ -32,6 +31,8 @@ export interface TaskRunnerContext {
getBasePath: GetBasePathFunction;
}

type AlertInstances = Record<string, AlertInstance>;

export class TaskRunnerFactory {
private isInitialized = false;
private taskRunnerContext?: TaskRunnerContext;
Expand Down Expand Up @@ -59,9 +60,7 @@ export class TaskRunnerFactory {
} = this.taskRunnerContext!;

return {
async run() {
const { alertId, spaceId } = taskInstance.params;
const requestHeaders: Record<string, string> = {};
async getApiKeyForAlertPermissions(alertId: string, spaceId: string) {
const namespace = spaceIdToNamespace(spaceId);
// Only fetch encrypted attributes here, we'll create a saved objects client
// scoped with the API key to fetch the remaining data.
Expand All @@ -73,6 +72,12 @@ export class TaskRunnerFactory {
{ namespace }
);

return apiKey;
},

async getServicesWithSpaceLevelPermissions(spaceId: string, apiKey: string | null) {
const requestHeaders: Record<string, string> = {};

if (apiKey) {
requestHeaders.authorization = `ApiKey ${apiKey}`;
}
Expand All @@ -92,16 +97,16 @@ export class TaskRunnerFactory {
},
};

const services = getServices(fakeRequest);
// Ensure API key is still valid and user has access
const {
attributes: { params, actions, schedule, throttle, muteAll, mutedInstanceIds },
references,
} = await services.savedObjectsClient.get<RawAlert>('alert', alertId);

// Validate
const validatedAlertTypeParams = validateAlertTypeParams(alertType, params);
return getServices(fakeRequest);
},

getExecutionHandler(
alertId: string,
spaceId: string,
apiKey: string | null,
actions: RawAlert['actions'],
references: SavedObject['references']
) {
// Inject ids into actions
const actionsWithIds = actions.map(action => {
const actionReference = references.find(obj => obj.name === action.actionRef);
Expand All @@ -116,7 +121,7 @@ export class TaskRunnerFactory {
};
});

const executionHandler = createExecutionHandler({
return createExecutionHandler({
alertId,
logger,
executeAction,
Expand All @@ -125,49 +130,79 @@ export class TaskRunnerFactory {
spaceId,
alertType,
});
const alertInstances: Record<string, AlertInstance> = {};
const alertInstancesData = taskInstance.state.alertInstances || {};
for (const id of Object.keys(alertInstancesData)) {
alertInstances[id] = new AlertInstance(alertInstancesData[id]);
}
const alertInstanceFactory = createAlertInstanceFactory(alertInstances);
},

const alertTypeServices: AlertServices = {
...services,
alertInstanceFactory,
};
async executeAlertInstance(
alertInstanceId: string,
alertInstance: AlertInstance,
executionHandler: ReturnType<typeof createExecutionHandler>
) {
const { actionGroup, context, state } = alertInstance.getScheduledActionOptions()!;
alertInstance.updateLastScheduledActions(actionGroup);
alertInstance.unscheduleActions();
return executionHandler({ actionGroup, context, state, alertInstanceId });
},

async run() {
const {
params: { alertId, spaceId },
state: { alertInstances: alertRawInstances = {} },
} = taskInstance;
const apiKey = await this.getApiKeyForAlertPermissions(alertId, spaceId);
const services = await this.getServicesWithSpaceLevelPermissions(spaceId, apiKey);

// Ensure API key is still valid and user has access
const {
attributes: { params, actions, schedule, throttle, muteAll, mutedInstanceIds },
references,
} = await services.savedObjectsClient.get<RawAlert>('alert', alertId);

// Validate
const validatedAlertTypeParams = validateAlertTypeParams(alertType, params);

const alertInstances = mapValues(alertRawInstances, alert => new AlertInstance(alert));

const alertTypeState = await alertType.executor({
alertId,
services: alertTypeServices,
services: {
...services,
alertInstanceFactory: createAlertInstanceFactory(alertInstances),
},
params: validatedAlertTypeParams,
state: taskInstance.state.alertTypeState || {},
startedAt: taskInstance.startedAt!,
previousStartedAt: taskInstance.state.previousStartedAt,
});

await Promise.all(
Object.keys(alertInstances).map(alertInstanceId => {
const alertInstance = alertInstances[alertInstanceId];
if (alertInstance.hasScheduledActions()) {
if (
alertInstance.isThrottled(throttle) ||
muteAll ||
mutedInstanceIds.includes(alertInstanceId)
) {
return;
}
const { actionGroup, context, state } = alertInstance.getScheduledActionOptions()!;
alertInstance.updateLastScheduledActions(actionGroup);
alertInstance.unscheduleActions();
return executionHandler({ actionGroup, context, state, alertInstanceId });
} else {
// Cleanup alert instances that are no longer scheduling actions to avoid over populating the alertInstances object
delete alertInstances[alertInstanceId];
}
})
const instancesWithScheduledActions = pick<AlertInstances, AlertInstances>(
alertInstances,
alertInstance => alertInstance.hasScheduledActions()
);

if (!muteAll) {
const executionHandler = this.getExecutionHandler(
alertId,
spaceId,
apiKey,
actions,
references
);
const enabledAlertInstances = omit<AlertInstances, AlertInstances>(
instancesWithScheduledActions,
...mutedInstanceIds
);

await Promise.all(
Object.entries(enabledAlertInstances)
.filter(
([, alertInstance]: [string, AlertInstance]) => !alertInstance.isThrottled(throttle)
)
.map(([id, alertInstance]: [string, AlertInstance]) =>
this.executeAlertInstance(id, alertInstance, executionHandler)
)
);
}

const nextRunAt = getNextRunAt(
new Date(taskInstance.startedAt!),
// we do not currently have a good way of returning the type
Expand All @@ -179,7 +214,8 @@ export class TaskRunnerFactory {
return {
state: {
alertTypeState,
alertInstances,
// Cleanup alert instances that are no longer scheduling actions to avoid over populating the alertInstances object
alertInstances: instancesWithScheduledActions,
previousStartedAt: taskInstance.startedAt!,
},
runAt: nextRunAt,
Expand Down

0 comments on commit f2971df

Please sign in to comment.