Skip to content

Commit

Permalink
[7.x] Introduce geo-threshold alerts (#76285) (#79591)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Caldwell committed Oct 6, 2020
1 parent f09a57e commit c0e2275
Show file tree
Hide file tree
Showing 31 changed files with 2,691 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { i18n } from '@kbn/i18n';
import { schema } from '@kbn/config-schema';
import { Service } from '../../types';
import { BUILT_IN_ALERTS_FEATURE_ID } from '../../../common';
import { getGeoThresholdExecutor } from './geo_threshold';
import {
ActionGroup,
AlertServices,
ActionVariable,
AlertTypeState,
} from '../../../../alerts/server';

export const GEO_THRESHOLD_ID = '.geo-threshold';
export type TrackingEvent = 'entered' | 'exited';
export const ActionGroupId = 'tracking threshold met';

const actionVariableContextToEntityDateTimeLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextToEntityDateTimeLabel',
{
defaultMessage: `The time the entity was detected in the current boundary`,
}
);

const actionVariableContextFromEntityDateTimeLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextFromEntityDateTimeLabel',
{
defaultMessage: `The last time the entity was recorded in the previous boundary`,
}
);

const actionVariableContextToEntityLocationLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextToEntityLocationLabel',
{
defaultMessage: 'The most recently captured location of the entity',
}
);

const actionVariableContextCrossingLineLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextCrossingLineLabel',
{
defaultMessage:
'GeoJSON line connecting the two locations that were used to determine the crossing event',
}
);

const actionVariableContextFromEntityLocationLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextFromEntityLocationLabel',
{
defaultMessage: 'The previously captured location of the entity',
}
);

const actionVariableContextToBoundaryIdLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextCurrentBoundaryIdLabel',
{
defaultMessage: 'The current boundary id containing the entity (if any)',
}
);

const actionVariableContextToBoundaryNameLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextToBoundaryNameLabel',
{
defaultMessage: 'The boundary (if any) the entity has crossed into and is currently located',
}
);

const actionVariableContextFromBoundaryNameLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextFromBoundaryNameLabel',
{
defaultMessage: 'The boundary (if any) the entity has crossed from and was previously located',
}
);

const actionVariableContextFromBoundaryIdLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextFromBoundaryIdLabel',
{
defaultMessage: 'The previous boundary id containing the entity (if any)',
}
);

const actionVariableContextToEntityDocumentIdLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextCrossingDocumentIdLabel',
{
defaultMessage: 'The id of the crossing entity document',
}
);

const actionVariableContextFromEntityDocumentIdLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextFromEntityDocumentIdLabel',
{
defaultMessage: 'The id of the crossing entity document',
}
);

const actionVariableContextTimeOfDetectionLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextTimeOfDetectionLabel',
{
defaultMessage: 'The alert interval end time this change was recorded',
}
);

const actionVariableContextEntityIdLabel = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionVariableContextEntityIdLabel',
{
defaultMessage: 'The entity ID of the document that triggered the alert',
}
);

const actionVariables = {
context: [
// Alert-specific data
{ name: 'entityId', description: actionVariableContextEntityIdLabel },
{ name: 'timeOfDetection', description: actionVariableContextTimeOfDetectionLabel },
{ name: 'crossingLine', description: actionVariableContextCrossingLineLabel },

// Corresponds to a specific document in the entity-index
{ name: 'toEntityLocation', description: actionVariableContextToEntityLocationLabel },
{
name: 'toEntityDateTime',
description: actionVariableContextToEntityDateTimeLabel,
},
{ name: 'toEntityDocumentId', description: actionVariableContextToEntityDocumentIdLabel },

// Corresponds to a specific document in the boundary-index
{ name: 'toBoundaryId', description: actionVariableContextToBoundaryIdLabel },
{ name: 'toBoundaryName', description: actionVariableContextToBoundaryNameLabel },

// Corresponds to a specific document in the entity-index (from)
{ name: 'fromEntityLocation', description: actionVariableContextFromEntityLocationLabel },
{ name: 'fromEntityDateTime', description: actionVariableContextFromEntityDateTimeLabel },
{ name: 'fromEntityDocumentId', description: actionVariableContextFromEntityDocumentIdLabel },

// Corresponds to a specific document in the boundary-index (from)
{ name: 'fromBoundaryId', description: actionVariableContextFromBoundaryIdLabel },
{ name: 'fromBoundaryName', description: actionVariableContextFromBoundaryNameLabel },
],
};

export const ParamsSchema = schema.object({
index: schema.string({ minLength: 1 }),
indexId: schema.string({ minLength: 1 }),
geoField: schema.string({ minLength: 1 }),
entity: schema.string({ minLength: 1 }),
dateField: schema.string({ minLength: 1 }),
trackingEvent: schema.string({ minLength: 1 }),
boundaryType: schema.string({ minLength: 1 }),
boundaryIndexTitle: schema.string({ minLength: 1 }),
boundaryIndexId: schema.string({ minLength: 1 }),
boundaryGeoField: schema.string({ minLength: 1 }),
boundaryNameField: schema.maybe(schema.string({ minLength: 1 })),
delayOffsetWithUnits: schema.maybe(schema.string({ minLength: 1 })),
});

