diff --git a/x-pack/legacy/plugins/alerting/server/lib/alert_instance.test.ts b/x-pack/legacy/plugins/alerting/server/alert_instance/alert_instance.test.ts similarity index 100% rename from x-pack/legacy/plugins/alerting/server/lib/alert_instance.test.ts rename to x-pack/legacy/plugins/alerting/server/alert_instance/alert_instance.test.ts diff --git a/x-pack/legacy/plugins/alerting/server/lib/alert_instance.ts b/x-pack/legacy/plugins/alerting/server/alert_instance/alert_instance.ts similarity index 97% rename from x-pack/legacy/plugins/alerting/server/lib/alert_instance.ts rename to x-pack/legacy/plugins/alerting/server/alert_instance/alert_instance.ts index 1e2cc26f364ad2..a56e2077cdfd83 100644 --- a/x-pack/legacy/plugins/alerting/server/lib/alert_instance.ts +++ b/x-pack/legacy/plugins/alerting/server/alert_instance/alert_instance.ts @@ -5,7 +5,7 @@ */ import { State, Context } from '../types'; -import { parseDuration } from './parse_duration'; +import { parseDuration } from '../lib'; interface Meta { lastScheduledActions?: { diff --git a/x-pack/legacy/plugins/alerting/server/lib/create_alert_instance_factory.test.ts b/x-pack/legacy/plugins/alerting/server/alert_instance/create_alert_instance_factory.test.ts similarity index 100% rename from x-pack/legacy/plugins/alerting/server/lib/create_alert_instance_factory.test.ts rename to x-pack/legacy/plugins/alerting/server/alert_instance/create_alert_instance_factory.test.ts diff --git a/x-pack/legacy/plugins/alerting/server/lib/create_alert_instance_factory.ts b/x-pack/legacy/plugins/alerting/server/alert_instance/create_alert_instance_factory.ts similarity index 100% rename from x-pack/legacy/plugins/alerting/server/lib/create_alert_instance_factory.ts rename to x-pack/legacy/plugins/alerting/server/alert_instance/create_alert_instance_factory.ts diff --git a/x-pack/legacy/plugins/alerting/server/alert_instance/index.ts b/x-pack/legacy/plugins/alerting/server/alert_instance/index.ts new file mode 100644 index 00000000000000..40ee0874e805cd --- /dev/null +++ b/x-pack/legacy/plugins/alerting/server/alert_instance/index.ts @@ -0,0 +1,8 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export { AlertInstance } from './alert_instance'; +export { createAlertInstanceFactory } from './create_alert_instance_factory'; diff --git a/x-pack/legacy/plugins/alerting/server/alert_type_registry.test.ts b/x-pack/legacy/plugins/alerting/server/alert_type_registry.test.ts index 57e1b965960e84..8e96ad8dae31cc 100644 --- a/x-pack/legacy/plugins/alerting/server/alert_type_registry.test.ts +++ b/x-pack/legacy/plugins/alerting/server/alert_type_registry.test.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { TaskRunnerFactory } from './lib'; +import { TaskRunnerFactory } from './task_runner'; import { AlertTypeRegistry } from './alert_type_registry'; import { taskManagerMock } from '../../task_manager/server/task_manager.mock'; diff --git a/x-pack/legacy/plugins/alerting/server/alert_type_registry.ts b/x-pack/legacy/plugins/alerting/server/alert_type_registry.ts index b7512864c2a981..2003e810a05b53 100644 --- a/x-pack/legacy/plugins/alerting/server/alert_type_registry.ts +++ b/x-pack/legacy/plugins/alerting/server/alert_type_registry.ts @@ -6,8 +6,8 @@ import Boom from 'boom'; import { i18n } from '@kbn/i18n'; -import { TaskRunnerFactory } from './lib'; -import { RunContext } from '../../task_manager/server'; +import { TaskRunnerFactory } from './task_runner'; +import { RunContext } from '../../task_manager'; import { TaskManagerSetupContract } from './shim'; import { AlertType } from './types'; diff --git a/x-pack/legacy/plugins/alerting/server/lib/alerts_client_factory.test.ts b/x-pack/legacy/plugins/alerting/server/alerts_client_factory.test.ts similarity index 81% rename from x-pack/legacy/plugins/alerting/server/lib/alerts_client_factory.test.ts rename to x-pack/legacy/plugins/alerting/server/alerts_client_factory.test.ts index 838c567fb28787..519001d07e089c 100644 --- a/x-pack/legacy/plugins/alerting/server/lib/alerts_client_factory.test.ts +++ b/x-pack/legacy/plugins/alerting/server/alerts_client_factory.test.ts @@ -6,13 +6,13 @@ import { Request } from 'hapi'; import { AlertsClientFactory, ConstructorOpts } from './alerts_client_factory'; -import { alertTypeRegistryMock } from '../alert_type_registry.mock'; -import { taskManagerMock } from '../../../task_manager/server/task_manager.mock'; -import { KibanaRequest } from '../../../../../../src/core/server'; -import { loggingServiceMock } from '../../../../../../src/core/server/mocks'; -import { encryptedSavedObjectsMock } from '../../../../../plugins/encrypted_saved_objects/server/mocks'; +import { alertTypeRegistryMock } from './alert_type_registry.mock'; +import { taskManagerMock } from '../../task_manager/server/task_manager.mock'; +import { KibanaRequest } from '../../../../../src/core/server'; +import { loggingServiceMock } from '../../../../../src/core/server/mocks'; +import { encryptedSavedObjectsMock } from '../../../../plugins/encrypted_saved_objects/server/mocks'; -jest.mock('../alerts_client'); +jest.mock('./alerts_client'); const savedObjectsClient = jest.fn(); const securityPluginSetup = { @@ -55,7 +55,7 @@ test('creates an alerts client with proper constructor arguments', async () => { const factory = new AlertsClientFactory(alertsClientFactoryParams); factory.create(KibanaRequest.from(fakeRequest), fakeRequest); - expect(jest.requireMock('../alerts_client').AlertsClient).toHaveBeenCalledWith({ + expect(jest.requireMock('./alerts_client').AlertsClient).toHaveBeenCalledWith({ savedObjectsClient, logger: alertsClientFactoryParams.logger, taskManager: alertsClientFactoryParams.taskManager, @@ -72,7 +72,7 @@ test('creates an alerts client with proper constructor arguments', async () => { test('getUserName() returns null when security is disabled', async () => { const factory = new AlertsClientFactory(alertsClientFactoryParams); factory.create(KibanaRequest.from(fakeRequest), fakeRequest); - const constructorCall = jest.requireMock('../alerts_client').AlertsClient.mock.calls[0][0]; + const constructorCall = jest.requireMock('./alerts_client').AlertsClient.mock.calls[0][0]; const userNameResult = await constructorCall.getUserName(); expect(userNameResult).toEqual(null); @@ -84,7 +84,7 @@ test('getUserName() returns a name when security is enabled', async () => { securityPluginSetup: securityPluginSetup as any, }); factory.create(KibanaRequest.from(fakeRequest), fakeRequest); - const constructorCall = jest.requireMock('../alerts_client').AlertsClient.mock.calls[0][0]; + const constructorCall = jest.requireMock('./alerts_client').AlertsClient.mock.calls[0][0]; securityPluginSetup.authc.getCurrentUser.mockResolvedValueOnce({ username: 'bob' }); const userNameResult = await constructorCall.getUserName(); @@ -94,7 +94,7 @@ test('getUserName() returns a name when security is enabled', async () => { test('createAPIKey() returns { apiKeysEnabled: false } when security is disabled', async () => { const factory = new AlertsClientFactory(alertsClientFactoryParams); factory.create(KibanaRequest.from(fakeRequest), fakeRequest); - const constructorCall = jest.requireMock('../alerts_client').AlertsClient.mock.calls[0][0]; + const constructorCall = jest.requireMock('./alerts_client').AlertsClient.mock.calls[0][0]; const createAPIKeyResult = await constructorCall.createAPIKey(); expect(createAPIKeyResult).toEqual({ apiKeysEnabled: false }); @@ -103,7 +103,7 @@ test('createAPIKey() returns { apiKeysEnabled: false } when security is disabled test('createAPIKey() returns { apiKeysEnabled: false } when security is enabled but ES security is disabled', async () => { const factory = new AlertsClientFactory(alertsClientFactoryParams); factory.create(KibanaRequest.from(fakeRequest), fakeRequest); - const constructorCall = jest.requireMock('../alerts_client').AlertsClient.mock.calls[0][0]; + const constructorCall = jest.requireMock('./alerts_client').AlertsClient.mock.calls[0][0]; securityPluginSetup.authc.createAPIKey.mockResolvedValueOnce(null); const createAPIKeyResult = await constructorCall.createAPIKey(); @@ -116,7 +116,7 @@ test('createAPIKey() returns an API key when security is enabled', async () => { securityPluginSetup: securityPluginSetup as any, }); factory.create(KibanaRequest.from(fakeRequest), fakeRequest); - const constructorCall = jest.requireMock('../alerts_client').AlertsClient.mock.calls[0][0]; + const constructorCall = jest.requireMock('./alerts_client').AlertsClient.mock.calls[0][0]; securityPluginSetup.authc.createAPIKey.mockResolvedValueOnce({ api_key: '123', id: 'abc' }); const createAPIKeyResult = await constructorCall.createAPIKey(); @@ -132,7 +132,7 @@ test('createAPIKey() throws when security plugin createAPIKey throws an error', securityPluginSetup: securityPluginSetup as any, }); factory.create(KibanaRequest.from(fakeRequest), fakeRequest); - const constructorCall = jest.requireMock('../alerts_client').AlertsClient.mock.calls[0][0]; + const constructorCall = jest.requireMock('./alerts_client').AlertsClient.mock.calls[0][0]; securityPluginSetup.authc.createAPIKey.mockRejectedValueOnce(new Error('TLS disabled')); await expect(constructorCall.createAPIKey()).rejects.toThrowErrorMatchingInlineSnapshot( diff --git a/x-pack/legacy/plugins/alerting/server/lib/alerts_client_factory.ts b/x-pack/legacy/plugins/alerting/server/alerts_client_factory.ts similarity index 92% rename from x-pack/legacy/plugins/alerting/server/lib/alerts_client_factory.ts rename to x-pack/legacy/plugins/alerting/server/alerts_client_factory.ts index 026d6c92b0d75d..94a396fbaa8063 100644 --- a/x-pack/legacy/plugins/alerting/server/lib/alerts_client_factory.ts +++ b/x-pack/legacy/plugins/alerting/server/alerts_client_factory.ts @@ -6,12 +6,12 @@ import Hapi from 'hapi'; import uuid from 'uuid'; -import { AlertsClient } from '../alerts_client'; -import { AlertTypeRegistry, SpaceIdToNamespaceFunction } from '../types'; -import { SecurityPluginStartContract, TaskManagerStartContract } from '../shim'; -import { KibanaRequest, Logger } from '../../../../../../src/core/server'; -import { InvalidateAPIKeyParams } from '../../../../../plugins/security/server'; -import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../../../plugins/encrypted_saved_objects/server'; +import { AlertsClient } from './alerts_client'; +import { AlertTypeRegistry, SpaceIdToNamespaceFunction } from './types'; +import { SecurityPluginStartContract, TaskManagerStartContract } from './shim'; +import { KibanaRequest, Logger } from '../../../../../src/core/server'; +import { InvalidateAPIKeyParams } from '../../../../plugins/security/server'; +import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../../plugins/encrypted_saved_objects/server'; export interface ConstructorOpts { logger: Logger; diff --git a/x-pack/legacy/plugins/alerting/server/lib/index.ts b/x-pack/legacy/plugins/alerting/server/lib/index.ts index ca4ddf9e11ad28..c41ea4a5998ff4 100644 --- a/x-pack/legacy/plugins/alerting/server/lib/index.ts +++ b/x-pack/legacy/plugins/alerting/server/lib/index.ts @@ -4,8 +4,6 @@ * you may not use this file except in compliance with the Elastic License. */ -export { AlertInstance } from './alert_instance'; -export { validateAlertTypeParams } from './validate_alert_type_params'; export { parseDuration, getDurationSchema } from './parse_duration'; -export { AlertsClientFactory } from './alerts_client_factory'; -export { TaskRunnerFactory } from './task_runner_factory'; +export { LicenseState } from './license_state'; +export { validateAlertTypeParams } from './validate_alert_type_params'; diff --git a/x-pack/legacy/plugins/alerting/server/lib/result_type.ts b/x-pack/legacy/plugins/alerting/server/lib/result_type.ts new file mode 100644 index 00000000000000..644ae51292249d --- /dev/null +++ b/x-pack/legacy/plugins/alerting/server/lib/result_type.ts @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export interface Ok { + tag: 'ok'; + value: T; +} + +export interface Err { + tag: 'err'; + error: E; +} +export type Result = Ok | Err; + +export function asOk(value: T): Ok { + return { + tag: 'ok', + value, + }; +} + +export function asErr(error: T): Err { + return { + tag: 'err', + error, + }; +} + +export function isOk(result: Result): result is Ok { + return result.tag === 'ok'; +} + +export function isErr(result: Result): result is Err { + return !isOk(result); +} + +export async function promiseResult(future: Promise): Promise> { + try { + return asOk(await future); + } catch (e) { + return asErr(e); + } +} + +export function map( + result: Result, + onOk: (value: T) => Resolution, + onErr: (error: E) => Resolution +): Resolution { + return isOk(result) ? onOk(result.value) : onErr(result.error); +} diff --git a/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.test.ts b/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.test.ts deleted file mode 100644 index fd13452e045353..00000000000000 --- a/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.test.ts +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import sinon from 'sinon'; -import { schema } from '@kbn/config-schema'; -import { AlertExecutorOptions } from '../types'; -import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server'; -import { TaskRunnerContext, TaskRunnerFactory } from './task_runner_factory'; -import { encryptedSavedObjectsMock } from '../../../../../plugins/encrypted_saved_objects/server/mocks'; -import { - savedObjectsClientMock, - loggingServiceMock, -} from '../../../../../../src/core/server/mocks'; - -const alertType = { - id: 'test', - name: 'My test alert', - actionGroups: ['default'], - executor: jest.fn(), -}; -let fakeTimer: sinon.SinonFakeTimers; -let taskRunnerFactory: TaskRunnerFactory; -let mockedTaskInstance: ConcreteTaskInstance; - -beforeAll(() => { - fakeTimer = sinon.useFakeTimers(); - mockedTaskInstance = { - id: '', - attempts: 0, - status: TaskStatus.Running, - version: '123', - runAt: new Date(), - scheduledAt: new Date(), - startedAt: new Date(), - retryAt: new Date(Date.now() + 5 * 60 * 1000), - state: { - startedAt: new Date(Date.now() - 5 * 60 * 1000), - }, - taskType: 'alerting:test', - params: { - alertId: '1', - }, - ownerId: null, - }; - taskRunnerFactory = new TaskRunnerFactory(); - taskRunnerFactory.initialize(taskRunnerFactoryInitializerParams); -}); - -afterAll(() => fakeTimer.restore()); - -const savedObjectsClient = savedObjectsClientMock.create(); -const encryptedSavedObjectsPlugin = encryptedSavedObjectsMock.createStart(); -const services = { - log: jest.fn(), - callCluster: jest.fn(), - savedObjectsClient, -}; - -const taskRunnerFactoryInitializerParams: jest.Mocked = { - getServices: jest.fn().mockReturnValue(services), - executeAction: jest.fn(), - encryptedSavedObjectsPlugin, - logger: loggingServiceMock.create().get(), - spaceIdToNamespace: jest.fn().mockReturnValue(undefined), - getBasePath: jest.fn().mockReturnValue(undefined), -}; - -const mockedAlertTypeSavedObject = { - id: '1', - type: 'alert', - attributes: { - enabled: true, - alertTypeId: '123', - schedule: { interval: '10s' }, - mutedInstanceIds: [], - params: { - bar: true, - }, - actions: [ - { - group: 'default', - actionRef: 'action_0', - params: { - foo: true, - }, - }, - ], - }, - references: [ - { - name: 'action_0', - type: 'action', - id: '1', - }, - ], -}; - -beforeEach(() => { - jest.resetAllMocks(); - taskRunnerFactoryInitializerParams.getServices.mockReturnValue(services); -}); - -test(`throws an error if factory isn't initialized`, () => { - const factory = new TaskRunnerFactory(); - expect(() => - factory.create(alertType, { taskInstance: mockedTaskInstance }) - ).toThrowErrorMatchingInlineSnapshot(`"TaskRunnerFactory not initialized"`); -}); - -test(`throws an error if factory is already initialized`, () => { - const factory = new TaskRunnerFactory(); - factory.initialize(taskRunnerFactoryInitializerParams); - expect(() => - factory.initialize(taskRunnerFactoryInitializerParams) - ).toThrowErrorMatchingInlineSnapshot(`"TaskRunnerFactory already initialized"`); -}); - -test('successfully executes the task', async () => { - const taskRunner = taskRunnerFactory.create(alertType, { - taskInstance: mockedTaskInstance, - }); - savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ - id: '1', - type: 'alert', - attributes: { - apiKey: Buffer.from('123:abc').toString('base64'), - }, - references: [], - }); - const runnerResult = await taskRunner.run(); - expect(runnerResult).toMatchInlineSnapshot(` - Object { - "runAt": 1970-01-01T00:00:10.000Z, - "state": Object { - "alertInstances": Object {}, - "alertTypeState": undefined, - "previousStartedAt": 1970-01-01T00:00:00.000Z, - }, - } - `); - expect(alertType.executor).toHaveBeenCalledTimes(1); - const call = alertType.executor.mock.calls[0][0]; - expect(call.params).toMatchInlineSnapshot(` - Object { - "bar": true, - } - `); - expect(call.startedAt).toMatchInlineSnapshot(`1970-01-01T00:00:00.000Z`); - expect(call.state).toMatchInlineSnapshot(`Object {}`); - expect(call.services.alertInstanceFactory).toBeTruthy(); - expect(call.services.callCluster).toBeTruthy(); - expect(call.services).toBeTruthy(); -}); - -test('executeAction is called per alert instance that is scheduled', async () => { - alertType.executor.mockImplementation(({ services: executorServices }: AlertExecutorOptions) => { - executorServices.alertInstanceFactory('1').scheduleActions('default'); - }); - const taskRunner = taskRunnerFactory.create(alertType, { - taskInstance: mockedTaskInstance, - }); - savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ - id: '1', - type: 'alert', - attributes: { - apiKey: Buffer.from('123:abc').toString('base64'), - }, - references: [], - }); - await taskRunner.run(); - expect(taskRunnerFactoryInitializerParams.executeAction).toHaveBeenCalledTimes(1); - expect(taskRunnerFactoryInitializerParams.executeAction.mock.calls[0]).toMatchInlineSnapshot(` - Array [ - Object { - "apiKey": "MTIzOmFiYw==", - "id": "1", - "params": Object { - "foo": true, - }, - "spaceId": undefined, - }, - ] - `); -}); - -test('persists alertInstances passed in from state, only if they are scheduled for execution', async () => { - alertType.executor.mockImplementation(({ services: executorServices }: AlertExecutorOptions) => { - executorServices.alertInstanceFactory('1').scheduleActions('default'); - }); - const taskRunner = taskRunnerFactory.create(alertType, { - taskInstance: { - ...mockedTaskInstance, - state: { - ...mockedTaskInstance.state, - alertInstances: { - '1': { meta: {}, state: { bar: false } }, - '2': { meta: {}, state: { bar: false } }, - }, - }, - }, - }); - savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ - id: '1', - type: 'alert', - attributes: { - apiKey: Buffer.from('123:abc').toString('base64'), - }, - references: [], - }); - const runnerResult = await taskRunner.run(); - expect(runnerResult.state.alertInstances).toMatchInlineSnapshot(` - Object { - "1": Object { - "meta": Object { - "lastScheduledActions": Object { - "date": 1970-01-01T00:00:00.000Z, - "group": "default", - }, - }, - "state": Object { - "bar": false, - }, - }, - } - `); -}); - -test('validates params before executing the alert type', async () => { - const taskRunner = taskRunnerFactory.create( - { - ...alertType, - validate: { - params: schema.object({ - param1: schema.string(), - }), - }, - }, - { taskInstance: mockedTaskInstance } - ); - savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ - id: '1', - type: 'alert', - attributes: { - apiKey: Buffer.from('123:abc').toString('base64'), - }, - references: [], - }); - await expect(taskRunner.run()).rejects.toThrowErrorMatchingInlineSnapshot( - `"params invalid: [param1]: expected value of type [string] but got [undefined]"` - ); -}); - -test('throws error if reference not found', async () => { - const taskRunner = taskRunnerFactory.create(alertType, { - taskInstance: mockedTaskInstance, - }); - savedObjectsClient.get.mockResolvedValueOnce({ - ...mockedAlertTypeSavedObject, - references: [], - }); - encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ - id: '1', - type: 'alert', - attributes: { - apiKey: Buffer.from('123:abc').toString('base64'), - }, - references: [], - }); - await expect(taskRunner.run()).rejects.toThrowErrorMatchingInlineSnapshot( - `"Action reference \\"action_0\\" not found in alert id: 1"` - ); -}); - -test('uses API key when provided', async () => { - const taskRunner = taskRunnerFactory.create(alertType, { - taskInstance: mockedTaskInstance, - }); - savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ - id: '1', - type: 'alert', - attributes: { - apiKey: Buffer.from('123:abc').toString('base64'), - }, - references: [], - }); - - await taskRunner.run(); - expect(taskRunnerFactoryInitializerParams.getServices).toHaveBeenCalledWith({ - getBasePath: expect.anything(), - headers: { - // base64 encoded "123:abc" - authorization: 'ApiKey MTIzOmFiYw==', - }, - path: '/', - route: { settings: {} }, - url: { - href: '/', - }, - raw: { - req: { - url: '/', - }, - }, - }); -}); - -test(`doesn't use API key when not provided`, async () => { - const factory = new TaskRunnerFactory(); - factory.initialize(taskRunnerFactoryInitializerParams); - const taskRunner = factory.create(alertType, { - taskInstance: mockedTaskInstance, - }); - savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ - id: '1', - type: 'alert', - attributes: {}, - references: [], - }); - - await taskRunner.run(); - - expect(taskRunnerFactoryInitializerParams.getServices).toHaveBeenCalledWith({ - getBasePath: expect.anything(), - headers: {}, - path: '/', - route: { settings: {} }, - url: { - href: '/', - }, - raw: { - req: { - url: '/', - }, - }, - }); -}); diff --git a/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.ts b/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.ts deleted file mode 100644 index 5614188795ded7..00000000000000 --- a/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.ts +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { Logger } from '../../../../../../src/core/server'; -import { RunContext } from '../../../task_manager/server'; -import { createExecutionHandler } from './create_execution_handler'; -import { createAlertInstanceFactory } from './create_alert_instance_factory'; -import { AlertInstance } from './alert_instance'; -import { getNextRunAt } from './get_next_run_at'; -import { validateAlertTypeParams } from './validate_alert_type_params'; -import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../../../plugins/encrypted_saved_objects/server'; -import { PluginStartContract as ActionsPluginStartContract } from '../../../actions'; -import { - AlertType, - AlertServices, - GetBasePathFunction, - GetServicesFunction, - RawAlert, - SpaceIdToNamespaceFunction, - IntervalSchedule, -} from '../types'; - -export interface TaskRunnerContext { - logger: Logger; - getServices: GetServicesFunction; - executeAction: ActionsPluginStartContract['execute']; - encryptedSavedObjectsPlugin: EncryptedSavedObjectsStartContract; - spaceIdToNamespace: SpaceIdToNamespaceFunction; - getBasePath: GetBasePathFunction; -} - -export class TaskRunnerFactory { - private isInitialized = false; - private taskRunnerContext?: TaskRunnerContext; - - public initialize(taskRunnerContext: TaskRunnerContext) { - if (this.isInitialized) { - throw new Error('TaskRunnerFactory already initialized'); - } - this.isInitialized = true; - this.taskRunnerContext = taskRunnerContext; - } - - public create(alertType: AlertType, { taskInstance }: RunContext) { - if (!this.isInitialized) { - throw new Error('TaskRunnerFactory not initialized'); - } - - const { - logger, - getServices, - executeAction, - encryptedSavedObjectsPlugin, - spaceIdToNamespace, - getBasePath, - } = this.taskRunnerContext!; - - return { - async run() { - const { alertId, spaceId } = taskInstance.params; - const requestHeaders: Record = {}; - const namespace = spaceIdToNamespace(spaceId); - // Only fetch encrypted attributes here, we'll create a saved objects client - // scoped with the API key to fetch the remaining data. - const { - attributes: { apiKey }, - } = await encryptedSavedObjectsPlugin.getDecryptedAsInternalUser( - 'alert', - alertId, - { namespace } - ); - - if (apiKey) { - requestHeaders.authorization = `ApiKey ${apiKey}`; - } - - const fakeRequest = { - headers: requestHeaders, - getBasePath: () => getBasePath(spaceId), - path: '/', - route: { settings: {} }, - url: { - href: '/', - }, - raw: { - req: { - url: '/', - }, - }, - }; - - const services = getServices(fakeRequest); - // Ensure API key is still valid and user has access - const { - attributes: { params, actions, schedule, throttle, muteAll, mutedInstanceIds }, - references, - } = await services.savedObjectsClient.get('alert', alertId); - - // Validate - const validatedAlertTypeParams = validateAlertTypeParams(alertType, params); - - // Inject ids into actions - const actionsWithIds = actions.map(action => { - const actionReference = references.find(obj => obj.name === action.actionRef); - if (!actionReference) { - throw new Error( - `Action reference "${action.actionRef}" not found in alert id: ${alertId}` - ); - } - return { - ...action, - id: actionReference.id, - }; - }); - - const executionHandler = createExecutionHandler({ - alertId, - logger, - executeAction, - apiKey, - actions: actionsWithIds, - spaceId, - alertType, - }); - const alertInstances: Record = {}; - const alertInstancesData = taskInstance.state.alertInstances || {}; - for (const id of Object.keys(alertInstancesData)) { - alertInstances[id] = new AlertInstance(alertInstancesData[id]); - } - const alertInstanceFactory = createAlertInstanceFactory(alertInstances); - - const alertTypeServices: AlertServices = { - ...services, - alertInstanceFactory, - }; - - const alertTypeState = await alertType.executor({ - alertId, - services: alertTypeServices, - params: validatedAlertTypeParams, - state: taskInstance.state.alertTypeState || {}, - startedAt: taskInstance.startedAt!, - previousStartedAt: taskInstance.state.previousStartedAt, - }); - - await Promise.all( - Object.keys(alertInstances).map(alertInstanceId => { - const alertInstance = alertInstances[alertInstanceId]; - if (alertInstance.hasScheduledActions()) { - if ( - alertInstance.isThrottled(throttle) || - muteAll || - mutedInstanceIds.includes(alertInstanceId) - ) { - return; - } - const { actionGroup, context, state } = alertInstance.getScheduledActionOptions()!; - alertInstance.updateLastScheduledActions(actionGroup); - alertInstance.unscheduleActions(); - return executionHandler({ actionGroup, context, state, alertInstanceId }); - } else { - // Cleanup alert instances that are no longer scheduling actions to avoid over populating the alertInstances object - delete alertInstances[alertInstanceId]; - } - }) - ); - - const nextRunAt = getNextRunAt( - new Date(taskInstance.startedAt!), - // we do not currently have a good way of returning the type - // from SavedObjectsClient, and as we currenrtly require a schedule - // and we only support `interval`, we can cast this safely - schedule as IntervalSchedule - ); - - return { - state: { - alertTypeState, - alertInstances, - previousStartedAt: taskInstance.startedAt!, - }, - runAt: nextRunAt, - }; - }, - }; - } -} diff --git a/x-pack/legacy/plugins/alerting/server/plugin.ts b/x-pack/legacy/plugins/alerting/server/plugin.ts index 24d4467dbd8073..ede95f76bf8113 100644 --- a/x-pack/legacy/plugins/alerting/server/plugin.ts +++ b/x-pack/legacy/plugins/alerting/server/plugin.ts @@ -9,7 +9,8 @@ import { first } from 'rxjs/operators'; import { Services } from './types'; import { AlertsClient } from './alerts_client'; import { AlertTypeRegistry } from './alert_type_registry'; -import { AlertsClientFactory, TaskRunnerFactory } from './lib'; +import { TaskRunnerFactory } from './task_runner'; +import { AlertsClientFactory } from './alerts_client_factory'; import { LicenseState } from './lib/license_state'; import { IClusterClient, KibanaRequest, Logger } from '../../../../../src/core/server'; import { diff --git a/x-pack/legacy/plugins/alerting/server/lib/create_execution_handler.test.ts b/x-pack/legacy/plugins/alerting/server/task_runner/create_execution_handler.test.ts similarity index 100% rename from x-pack/legacy/plugins/alerting/server/lib/create_execution_handler.test.ts rename to x-pack/legacy/plugins/alerting/server/task_runner/create_execution_handler.test.ts diff --git a/x-pack/legacy/plugins/alerting/server/lib/create_execution_handler.ts b/x-pack/legacy/plugins/alerting/server/task_runner/create_execution_handler.ts similarity index 100% rename from x-pack/legacy/plugins/alerting/server/lib/create_execution_handler.ts rename to x-pack/legacy/plugins/alerting/server/task_runner/create_execution_handler.ts diff --git a/x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.test.ts b/x-pack/legacy/plugins/alerting/server/task_runner/get_next_run_at.test.ts similarity index 100% rename from x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.test.ts rename to x-pack/legacy/plugins/alerting/server/task_runner/get_next_run_at.test.ts diff --git a/x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.ts b/x-pack/legacy/plugins/alerting/server/task_runner/get_next_run_at.ts similarity index 92% rename from x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.ts rename to x-pack/legacy/plugins/alerting/server/task_runner/get_next_run_at.ts index f9867b53729080..cea4584e1f713b 100644 --- a/x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.ts +++ b/x-pack/legacy/plugins/alerting/server/task_runner/get_next_run_at.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { parseDuration } from './parse_duration'; +import { parseDuration } from '../lib'; import { IntervalSchedule } from '../types'; export function getNextRunAt(currentRunAt: Date, schedule: IntervalSchedule) { diff --git a/x-pack/legacy/plugins/alerting/server/task_runner/index.ts b/x-pack/legacy/plugins/alerting/server/task_runner/index.ts new file mode 100644 index 00000000000000..f5401fbd9cd746 --- /dev/null +++ b/x-pack/legacy/plugins/alerting/server/task_runner/index.ts @@ -0,0 +1,7 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export { TaskRunnerFactory } from './task_runner_factory'; diff --git a/x-pack/legacy/plugins/alerting/server/task_runner/task_runner.test.ts b/x-pack/legacy/plugins/alerting/server/task_runner/task_runner.test.ts new file mode 100644 index 00000000000000..10627c655eca86 --- /dev/null +++ b/x-pack/legacy/plugins/alerting/server/task_runner/task_runner.test.ts @@ -0,0 +1,400 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import sinon from 'sinon'; +import { schema } from '@kbn/config-schema'; +import { AlertExecutorOptions } from '../types'; +import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager'; +import { TaskRunnerContext } from './task_runner_factory'; +import { TaskRunner } from './task_runner'; +import { encryptedSavedObjectsMock } from '../../../../../plugins/encrypted_saved_objects/server/mocks'; +import { + savedObjectsClientMock, + loggingServiceMock, +} from '../../../../../../src/core/server/mocks'; + +const alertType = { + id: 'test', + name: 'My test alert', + actionGroups: ['default'], + executor: jest.fn(), +}; +let fakeTimer: sinon.SinonFakeTimers; + +describe('Task Runner', () => { + let mockedTaskInstance: ConcreteTaskInstance; + + beforeAll(() => { + fakeTimer = sinon.useFakeTimers(); + mockedTaskInstance = { + id: '', + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(), + scheduledAt: new Date(), + startedAt: new Date(), + retryAt: new Date(Date.now() + 5 * 60 * 1000), + state: { + startedAt: new Date(Date.now() - 5 * 60 * 1000), + }, + taskType: 'alerting:test', + params: { + alertId: '1', + }, + ownerId: null, + }; + }); + + afterAll(() => fakeTimer.restore()); + + const savedObjectsClient = savedObjectsClientMock.create(); + const encryptedSavedObjectsPlugin = encryptedSavedObjectsMock.createStart(); + const services = { + log: jest.fn(), + callCluster: jest.fn(), + savedObjectsClient, + }; + + const taskRunnerFactoryInitializerParams: jest.Mocked = { + getServices: jest.fn().mockReturnValue(services), + executeAction: jest.fn(), + encryptedSavedObjectsPlugin, + logger: loggingServiceMock.create().get(), + spaceIdToNamespace: jest.fn().mockReturnValue(undefined), + getBasePath: jest.fn().mockReturnValue(undefined), + }; + + const mockedAlertTypeSavedObject = { + id: '1', + type: 'alert', + attributes: { + enabled: true, + alertTypeId: '123', + schedule: { interval: '10s' }, + mutedInstanceIds: [], + params: { + bar: true, + }, + actions: [ + { + group: 'default', + actionRef: 'action_0', + params: { + foo: true, + }, + }, + ], + }, + references: [ + { + name: 'action_0', + type: 'action', + id: '1', + }, + ], + }; + + beforeEach(() => { + jest.resetAllMocks(); + taskRunnerFactoryInitializerParams.getServices.mockReturnValue(services); + }); + + test('successfully executes the task', async () => { + const taskRunner = new TaskRunner( + alertType, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + const runnerResult = await taskRunner.run(); + expect(runnerResult).toMatchInlineSnapshot(` + Object { + "runAt": 1970-01-01T00:00:10.000Z, + "state": Object { + "alertInstances": Object {}, + "alertTypeState": undefined, + "previousStartedAt": 1970-01-01T00:00:00.000Z, + }, + } + `); + expect(alertType.executor).toHaveBeenCalledTimes(1); + const call = alertType.executor.mock.calls[0][0]; + expect(call.params).toMatchInlineSnapshot(` + Object { + "bar": true, + } + `); + expect(call.startedAt).toMatchInlineSnapshot(`1970-01-01T00:00:00.000Z`); + expect(call.state).toMatchInlineSnapshot(`Object {}`); + expect(call.services.alertInstanceFactory).toBeTruthy(); + expect(call.services.callCluster).toBeTruthy(); + expect(call.services).toBeTruthy(); + }); + + test('executeAction is called per alert instance that is scheduled', async () => { + alertType.executor.mockImplementation( + ({ services: executorServices }: AlertExecutorOptions) => { + executorServices.alertInstanceFactory('1').scheduleActions('default'); + } + ); + const taskRunner = new TaskRunner( + alertType, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + await taskRunner.run(); + expect(taskRunnerFactoryInitializerParams.executeAction).toHaveBeenCalledTimes(1); + expect(taskRunnerFactoryInitializerParams.executeAction.mock.calls[0]).toMatchInlineSnapshot(` + Array [ + Object { + "apiKey": "MTIzOmFiYw==", + "id": "1", + "params": Object { + "foo": true, + }, + "spaceId": undefined, + }, + ] + `); + }); + + test('persists alertInstances passed in from state, only if they are scheduled for execution', async () => { + alertType.executor.mockImplementation( + ({ services: executorServices }: AlertExecutorOptions) => { + executorServices.alertInstanceFactory('1').scheduleActions('default'); + } + ); + const taskRunner = new TaskRunner( + alertType, + { + ...mockedTaskInstance, + state: { + ...mockedTaskInstance.state, + alertInstances: { + '1': { meta: {}, state: { bar: false } }, + '2': { meta: {}, state: { bar: false } }, + }, + }, + }, + taskRunnerFactoryInitializerParams + ); + savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + const runnerResult = await taskRunner.run(); + expect(runnerResult.state.alertInstances).toMatchInlineSnapshot(` + Object { + "1": Object { + "meta": Object { + "lastScheduledActions": Object { + "date": 1970-01-01T00:00:00.000Z, + "group": "default", + }, + }, + "state": Object { + "bar": false, + }, + }, + } + `); + }); + + test('validates params before executing the alert type', async () => { + const taskRunner = new TaskRunner( + { + ...alertType, + validate: { + params: schema.object({ + param1: schema.string(), + }), + }, + }, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + expect(await taskRunner.run()).toMatchInlineSnapshot(` + Object { + "runAt": 1970-01-01T00:00:10.000Z, + "state": Object { + "previousStartedAt": 1970-01-01T00:00:00.000Z, + "startedAt": 1969-12-31T23:55:00.000Z, + }, + } + `); + expect(taskRunnerFactoryInitializerParams.logger.error).toHaveBeenCalledWith( + `Executing Alert \"1\" has resulted in Error: params invalid: [param1]: expected value of type [string] but got [undefined]` + ); + }); + + test('throws error if reference not found', async () => { + const taskRunner = new TaskRunner( + alertType, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + savedObjectsClient.get.mockResolvedValueOnce({ + ...mockedAlertTypeSavedObject, + references: [], + }); + encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + expect(await taskRunner.run()).toMatchInlineSnapshot(` + Object { + "runAt": 1970-01-01T00:00:10.000Z, + "state": Object { + "previousStartedAt": 1970-01-01T00:00:00.000Z, + "startedAt": 1969-12-31T23:55:00.000Z, + }, + } + `); + expect(taskRunnerFactoryInitializerParams.logger.error).toHaveBeenCalledWith( + `Executing Alert \"1\" has resulted in Error: Action reference \"action_0\" not found in alert id: 1` + ); + }); + + test('uses API key when provided', async () => { + const taskRunner = new TaskRunner( + alertType, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + + await taskRunner.run(); + expect(taskRunnerFactoryInitializerParams.getServices).toHaveBeenCalledWith({ + getBasePath: expect.anything(), + headers: { + // base64 encoded "123:abc" + authorization: 'ApiKey MTIzOmFiYw==', + }, + path: '/', + route: { settings: {} }, + url: { + href: '/', + }, + raw: { + req: { + url: '/', + }, + }, + }); + }); + + test(`doesn't use API key when not provided`, async () => { + const taskRunner = new TaskRunner( + alertType, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: {}, + references: [], + }); + + await taskRunner.run(); + + expect(taskRunnerFactoryInitializerParams.getServices).toHaveBeenCalledWith({ + getBasePath: expect.anything(), + headers: {}, + path: '/', + route: { settings: {} }, + url: { + href: '/', + }, + raw: { + req: { + url: '/', + }, + }, + }); + }); + + test('recovers gracefully when the AlertType executor throws an exception', async () => { + alertType.executor.mockImplementation( + ({ services: executorServices }: AlertExecutorOptions) => { + throw new Error('OMG'); + } + ); + + const taskRunner = new TaskRunner( + alertType, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + + savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + + const runnerResult = await taskRunner.run(); + + expect(runnerResult).toMatchInlineSnapshot(` + Object { + "runAt": 1970-01-01T00:00:10.000Z, + "state": Object { + "previousStartedAt": 1970-01-01T00:00:00.000Z, + "startedAt": 1969-12-31T23:55:00.000Z, + }, + } + `); + }); +}); diff --git a/x-pack/legacy/plugins/alerting/server/task_runner/task_runner.ts b/x-pack/legacy/plugins/alerting/server/task_runner/task_runner.ts new file mode 100644 index 00000000000000..2347e9e608ed9f --- /dev/null +++ b/x-pack/legacy/plugins/alerting/server/task_runner/task_runner.ts @@ -0,0 +1,241 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { pick, mapValues, omit } from 'lodash'; +import { Logger } from '../../../../../../src/core/server'; +import { SavedObject } from '../../../../../../src/core/server'; +import { TaskRunnerContext } from './task_runner_factory'; +import { ConcreteTaskInstance } from '../../../task_manager'; +import { createExecutionHandler } from './create_execution_handler'; +import { AlertInstance, createAlertInstanceFactory } from '../alert_instance'; +import { getNextRunAt } from './get_next_run_at'; +import { validateAlertTypeParams } from '../lib'; +import { AlertType, RawAlert, IntervalSchedule, Services, State } from '../types'; +import { promiseResult, map } from '../lib/result_type'; + +type AlertInstances = Record; + +export class TaskRunner { + private context: TaskRunnerContext; + private logger: Logger; + private taskInstance: ConcreteTaskInstance; + private alertType: AlertType; + + constructor( + alertType: AlertType, + taskInstance: ConcreteTaskInstance, + context: TaskRunnerContext + ) { + this.context = context; + this.logger = context.logger; + this.alertType = alertType; + this.taskInstance = taskInstance; + } + + async getApiKeyForAlertPermissions(alertId: string, spaceId: string) { + const namespace = this.context.spaceIdToNamespace(spaceId); + // Only fetch encrypted attributes here, we'll create a saved objects client + // scoped with the API key to fetch the remaining data. + const { + attributes: { apiKey }, + } = await this.context.encryptedSavedObjectsPlugin.getDecryptedAsInternalUser( + 'alert', + alertId, + { namespace } + ); + + return apiKey; + } + + async getServicesWithSpaceLevelPermissions(spaceId: string, apiKey: string | null) { + const requestHeaders: Record = {}; + + if (apiKey) { + requestHeaders.authorization = `ApiKey ${apiKey}`; + } + + const fakeRequest = { + headers: requestHeaders, + getBasePath: () => this.context.getBasePath(spaceId), + path: '/', + route: { settings: {} }, + url: { + href: '/', + }, + raw: { + req: { + url: '/', + }, + }, + }; + + return this.context.getServices(fakeRequest); + } + + getExecutionHandler( + alertId: string, + spaceId: string, + apiKey: string | null, + actions: RawAlert['actions'], + references: SavedObject['references'] + ) { + // Inject ids into actions + const actionsWithIds = actions.map(action => { + const actionReference = references.find(obj => obj.name === action.actionRef); + if (!actionReference) { + throw new Error(`Action reference "${action.actionRef}" not found in alert id: ${alertId}`); + } + return { + ...action, + id: actionReference.id, + }; + }); + + return createExecutionHandler({ + alertId, + logger: this.logger, + executeAction: this.context.executeAction, + apiKey, + actions: actionsWithIds, + spaceId, + alertType: this.alertType, + }); + } + + async executeAlertInstance( + alertInstanceId: string, + alertInstance: AlertInstance, + executionHandler: ReturnType + ) { + const { actionGroup, context, state } = alertInstance.getScheduledActionOptions()!; + alertInstance.updateLastScheduledActions(actionGroup); + alertInstance.unscheduleActions(); + return executionHandler({ actionGroup, context, state, alertInstanceId }); + } + + async executeAlertInstances( + services: Services, + { params, throttle, muteAll, mutedInstanceIds }: SavedObject['attributes'], + executionHandler: ReturnType + ): Promise { + const { + params: { alertId }, + state: { alertInstances: alertRawInstances = {}, alertTypeState = {}, previousStartedAt }, + } = this.taskInstance; + + const alertInstances = mapValues( + alertRawInstances, + alert => new AlertInstance(alert) + ); + + const updatedAlertTypeState = await this.alertType.executor({ + alertId, + services: { + ...services, + alertInstanceFactory: createAlertInstanceFactory(alertInstances), + }, + params, + state: alertTypeState, + startedAt: this.taskInstance.startedAt!, + previousStartedAt, + }); + + // Cleanup alert instances that are no longer scheduling actions to avoid over populating the alertInstances object + const instancesWithScheduledActions = pick( + alertInstances, + alertInstance => alertInstance.hasScheduledActions() + ); + + if (!muteAll) { + const enabledAlertInstances = omit( + instancesWithScheduledActions, + ...mutedInstanceIds + ); + + await Promise.all( + Object.entries(enabledAlertInstances) + .filter( + ([, alertInstance]: [string, AlertInstance]) => !alertInstance.isThrottled(throttle) + ) + .map(([id, alertInstance]: [string, AlertInstance]) => + this.executeAlertInstance(id, alertInstance, executionHandler) + ) + ); + } + + return { + alertTypeState: updatedAlertTypeState, + alertInstances: instancesWithScheduledActions, + }; + } + + async validateAndRunAlert( + services: Services, + apiKey: string | null, + attributes: SavedObject['attributes'], + references: SavedObject['references'] + ) { + const { + params: { alertId, spaceId }, + } = this.taskInstance; + + // Validate + const params = validateAlertTypeParams(this.alertType, attributes.params); + const executionHandler = this.getExecutionHandler( + alertId, + spaceId, + apiKey, + attributes.actions, + references + ); + return this.executeAlertInstances(services, { ...attributes, params }, executionHandler); + } + + async run() { + const { + params: { alertId, spaceId }, + startedAt: previousStartedAt, + state: originalState, + } = this.taskInstance; + + const apiKey = await this.getApiKeyForAlertPermissions(alertId, spaceId); + const services = await this.getServicesWithSpaceLevelPermissions(spaceId, apiKey); + + // Ensure API key is still valid and user has access + const { attributes, references } = await services.savedObjectsClient.get( + 'alert', + alertId + ); + + return { + state: map( + await promiseResult( + this.validateAndRunAlert(services, apiKey, attributes, references) + ), + (stateUpdates: State) => { + return { + ...stateUpdates, + previousStartedAt, + }; + }, + (err: Error) => { + this.logger.error(`Executing Alert "${alertId}" has resulted in Error: ${err.message}`); + return { + ...originalState, + previousStartedAt, + }; + } + ), + runAt: getNextRunAt( + new Date(this.taskInstance.startedAt!), + // we do not currently have a good way of returning the type + // from SavedObjectsClient, and as we currenrtly require a schedule + // and we only support `interval`, we can cast this safely + attributes.schedule as IntervalSchedule + ), + }; + } +} diff --git a/x-pack/legacy/plugins/alerting/server/task_runner/task_runner_factory.test.ts b/x-pack/legacy/plugins/alerting/server/task_runner/task_runner_factory.test.ts new file mode 100644 index 00000000000000..2ea1256352bec9 --- /dev/null +++ b/x-pack/legacy/plugins/alerting/server/task_runner/task_runner_factory.test.ts @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import sinon from 'sinon'; +import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager'; +import { TaskRunnerContext, TaskRunnerFactory } from './task_runner_factory'; +import { encryptedSavedObjectsMock } from '../../../../../plugins/encrypted_saved_objects/server/mocks'; +import { + savedObjectsClientMock, + loggingServiceMock, +} from '../../../../../../src/core/server/mocks'; + +const alertType = { + id: 'test', + name: 'My test alert', + actionGroups: ['default'], + executor: jest.fn(), +}; +let fakeTimer: sinon.SinonFakeTimers; + +describe('Task Runner Factory', () => { + let mockedTaskInstance: ConcreteTaskInstance; + + beforeAll(() => { + fakeTimer = sinon.useFakeTimers(); + mockedTaskInstance = { + id: '', + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(), + scheduledAt: new Date(), + startedAt: new Date(), + retryAt: new Date(Date.now() + 5 * 60 * 1000), + state: { + startedAt: new Date(Date.now() - 5 * 60 * 1000), + }, + taskType: 'alerting:test', + params: { + alertId: '1', + }, + ownerId: null, + }; + }); + + afterAll(() => fakeTimer.restore()); + + const savedObjectsClient = savedObjectsClientMock.create(); + const encryptedSavedObjectsPlugin = encryptedSavedObjectsMock.createStart(); + const services = { + log: jest.fn(), + callCluster: jest.fn(), + savedObjectsClient, + }; + + const taskRunnerFactoryInitializerParams: jest.Mocked = { + getServices: jest.fn().mockReturnValue(services), + executeAction: jest.fn(), + encryptedSavedObjectsPlugin, + logger: loggingServiceMock.create().get(), + spaceIdToNamespace: jest.fn().mockReturnValue(undefined), + getBasePath: jest.fn().mockReturnValue(undefined), + }; + + beforeEach(() => { + jest.resetAllMocks(); + taskRunnerFactoryInitializerParams.getServices.mockReturnValue(services); + }); + + test(`throws an error if factory isn't initialized`, () => { + const factory = new TaskRunnerFactory(); + expect(() => + factory.create(alertType, { taskInstance: mockedTaskInstance }) + ).toThrowErrorMatchingInlineSnapshot(`"TaskRunnerFactory not initialized"`); + }); + + test(`throws an error if factory is already initialized`, () => { + const factory = new TaskRunnerFactory(); + factory.initialize(taskRunnerFactoryInitializerParams); + expect(() => + factory.initialize(taskRunnerFactoryInitializerParams) + ).toThrowErrorMatchingInlineSnapshot(`"TaskRunnerFactory already initialized"`); + }); +}); diff --git a/x-pack/legacy/plugins/alerting/server/task_runner/task_runner_factory.ts b/x-pack/legacy/plugins/alerting/server/task_runner/task_runner_factory.ts new file mode 100644 index 00000000000000..7186e1e729bda4 --- /dev/null +++ b/x-pack/legacy/plugins/alerting/server/task_runner/task_runner_factory.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { Logger } from '../../../../../../src/core/server'; +import { RunContext } from '../../../task_manager'; +import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../../../plugins/encrypted_saved_objects/server'; +import { PluginStartContract as ActionsPluginStartContract } from '../../../actions'; +import { + AlertType, + GetBasePathFunction, + GetServicesFunction, + SpaceIdToNamespaceFunction, +} from '../types'; +import { TaskRunner } from './task_runner'; + +export interface TaskRunnerContext { + logger: Logger; + getServices: GetServicesFunction; + executeAction: ActionsPluginStartContract['execute']; + encryptedSavedObjectsPlugin: EncryptedSavedObjectsStartContract; + spaceIdToNamespace: SpaceIdToNamespaceFunction; + getBasePath: GetBasePathFunction; +} + +export class TaskRunnerFactory { + private isInitialized = false; + private taskRunnerContext?: TaskRunnerContext; + + public initialize(taskRunnerContext: TaskRunnerContext) { + if (this.isInitialized) { + throw new Error('TaskRunnerFactory already initialized'); + } + this.isInitialized = true; + this.taskRunnerContext = taskRunnerContext; + } + + public create(alertType: AlertType, { taskInstance }: RunContext) { + if (!this.isInitialized) { + throw new Error('TaskRunnerFactory not initialized'); + } + + return new TaskRunner(alertType, taskInstance, this.taskRunnerContext!); + } +} diff --git a/x-pack/legacy/plugins/alerting/server/lib/transform_action_params.test.ts b/x-pack/legacy/plugins/alerting/server/task_runner/transform_action_params.test.ts similarity index 100% rename from x-pack/legacy/plugins/alerting/server/lib/transform_action_params.test.ts rename to x-pack/legacy/plugins/alerting/server/task_runner/transform_action_params.test.ts diff --git a/x-pack/legacy/plugins/alerting/server/lib/transform_action_params.ts b/x-pack/legacy/plugins/alerting/server/task_runner/transform_action_params.ts similarity index 100% rename from x-pack/legacy/plugins/alerting/server/lib/transform_action_params.ts rename to x-pack/legacy/plugins/alerting/server/task_runner/transform_action_params.ts diff --git a/x-pack/legacy/plugins/alerting/server/types.ts b/x-pack/legacy/plugins/alerting/server/types.ts index 62dcf07abb7bdd..9b03f9b02aa0af 100644 --- a/x-pack/legacy/plugins/alerting/server/types.ts +++ b/x-pack/legacy/plugins/alerting/server/types.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { AlertInstance } from './lib'; +import { AlertInstance } from './alert_instance'; import { AlertTypeRegistry as OrigAlertTypeRegistry } from './alert_type_registry'; import { PluginSetupContract, PluginStartContract } from './plugin'; import { SavedObjectAttributes, SavedObjectsClientContract } from '../../../../../src/core/server'; diff --git a/x-pack/test/alerting_api_integration/common/lib/alert_utils.ts b/x-pack/test/alerting_api_integration/common/lib/alert_utils.ts index 487f396d7a3dca..c47649544f9a76 100644 --- a/x-pack/test/alerting_api_integration/common/lib/alert_utils.ts +++ b/x-pack/test/alerting_api_integration/common/lib/alert_utils.ts @@ -17,7 +17,7 @@ export interface AlertUtilsOpts { objectRemover?: ObjectRemover; } -export interface CreateAlwaysFiringActionOpts { +export interface CreateAlertWithActionOpts { indexRecordActionId?: string; objectRemover?: ObjectRemover; overwrites?: Record; @@ -159,7 +159,7 @@ export class AlertUtils { overwrites = {}, indexRecordActionId, reference, - }: CreateAlwaysFiringActionOpts) { + }: CreateAlertWithActionOpts) { const objRemover = objectRemover || this.objectRemover; const actionId = indexRecordActionId || this.indexRecordActionId; @@ -207,4 +207,47 @@ export class AlertUtils { } return response; } + + public async createAlwaysFailingAction({ + objectRemover, + overwrites = {}, + indexRecordActionId, + reference, + }: CreateAlertWithActionOpts) { + const objRemover = objectRemover || this.objectRemover; + const actionId = indexRecordActionId || this.indexRecordActionId; + + if (!objRemover) { + throw new Error('objectRemover is required'); + } + if (!actionId) { + throw new Error('indexRecordActionId is required '); + } + + let request = this.supertestWithoutAuth + .post(`${getUrlPrefix(this.space.id)}/api/alert`) + .set('kbn-xsrf', 'foo'); + if (this.user) { + request = request.auth(this.user.username, this.user.password); + } + const response = await request.send({ + enabled: true, + name: 'fail', + schedule: { interval: '30s' }, + throttle: '30s', + tags: [], + alertTypeId: 'test.failing', + consumer: 'bar', + params: { + index: ES_TEST_INDEX_NAME, + reference, + }, + actions: [], + ...overwrites, + }); + if (response.statusCode === 200) { + objRemover.add(this.space.id, response.body.id, 'alert'); + } + return response; + } } diff --git a/x-pack/test/alerting_api_integration/common/lib/index.ts b/x-pack/test/alerting_api_integration/common/lib/index.ts index a2f21264634f83..c1e59664f9ce2e 100644 --- a/x-pack/test/alerting_api_integration/common/lib/index.ts +++ b/x-pack/test/alerting_api_integration/common/lib/index.ts @@ -10,4 +10,5 @@ export { ES_TEST_INDEX_NAME, ESTestIndexTool } from './es_test_index_tool'; export { getTestAlertData } from './get_test_alert_data'; export { AlertUtils } from './alert_utils'; export { TaskManagerUtils } from './task_manager_utils'; +export * from './test_assertions'; export { checkAAD } from './check_aad'; diff --git a/x-pack/test/alerting_api_integration/common/lib/test_assertions.ts b/x-pack/test/alerting_api_integration/common/lib/test_assertions.ts new file mode 100644 index 00000000000000..9495dd4cfae82a --- /dev/null +++ b/x-pack/test/alerting_api_integration/common/lib/test_assertions.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import expect from '@kbn/expect'; + +export function ensureDatetimeIsWithinRange( + date: number, + expectedDiff: number, + buffer: number = 10000 +) { + const diff = date - Date.now(); + expect(diff).to.be.greaterThan(expectedDiff - buffer); + expect(diff).to.be.lessThan(expectedDiff + buffer); +} diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts index e89b54b1caa554..2a7e0b22038248 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts @@ -7,7 +7,13 @@ import expect from '@kbn/expect'; import { Response as SupertestResponse } from 'supertest'; import { UserAtSpaceScenarios } from '../../scenarios'; -import { checkAAD, getUrlPrefix, getTestAlertData, ObjectRemover } from '../../../common/lib'; +import { + checkAAD, + getUrlPrefix, + getTestAlertData, + ObjectRemover, + ensureDatetimeIsWithinRange, +} from '../../../common/lib'; import { FtrProviderContext } from '../../../common/ftr_provider_context'; // eslint-disable-next-line import/no-default-export @@ -406,10 +412,3 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { } }); } - -function ensureDatetimeIsWithinRange(scheduledRunTime: number, expectedDiff: number) { - const buffer = 10000; - const diff = scheduledRunTime - Date.now(); - expect(diff).to.be.greaterThan(expectedDiff - buffer); - expect(diff).to.be.lessThan(expectedDiff + buffer); -} diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts.ts index 03e973194b4e2c..032fee15882cf8 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts.ts @@ -5,6 +5,7 @@ */ import expect from '@kbn/expect'; +import { Response as SupertestResponse } from 'supertest'; import { Spaces } from '../../scenarios'; import { FtrProviderContext } from '../../../common/ftr_provider_context'; import { @@ -14,6 +15,7 @@ import { getTestAlertData, ObjectRemover, AlertUtils, + ensureDatetimeIsWithinRange, } from '../../../common/lib'; // eslint-disable-next-line import/no-default-export @@ -23,6 +25,13 @@ export default function alertTests({ getService }: FtrProviderContext) { const retry = getService('retry'); const esTestIndexTool = new ESTestIndexTool(es, retry); + function getAlertingTaskById(taskId: string) { + return supertestWithoutAuth + .get(`/api/alerting_tasks/${taskId}`) + .expect(200) + .then((response: SupertestResponse) => response.body); + } + describe('alerts', () => { let alertUtils: AlertUtils; let indexRecordActionId: string; @@ -100,6 +109,37 @@ export default function alertTests({ getService }: FtrProviderContext) { }); }); + it('should reschedule failing alerts using the alerting interval and not the Task Manager retry logic', async () => { + /* + Alerting does not use the Task Manager schedule and instead implements its own due to a current limitation + in TaskManager's ability to update an existing Task. + For this reason we need to handle the retry when Alert executors fail, as TaskManager doesn't understand that + alerting tasks are recurring tasks. + */ + const alertIntervalInSeconds = 30; + const reference = alertUtils.generateReference(); + const response = await alertUtils.createAlwaysFailingAction({ + reference, + overwrites: { schedule: { interval: `${alertIntervalInSeconds}s` } }, + }); + + expect(response.statusCode).to.eql(200); + + // wait for executor Alert Executor to be run, which means the underlying task is running + await esTestIndexTool.waitForDocs('alert:test.failing', reference); + + await retry.try(async () => { + const alertTask = (await getAlertingTaskById(response.body.scheduledTaskId)).docs[0]; + expect(alertTask.status).to.eql('idle'); + // ensure the alert is rescheduled to a minute from now + ensureDatetimeIsWithinRange( + Date.parse(alertTask.runAt), + alertIntervalInSeconds * 1000, + 5000 + ); + }); + }); + it('should handle custom retry logic', async () => { // We'll use this start time to query tasks created after this point const testStart = new Date();