export interface GeoThresholdParams {
index: string;
indexId: string;
geoField: string;
entity: string;
dateField: string;
trackingEvent: string;
boundaryType: string;
boundaryIndexTitle: string;
boundaryIndexId: string;
boundaryGeoField: string;
boundaryNameField?: string;
delayOffsetWithUnits?: string;
}

export function getAlertType(
service: Omit<Service, 'indexThreshold'>
): {
defaultActionGroupId: string;
actionGroups: ActionGroup[];
executor: ({
previousStartedAt: currIntervalStartTime,
startedAt: currIntervalEndTime,
services,
params,
alertId,
state,
}: {
previousStartedAt: Date | null;
startedAt: Date;
services: AlertServices;
params: GeoThresholdParams;
alertId: string;
state: AlertTypeState;
}) => Promise<AlertTypeState>;
validate?: {
params?: {
validate: (object: unknown) => GeoThresholdParams;
};
};
name: string;
producer: string;
id: string;
actionVariables?: {
context?: ActionVariable[];
state?: ActionVariable[];
params?: ActionVariable[];
};
} {
const alertTypeName = i18n.translate('xpack.alertingBuiltins.geoThreshold.alertTypeTitle', {
defaultMessage: 'Geo tracking threshold',
});

const actionGroupName = i18n.translate(
'xpack.alertingBuiltins.geoThreshold.actionGroupThresholdMetTitle',
{
defaultMessage: 'Tracking threshold met',
}
);

return {
id: GEO_THRESHOLD_ID,
name: alertTypeName,
actionGroups: [{ id: ActionGroupId, name: actionGroupName }],
defaultActionGroupId: ActionGroupId,
executor: getGeoThresholdExecutor(service),
producer: BUILT_IN_ALERTS_FEATURE_ID,
validate: {
params: ParamsSchema,
},
actionVariables,
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { ILegacyScopedClusterClient } from 'kibana/server';
import { SearchResponse } from 'elasticsearch';
import { Logger } from '../../types';

export const OTHER_CATEGORY = 'other';
// Consider dynamically obtaining from config?
const MAX_TOP_LEVEL_QUERY_SIZE = 0;
const MAX_SHAPES_QUERY_SIZE = 10000;
const MAX_BUCKETS_LIMIT = 65535;

export async function getShapesFilters(
boundaryIndexTitle: string,
boundaryGeoField: string,
geoField: string,
callCluster: ILegacyScopedClusterClient['callAsCurrentUser'],
log: Logger,
alertId: string,
boundaryNameField?: string
) {
const filters: Record<string, unknown> = {};
const shapesIdsNamesMap: Record<string, unknown> = {};
// Get all shapes in index
const boundaryData: SearchResponse<Record<string, unknown>> = await callCluster('search', {
index: boundaryIndexTitle,
body: {
size: MAX_SHAPES_QUERY_SIZE,
},
});
boundaryData.hits.hits.forEach(({ _index, _id }) => {
filters[_id] = {
geo_shape: {
[geoField]: {
indexed_shape: {
index: _index,
id: _id,
path: boundaryGeoField,
},
},
},
};
});
if (boundaryNameField) {
boundaryData.hits.hits.forEach(
({ _source, _id }: { _source: Record<string, unknown>; _id: string }) => {
shapesIdsNamesMap[_id] = _source[boundaryNameField];
}
);
}
return {
shapesFilters: filters,
shapesIdsNamesMap,
};
}

export async function executeEsQueryFactory(
{
entity,
index,
dateField,
boundaryGeoField,
geoField,
boundaryIndexTitle,
}: {
entity: string;
index: string;
dateField: string;
boundaryGeoField: string;
geoField: string;
boundaryIndexTitle: string;
boundaryNameField?: string;
},
{ callCluster }: { callCluster: ILegacyScopedClusterClient['callAsCurrentUser'] },
log: Logger,
shapesFilters: Record<string, unknown>
) {
return async (
gteDateTime: Date | null,
ltDateTime: Date | null
): Promise<SearchResponse<unknown> | undefined> => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const esQuery: Record<string, any> = {
index,
body: {
size: MAX_TOP_LEVEL_QUERY_SIZE,
aggs: {
shapes: {
filters: {
other_bucket_key: OTHER_CATEGORY,
filters: shapesFilters,
},
aggs: {
entitySplit: {
terms: {
size: MAX_BUCKETS_LIMIT / ((Object.keys(shapesFilters).length || 1) * 2),
field: entity,
},
aggs: {
entityHits: {
top_hits: {
size: 1,
sort: [
{
[dateField]: {
order: 'desc',
},
},
],
docvalue_fields: [entity, dateField, geoField],
_source: false,
},
},
},
},
},
},
},
query: {
bool: {
must: [],
filter: [
{
match_all: {},
},
{
range: {
[dateField]: {
...(gteDateTime ? { gte: gteDateTime } : {}),
lt: ltDateTime, // 'less than' to prevent overlap between intervals
format: 'strict_date_optional_time',
},
},
},
],
should: [],
must_not: [],
},
},
stored_fields: ['*'],
docvalue_fields: [
{
field: dateField,
format: 'date_time',
},
],
},
};

let esResult: SearchResponse<unknown> | undefined;
try {
esResult = await callCluster('search', esQuery);
} catch (err) {
log.warn(`${err.message}`);
}
return esResult;
};
}
Loading

0 comments on commit c0e2275

Please sign in to comment.