From 23936fd9bf2d160bb40d0c9830aacac93d9e3348 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Wed, 17 Mar 2021 10:19:23 +0100 Subject: [PATCH 1/4] start implementing time offset for metrics --- .../data/common/search/aggs/agg_config.ts | 8 + .../data/common/search/aggs/metrics/count.ts | 7 + .../common/search/aggs/metrics/count_fn.ts | 4 + src/plugins/data/common/search/aggs/types.ts | 2 +- .../expressions/esaggs/request_handler.ts | 288 ++++++++++-------- .../editor_frame/frame_layout.tsx | 1 - .../operations/definitions/count.tsx | 1 + 7 files changed, 190 insertions(+), 121 deletions(-) diff --git a/src/plugins/data/common/search/aggs/agg_config.ts b/src/plugins/data/common/search/aggs/agg_config.ts index f62fedc13b32a0..bc8a13cba88cbc 100644 --- a/src/plugins/data/common/search/aggs/agg_config.ts +++ b/src/plugins/data/common/search/aggs/agg_config.ts @@ -17,6 +17,7 @@ import { SerializedFieldFormat, } from 'src/plugins/expressions/common'; +import moment from 'moment'; import { IAggType } from './agg_type'; import { writeParams } from './agg_params'; import { IAggConfigs } from './agg_configs'; @@ -172,6 +173,13 @@ export class AggConfig { return _.get(this.params, key); } + getTimeShift(): undefined | moment.Duration { + const rawTimeShift = this.getParam('timeShift'); + if (!rawTimeShift) return undefined; + const [, amount, unit] = rawTimeShift.match(/(\d+)(\w)/); + return moment.duration(Number(amount), unit); + } + write(aggs?: IAggConfigs) { return writeParams(this.type.params, this, aggs); } diff --git a/src/plugins/data/common/search/aggs/metrics/count.ts b/src/plugins/data/common/search/aggs/metrics/count.ts index 8a10d7edb3f835..2d2809e6bd4c88 100644 --- a/src/plugins/data/common/search/aggs/metrics/count.ts +++ b/src/plugins/data/common/search/aggs/metrics/count.ts @@ -25,6 +25,13 @@ export const getCountMetricAgg = () => defaultMessage: 'Count', }); }, + params: [ + { + name: 'timeShift', + type: 'string', + write: () => {}, + }, + ], getSerializedFormat(agg) { return { id: 'number', diff --git a/src/plugins/data/common/search/aggs/metrics/count_fn.ts b/src/plugins/data/common/search/aggs/metrics/count_fn.ts index 40c87db57eedca..c1b46f1a332d18 100644 --- a/src/plugins/data/common/search/aggs/metrics/count_fn.ts +++ b/src/plugins/data/common/search/aggs/metrics/count_fn.ts @@ -48,6 +48,10 @@ export const aggCount = (): FunctionDefinition => ({ defaultMessage: 'Schema to use for this aggregation', }), }, + timeShift: { + types: ['string'], + help: '', + }, customLabel: { types: ['string'], help: i18n.translate('data.search.aggs.metrics.count.customLabel.help', { diff --git a/src/plugins/data/common/search/aggs/types.ts b/src/plugins/data/common/search/aggs/types.ts index 48ded7fa7a7fcf..b3a59d2c5da80e 100644 --- a/src/plugins/data/common/search/aggs/types.ts +++ b/src/plugins/data/common/search/aggs/types.ts @@ -176,7 +176,7 @@ export interface AggParamsMapping { [BUCKET_TYPES.TERMS]: AggParamsTerms; [METRIC_TYPES.AVG]: AggParamsAvg; [METRIC_TYPES.CARDINALITY]: AggParamsCardinality; - [METRIC_TYPES.COUNT]: BaseAggParams; + [METRIC_TYPES.COUNT]: BaseAggParams & { timeShift?: string }; [METRIC_TYPES.GEO_BOUNDS]: AggParamsGeoBounds; [METRIC_TYPES.GEO_CENTROID]: AggParamsGeoCentroid; [METRIC_TYPES.MAX]: AggParamsMax; diff --git a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts index e2ee1a31757cb7..1732cf775ea1b6 100644 --- a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts +++ b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts @@ -7,6 +7,7 @@ */ import { i18n } from '@kbn/i18n'; +import moment from 'moment'; import { Adapters } from 'src/plugins/inspector/common'; import { @@ -62,126 +63,175 @@ export const handleRequest = async ({ searchSource.setField('index', indexPattern); searchSource.setField('size', 0); - // Create a new search source that inherits the original search source - // but has the appropriate timeRange applied via a filter. - // This is a temporary solution until we properly pass down all required - // information for the request to the request handler (https://github.com/elastic/kibana/issues/16641). - // Using callParentStartHandlers: true we make sure, that the parent searchSource - // onSearchRequestStart will be called properly even though we use an inherited - // search source. - const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true }); - const requestSearchSource = timeFilterSearchSource.createChild({ callParentStartHandlers: true }); - - aggs.setTimeRange(timeRange as TimeRange); - - // For now we need to mirror the history of the passed search source, since - // the request inspector wouldn't work otherwise. - Object.defineProperty(requestSearchSource, 'history', { - get() { - return searchSource.history; - }, - set(history) { - return (searchSource.history = history); - }, - }); - - requestSearchSource.setField('aggs', function () { - return aggs.toDsl(metricsAtAllLevels); - }); - - requestSearchSource.onRequestStart((paramSearchSource, options) => { - return aggs.onSearchRequestStart(paramSearchSource, options); - }); - - // If timeFields have been specified, use the specified ones, otherwise use primary time field of index - // pattern if it's available. - const defaultTimeField = indexPattern?.getTimeField?.(); - const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : []; - const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields; - - // If a timeRange has been specified and we had at least one timeField available, create range - // filters for that those time fields - if (timeRange && allTimeFields.length > 0) { - timeFilterSearchSource.setField('filter', () => { - return allTimeFields - .map((fieldName) => getTime(indexPattern, timeRange, { fieldName, forceNow })) - .filter(isRangeFilter); + const timeShifts: Record = {}; + aggs + .getAll() + .filter((agg) => agg.schema === 'metric') + .map((agg) => agg.getTimeShift()) + .forEach((timeShift) => { + timeShifts[String(timeShift?.asMilliseconds() || 0)] = timeShift; }); - } - - requestSearchSource.setField('filter', filters); - requestSearchSource.setField('query', query); - - let request; - if (inspectorAdapters.requests) { - inspectorAdapters.requests.reset(); - request = inspectorAdapters.requests.start( - i18n.translate('data.functions.esaggs.inspector.dataRequest.title', { - defaultMessage: 'Data', - }), - { - description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', { - defaultMessage: - 'This request queries Elasticsearch to fetch the data for the visualization.', - }), - searchSessionId, + + const originalAggs = aggs; + + const partialResponses = await Promise.all( + Object.values(timeShifts).map(async (timeShift) => { + if (timeShift) { + aggs = originalAggs.clone(); + aggs.aggs = aggs.aggs.filter( + (agg) => + agg.schema !== 'metric' || + (agg.getTimeShift() && + agg.getTimeShift()!.asMilliseconds() === timeShift.asMilliseconds()) + ); + } else { + aggs = originalAggs; + } + // Create a new search source that inherits the original search source + // but has the appropriate timeRange applied via a filter. + // This is a temporary solution until we properly pass down all required + // information for the request to the request handler (https://github.com/elastic/kibana/issues/16641). + // Using callParentStartHandlers: true we make sure, that the parent searchSource + // onSearchRequestStart will be called properly even though we use an inherited + // search source. + const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true }); + const requestSearchSource = timeFilterSearchSource.createChild({ + callParentStartHandlers: true, + }); + + const currentTimeRange: TimeRange | undefined = timeRange ? { ...timeRange } : undefined; + + if (currentTimeRange) { + if (timeShift && currentTimeRange.from) { + currentTimeRange.from = moment(currentTimeRange.from).subtract(timeShift).toISOString(); + } + + if (timeShift && currentTimeRange.from) { + currentTimeRange.to = moment(currentTimeRange.to).subtract(timeShift).toISOString(); + } + } + + aggs.setTimeRange(currentTimeRange as TimeRange); + + // For now we need to mirror the history of the passed search source, since + // the request inspector wouldn't work otherwise. + Object.defineProperty(requestSearchSource, 'history', { + get() { + return searchSource.history; + }, + set(history) { + return (searchSource.history = history); + }, + }); + + requestSearchSource.setField('aggs', function () { + return aggs.toDsl(metricsAtAllLevels); + }); + + requestSearchSource.onRequestStart((paramSearchSource, options) => { + return aggs.onSearchRequestStart(paramSearchSource, options); + }); + + // If timeFields have been specified, use the specified ones, otherwise use primary time field of index + // pattern if it's available. + const defaultTimeField = indexPattern?.getTimeField?.(); + const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : []; + const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields; + + // If a timeRange has been specified and we had at least one timeField available, create range + // filters for that those time fields + if (currentTimeRange && allTimeFields.length > 0) { + timeFilterSearchSource.setField('filter', () => { + return allTimeFields + .map((fieldName) => getTime(indexPattern, currentTimeRange, { fieldName, forceNow })) + .filter(isRangeFilter); + }); + } + + requestSearchSource.setField('filter', filters); + requestSearchSource.setField('query', query); + + let request; + if (inspectorAdapters.requests) { + inspectorAdapters.requests.reset(); + request = inspectorAdapters.requests.start( + i18n.translate('data.functions.esaggs.inspector.dataRequest.title', { + defaultMessage: 'Data', + }), + { + description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', { + defaultMessage: + 'This request queries Elasticsearch to fetch the data for the visualization.', + }), + searchSessionId, + } + ); + request.stats(getRequestInspectorStats(requestSearchSource)); + } + + try { + const response = await requestSearchSource.fetch({ + abortSignal, + sessionId: searchSessionId, + }); + + if (request) { + request.stats(getResponseInspectorStats(response, searchSource)).ok({ json: response }); + } + + if (!timeShift) { + (searchSource as any).rawResponse = response; + } + (requestSearchSource as any).rawResponse = response; + } catch (e) { + // Log any error during request to the inspector + if (request) { + request.error({ json: e }); + } + throw e; + } finally { + // Add the request body no matter if things went fine or not + if (request) { + request.json(await requestSearchSource.getSearchRequestBody()); + } + } + + // Note that rawResponse is not deeply cloned here, so downstream applications using courier + // must take care not to mutate it, or it could have unintended side effects, e.g. displaying + // response data incorrectly in the inspector. + let response = (requestSearchSource as any).rawResponse; + for (const agg of aggs.aggs) { + if (agg.enabled && typeof agg.type.postFlightRequest === 'function') { + response = await agg.type.postFlightRequest( + response, + aggs, + agg, + requestSearchSource, + inspectorAdapters.requests, + abortSignal, + searchSessionId + ); + } } - ); - request.stats(getRequestInspectorStats(requestSearchSource)); - } - - try { - const response = await requestSearchSource.fetch({ - abortSignal, - sessionId: searchSessionId, - }); - if (request) { - request.stats(getResponseInspectorStats(response, searchSource)).ok({ json: response }); - } - - (searchSource as any).rawResponse = response; - } catch (e) { - // Log any error during request to the inspector - if (request) { - request.error({ json: e }); - } - throw e; - } finally { - // Add the request body no matter if things went fine or not - if (request) { - request.json(await requestSearchSource.getSearchRequestBody()); - } - } - - // Note that rawResponse is not deeply cloned here, so downstream applications using courier - // must take care not to mutate it, or it could have unintended side effects, e.g. displaying - // response data incorrectly in the inspector. - let response = (searchSource as any).rawResponse; - for (const agg of aggs.aggs) { - if (agg.enabled && typeof agg.type.postFlightRequest === 'function') { - response = await agg.type.postFlightRequest( - response, - aggs, - agg, - requestSearchSource, - inspectorAdapters.requests, - abortSignal, - searchSessionId - ); - } - } - - const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null; - const tabifyParams = { - metricsAtAllLevels, - partialRows, - timeRange: parsedTimeRange - ? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields } - : undefined, - }; - - const tabifiedResponse = tabifyAggResponse(aggs, response, tabifyParams); - - return tabifiedResponse; + const parsedTimeRange = currentTimeRange + ? calculateBounds(currentTimeRange, { forceNow }) + : null; + const tabifyParams = { + metricsAtAllLevels, + partialRows, + timeRange: parsedTimeRange + ? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields } + : undefined, + }; + + const tabifiedResponse = tabifyAggResponse(aggs, response, tabifyParams); + console.log(tabifiedResponse); + + return tabifiedResponse; + }) + ); + + // todo - do an outer join on all partial responses + return partialResponses[0]; }; diff --git a/x-pack/plugins/lens/public/editor_frame_service/editor_frame/frame_layout.tsx b/x-pack/plugins/lens/public/editor_frame_service/editor_frame/frame_layout.tsx index a54901a2a2fe1d..1595e88bf98ecb 100644 --- a/x-pack/plugins/lens/public/editor_frame_service/editor_frame/frame_layout.tsx +++ b/x-pack/plugins/lens/public/editor_frame_service/editor_frame/frame_layout.tsx @@ -45,7 +45,6 @@ export function FrameLayout(props: FrameLayoutProps) { {props.workspacePanel} - {props.suggestionsPanel}
{ From 8fe95ef9c5a0126311338998a9eeab64ae04804b Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Wed, 17 Mar 2021 11:12:32 +0100 Subject: [PATCH 2/4] add stuff --- .../expressions/esaggs/request_handler.ts | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts index 1732cf775ea1b6..51b015f0a8b3ec 100644 --- a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts +++ b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts @@ -76,16 +76,18 @@ export const handleRequest = async ({ const partialResponses = await Promise.all( Object.values(timeShifts).map(async (timeShift) => { + let currentAggs = aggs; if (timeShift) { - aggs = originalAggs.clone(); - aggs.aggs = aggs.aggs.filter( + currentAggs = originalAggs.clone(); + currentAggs.aggs = currentAggs.aggs.filter( (agg) => agg.schema !== 'metric' || (agg.getTimeShift() && agg.getTimeShift()!.asMilliseconds() === timeShift.asMilliseconds()) ); } else { - aggs = originalAggs; + currentAggs = Object.values(timeShifts).length === 1 ? originalAggs : originalAggs.clone(); + currentAggs.aggs = currentAggs.aggs.filter((agg) => !agg.getTimeShift()); } // Create a new search source that inherits the original search source // but has the appropriate timeRange applied via a filter. @@ -111,7 +113,7 @@ export const handleRequest = async ({ } } - aggs.setTimeRange(currentTimeRange as TimeRange); + currentAggs.setTimeRange(currentTimeRange as TimeRange); // For now we need to mirror the history of the passed search source, since // the request inspector wouldn't work otherwise. @@ -125,11 +127,11 @@ export const handleRequest = async ({ }); requestSearchSource.setField('aggs', function () { - return aggs.toDsl(metricsAtAllLevels); + return currentAggs.toDsl(metricsAtAllLevels); }); requestSearchSource.onRequestStart((paramSearchSource, options) => { - return aggs.onSearchRequestStart(paramSearchSource, options); + return currentAggs.onSearchRequestStart(paramSearchSource, options); }); // If timeFields have been specified, use the specified ones, otherwise use primary time field of index @@ -200,11 +202,11 @@ export const handleRequest = async ({ // must take care not to mutate it, or it could have unintended side effects, e.g. displaying // response data incorrectly in the inspector. let response = (requestSearchSource as any).rawResponse; - for (const agg of aggs.aggs) { + for (const agg of currentAggs.aggs) { if (agg.enabled && typeof agg.type.postFlightRequest === 'function') { response = await agg.type.postFlightRequest( response, - aggs, + currentAggs, agg, requestSearchSource, inspectorAdapters.requests, @@ -225,7 +227,7 @@ export const handleRequest = async ({ : undefined, }; - const tabifiedResponse = tabifyAggResponse(aggs, response, tabifyParams); + const tabifiedResponse = tabifyAggResponse(currentAggs, response, tabifyParams); console.log(tabifiedResponse); return tabifiedResponse; @@ -233,5 +235,17 @@ export const handleRequest = async ({ ); // todo - do an outer join on all partial responses + // if (partialResponses.length === 1) { + // return partialResponses[0]; + // } else { + // const joinAggs = aggs + // .bySchemaName('bucket') + // .filter((agg) => !timeFields || !timeFields.includes(agg.fieldName())); + // const fullResponse = partialResponses[0]; + // partialResponses.shift(); + // partialResponses.forEach(partialResponse => { + + // }); + // } return partialResponses[0]; }; From bfd355d905693ad46e23b7a8c2b6830a382d72a9 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Thu, 18 Mar 2021 11:46:48 +0100 Subject: [PATCH 3/4] join tables --- .../expressions/esaggs/request_handler.ts | 119 ++++++++++++++---- 1 file changed, 96 insertions(+), 23 deletions(-) diff --git a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts index 51b015f0a8b3ec..42d68f41b9b1ac 100644 --- a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts +++ b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts @@ -8,6 +8,7 @@ import { i18n } from '@kbn/i18n'; import moment from 'moment'; +import { DatatableColumn, DatatableRow } from 'src/plugins/expressions'; import { Adapters } from 'src/plugins/inspector/common'; import { @@ -76,18 +77,18 @@ export const handleRequest = async ({ const partialResponses = await Promise.all( Object.values(timeShifts).map(async (timeShift) => { - let currentAggs = aggs; + const currentAggs = aggs; if (timeShift) { - currentAggs = originalAggs.clone(); - currentAggs.aggs = currentAggs.aggs.filter( - (agg) => - agg.schema !== 'metric' || - (agg.getTimeShift() && - agg.getTimeShift()!.asMilliseconds() === timeShift.asMilliseconds()) - ); + // currentAggs = originalAggs.clone(); + // currentAggs.aggs = currentAggs.aggs.filter( + // (agg) => + // agg.schema !== 'metric' || + // (agg.getTimeShift() && + // agg.getTimeShift()!.asMilliseconds() === timeShift.asMilliseconds()) + // ); } else { - currentAggs = Object.values(timeShifts).length === 1 ? originalAggs : originalAggs.clone(); - currentAggs.aggs = currentAggs.aggs.filter((agg) => !agg.getTimeShift()); + // currentAggs = Object.values(timeShifts).length === 1 ? originalAggs : originalAggs.clone(); + // currentAggs.aggs = currentAggs.aggs.filter((agg) => !agg.getTimeShift()); } // Create a new search source that inherits the original search source // but has the appropriate timeRange applied via a filter. @@ -235,17 +236,89 @@ export const handleRequest = async ({ ); // todo - do an outer join on all partial responses - // if (partialResponses.length === 1) { - // return partialResponses[0]; - // } else { - // const joinAggs = aggs - // .bySchemaName('bucket') - // .filter((agg) => !timeFields || !timeFields.includes(agg.fieldName())); - // const fullResponse = partialResponses[0]; - // partialResponses.shift(); - // partialResponses.forEach(partialResponse => { - - // }); - // } - return partialResponses[0]; + if (partialResponses.length === 1) { + return partialResponses[0]; + } else { + const fullResponse = partialResponses[0]; + const fullResponseTimeShift = Object.values(timeShifts)[0]; + fullResponse.rows.forEach((row) => { + fullResponse.columns.forEach((column) => { + const columnAgg = aggs.aggs.find((a) => a.id === column.meta.sourceParams.id)!; + if ( + columnAgg.getTimeShift()?.asMilliseconds() !== fullResponseTimeShift?.asMilliseconds() + ) { + delete row[column.id]; + } + }); + }); + const joinAggs = aggs + .bySchemaName('bucket') + .filter((agg) => !timeFields || !timeFields.includes(agg.fieldName())); + const joinColumns = fullResponse.columns.filter( + (c) => + c.meta.sourceParams.schema !== 'metric' && + (!timeFields || !timeFields.includes(c.meta.sourceParams?.params?.field)) + ); + const timeJoinAggs = aggs + .bySchemaName('bucket') + .filter((agg) => timeFields && timeFields.includes(agg.fieldName())); + const timeJoinColumns = fullResponse.columns.filter( + (c) => + c.meta.sourceParams.schema !== 'metric' && + timeFields && + timeFields.includes(c.meta.sourceParams?.params?.field) + ); + partialResponses.shift(); + partialResponses.forEach((partialResponse, index) => { + const timeShift = Object.values(timeShifts)[index + 1]; + const missingCols: DatatableColumn[] = []; + partialResponse.columns.forEach((column) => { + const columnAgg = aggs.aggs.find((a) => a.id === column.meta.sourceParams.id)!; + if (columnAgg.getTimeShift()?.asMilliseconds() === timeShift?.asMilliseconds()) { + missingCols.push(column); + } + }); + partialResponse.rows.forEach((row) => { + const targetRow = getColumnIdentifier(joinColumns, row, timeJoinColumns, timeShift); + const targetRowIndex = fullResponse.rows.findIndex((r) => { + return ( + getColumnIdentifier(joinColumns, r, timeJoinColumns, moment.duration(0, 'ms')) === + targetRow + ); + }); + if (targetRowIndex !== -1) { + missingCols.forEach((c) => { + fullResponse.rows[targetRowIndex][c.id] = row[c.id]; + }); + } else { + // add it to the bottom - this might be confusing in some cases + // can we insert it at the right place? + const updatedRow: DatatableRow = {}; + joinColumns.forEach((c) => { + updatedRow[c.id] = row[c.id]; + }); + timeJoinColumns.forEach((c) => { + updatedRow[c.id] = moment(row[c.id]).add(timeShift).valueOf(); + }); + missingCols.forEach((c) => { + updatedRow[c.id] = row[c.id]; + }); + fullResponse.rows.push(updatedRow); + } + }); + }); + return fullResponse; + } }; +function getColumnIdentifier( + joinColumns: DatatableColumn[], + row: DatatableRow, + timeJoinColumns: DatatableColumn[], + timeShift: moment.Duration | undefined +) { + const joinStr = joinColumns.map((c) => String(row[c.id])); + const timeJoinStr = timeJoinColumns.map((c) => + String(moment(row[c.id]).add(timeShift).valueOf()) + ); + return joinStr.join(',') + timeJoinStr.join(','); +} From abac552a049ff239e34a28e9e4e75905733e837f Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 29 Mar 2021 11:19:08 +0200 Subject: [PATCH 4/4] time offset using filters --- .../es_query/es_query/build_es_query.ts | 30 +- .../data/common/search/aggs/agg_config.ts | 5 + .../data/common/search/aggs/agg_type.ts | 8 + .../search/aggs/buckets/date_histogram.ts | 16 + .../common/search/aggs/buckets/filters.ts | 2 +- .../data/common/search/aggs/buckets/terms.ts | 4 + .../expressions/esaggs/request_handler.ts | 304 +++++++++++++----- 7 files changed, 281 insertions(+), 88 deletions(-) diff --git a/src/plugins/data/common/es_query/es_query/build_es_query.ts b/src/plugins/data/common/es_query/es_query/build_es_query.ts index 18b360de9aaa64..bdbb381262ccca 100644 --- a/src/plugins/data/common/es_query/es_query/build_es_query.ts +++ b/src/plugins/data/common/es_query/es_query/build_es_query.ts @@ -50,6 +50,13 @@ export function buildEsQuery( config.allowLeadingWildcards, config.dateFormatTZ ); + // TODO there is probably a more elegant way than this. + // We need to pass raw queries to the filters agg somehow + const rawQuery = buildQueryFromFilters( + queriesByLanguage.rawQuery ? (queriesByLanguage.rawQuery.map((q) => q.query) as Filter[]) : [], + indexPattern, + config.ignoreFilterIfFieldNotInIndex + ); const luceneQuery = buildQueryFromLucene( queriesByLanguage.lucene, config.queryStringOptions, @@ -63,10 +70,25 @@ export function buildEsQuery( return { bool: { - must: [...kueryQuery.must, ...luceneQuery.must, ...filterQuery.must], - filter: [...kueryQuery.filter, ...luceneQuery.filter, ...filterQuery.filter], - should: [...kueryQuery.should, ...luceneQuery.should, ...filterQuery.should], - must_not: [...kueryQuery.must_not, ...luceneQuery.must_not, ...filterQuery.must_not], + must: [...kueryQuery.must, ...luceneQuery.must, ...filterQuery.must, ...rawQuery.must], + filter: [ + ...kueryQuery.filter, + ...luceneQuery.filter, + ...filterQuery.filter, + ...rawQuery.filter, + ], + should: [ + ...kueryQuery.should, + ...luceneQuery.should, + ...filterQuery.should, + ...rawQuery.should, + ], + must_not: [ + ...kueryQuery.must_not, + ...luceneQuery.must_not, + ...filterQuery.must_not, + ...rawQuery.must_not, + ], }, }; } diff --git a/src/plugins/data/common/search/aggs/agg_config.ts b/src/plugins/data/common/search/aggs/agg_config.ts index da42da79a87e1f..ee4e96ffcb46a7 100644 --- a/src/plugins/data/common/search/aggs/agg_config.ts +++ b/src/plugins/data/common/search/aggs/agg_config.ts @@ -180,6 +180,11 @@ export class AggConfig { return moment.duration(Number(amount), unit); } + getTimeShiftedFilter(timeShift: moment.Duration, value: any) { + // TODO better handling for implementation vs no implementation + return this.type.getTimeShiftedFilter!(this, timeShift, value); + } + write(aggs?: IAggConfigs) { return writeParams(this.type.params, this, aggs); } diff --git a/src/plugins/data/common/search/aggs/agg_type.ts b/src/plugins/data/common/search/aggs/agg_type.ts index 33fdc45a605b71..6e8025edfa93d6 100644 --- a/src/plugins/data/common/search/aggs/agg_type.ts +++ b/src/plugins/data/common/search/aggs/agg_type.ts @@ -53,6 +53,7 @@ export interface AggTypeConfig< getValue?: (agg: TAggConfig, bucket: any) => any; getKey?: (bucket: any, key: any, agg: TAggConfig) => any; getValueBucketPath?: (agg: TAggConfig) => string; + getTimeShiftedFilter?: (agg: TAggConfig, timeShift: moment.Duration, value: any) => any; } // TODO need to make a more explicit interface for this @@ -210,6 +211,8 @@ export class AggType< getValue: (agg: TAggConfig, bucket: any) => any; + getTimeShiftedFilter: (agg: TAggConfig, timeShift: moment.Duration, value: any) => any; + getKey?: (bucket: any, key: any, agg: TAggConfig) => any; paramByName = (name: string) => { @@ -283,6 +286,11 @@ export class AggType< this.getResponseAggs = config.getResponseAggs || (() => {}); this.decorateAggConfig = config.decorateAggConfig || (() => ({})); this.postFlightRequest = config.postFlightRequest || identity; + this.getTimeShiftedFilter = + config.getTimeShiftedFilter || + (() => { + throw new Error('not implemented'); + }); this.getSerializedFormat = config.getSerializedFormat || diff --git a/src/plugins/data/common/search/aggs/buckets/date_histogram.ts b/src/plugins/data/common/search/aggs/buckets/date_histogram.ts index 61ad66d7efdc9a..633ca15965550b 100644 --- a/src/plugins/data/common/search/aggs/buckets/date_histogram.ts +++ b/src/plugins/data/common/search/aggs/buckets/date_histogram.ts @@ -133,6 +133,22 @@ export const getDateHistogramBucketAgg = ({ }, }; }, + getTimeShiftedFilter: (agg, timeShift, val) => { + const bucketStart = moment(val).subtract(timeShift); + updateTimeBuckets(agg, calculateBounds); + + const { useNormalizedEsInterval } = agg.params; + const interval = agg.buckets.getInterval(useNormalizedEsInterval); + const bucketEnd = bucketStart.clone().add(interval); + return { + range: { + [agg.fieldName()]: { + gte: bucketStart.toISOString(), + lte: bucketEnd.toISOString(), + }, + }, + }; + }, params: [ { name: 'field', diff --git a/src/plugins/data/common/search/aggs/buckets/filters.ts b/src/plugins/data/common/search/aggs/buckets/filters.ts index 107b86de040587..e8cdceb4bf7bf6 100644 --- a/src/plugins/data/common/search/aggs/buckets/filters.ts +++ b/src/plugins/data/common/search/aggs/buckets/filters.ts @@ -23,7 +23,7 @@ const filtersTitle = i18n.translate('data.search.aggs.buckets.filtersTitle', { 'The name of an aggregation, that allows to specify multiple individual filters to group data by.', }); -interface FilterValue { +export interface FilterValue { input: Query; label: string; id: string; diff --git a/src/plugins/data/common/search/aggs/buckets/terms.ts b/src/plugins/data/common/search/aggs/buckets/terms.ts index 7d37dc83405b8b..2d4ed16c988d10 100644 --- a/src/plugins/data/common/search/aggs/buckets/terms.ts +++ b/src/plugins/data/common/search/aggs/buckets/terms.ts @@ -140,6 +140,10 @@ export const getTermsBucketAgg = () => } return resp; }, + getTimeShiftedFilter: (agg, _timeShift, val) => { + // TODO this doesn't work for other/missing buckets + return agg.createFilter(val).query; + }, params: [ { name: 'field', diff --git a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts index 42d68f41b9b1ac..ba74f08dfa2694 100644 --- a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts +++ b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts @@ -21,7 +21,7 @@ import { TimeRange, } from '../../../../common'; -import { IAggConfigs } from '../../aggs'; +import { FilterValue, IAggConfigs } from '../../aggs'; import { ISearchStartSearchSource } from '../../search_source'; import { tabifyAggResponse } from '../../tabify'; import { getRequestInspectorStats, getResponseInspectorStats } from '../utils'; @@ -64,32 +64,191 @@ export const handleRequest = async ({ searchSource.setField('index', indexPattern); searchSource.setField('size', 0); - const timeShifts: Record = {}; + const timeShifts: Record = {}; aggs .getAll() .filter((agg) => agg.schema === 'metric') .map((agg) => agg.getTimeShift()) .forEach((timeShift) => { - timeShifts[String(timeShift?.asMilliseconds() || 0)] = timeShift; + if (timeShift) { + timeShifts[String(timeShift.asMilliseconds())] = timeShift; + } }); const originalAggs = aggs; + const buildBaseResponse = async () => { + const currentAggs = aggs; + const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true }); + const requestSearchSource = timeFilterSearchSource.createChild({ + callParentStartHandlers: true, + }); + + const currentTimeRange: TimeRange | undefined = timeRange ? { ...timeRange } : undefined; + + currentAggs.setTimeRange(currentTimeRange as TimeRange); + + // For now we need to mirror the history of the passed search source, since + // the request inspector wouldn't work otherwise. + Object.defineProperty(requestSearchSource, 'history', { + get() { + return searchSource.history; + }, + set(history) { + return (searchSource.history = history); + }, + }); + + requestSearchSource.setField('aggs', function () { + return currentAggs.toDsl(metricsAtAllLevels); + }); + + requestSearchSource.onRequestStart((paramSearchSource, options) => { + return currentAggs.onSearchRequestStart(paramSearchSource, options); + }); + + // If timeFields have been specified, use the specified ones, otherwise use primary time field of index + // pattern if it's available. + const defaultTimeField = indexPattern?.getTimeField?.(); + const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : []; + const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields; + + // If a timeRange has been specified and we had at least one timeField available, create range + // filters for that those time fields + if (currentTimeRange && allTimeFields.length > 0) { + timeFilterSearchSource.setField('filter', () => { + return allTimeFields + .map((fieldName) => getTime(indexPattern, currentTimeRange, { fieldName, forceNow })) + .filter(isRangeFilter); + }); + } + + requestSearchSource.setField('filter', filters); + requestSearchSource.setField('query', query); + + let request; + if (inspectorAdapters.requests) { + inspectorAdapters.requests.reset(); + request = inspectorAdapters.requests.start( + i18n.translate('data.functions.esaggs.inspector.dataRequest.title', { + defaultMessage: 'Data', + }), + { + description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', { + defaultMessage: + 'This request queries Elasticsearch to fetch the data for the visualization.', + }), + searchSessionId, + } + ); + request.stats(getRequestInspectorStats(requestSearchSource)); + } + + try { + const response = await requestSearchSource.fetch({ + abortSignal, + sessionId: searchSessionId, + }); + + if (request) { + request.stats(getResponseInspectorStats(response, searchSource)).ok({ json: response }); + } + (requestSearchSource as any).rawResponse = response; + } catch (e) { + // Log any error during request to the inspector + if (request) { + request.error({ json: e }); + } + throw e; + } finally { + // Add the request body no matter if things went fine or not + if (request) { + request.json(await requestSearchSource.getSearchRequestBody()); + } + } + + // Note that rawResponse is not deeply cloned here, so downstream applications using courier + // must take care not to mutate it, or it could have unintended side effects, e.g. displaying + // response data incorrectly in the inspector. + let response = (requestSearchSource as any).rawResponse; + for (const agg of currentAggs.aggs) { + if (agg.enabled && typeof agg.type.postFlightRequest === 'function') { + response = await agg.type.postFlightRequest( + response, + currentAggs, + agg, + requestSearchSource, + inspectorAdapters.requests, + abortSignal, + searchSessionId + ); + } + } + + const parsedTimeRange = currentTimeRange + ? calculateBounds(currentTimeRange, { forceNow }) + : null; + const tabifyParams = { + metricsAtAllLevels, + partialRows, + timeRange: parsedTimeRange + ? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields } + : undefined, + }; + + const tabifiedResponse = tabifyAggResponse(currentAggs, response, tabifyParams); + return tabifiedResponse; + }; + + const baseResponse = await buildBaseResponse(); + + const filterableAggLength = aggs.aggs.filter((agg) => agg.schema === 'segment').length; + const partialResponses = await Promise.all( Object.values(timeShifts).map(async (timeShift) => { - const currentAggs = aggs; - if (timeShift) { - // currentAggs = originalAggs.clone(); - // currentAggs.aggs = currentAggs.aggs.filter( - // (agg) => - // agg.schema !== 'metric' || - // (agg.getTimeShift() && - // agg.getTimeShift()!.asMilliseconds() === timeShift.asMilliseconds()) - // ); - } else { - // currentAggs = Object.values(timeShifts).length === 1 ? originalAggs : originalAggs.clone(); - // currentAggs.aggs = currentAggs.aggs.filter((agg) => !agg.getTimeShift()); - } + const currentAggs = aggs.clone(); + // assuming those are ordered correctly + const filterableAggs = currentAggs.aggs.filter((agg) => agg.schema === 'segment'); + const rootFilterAgg = currentAggs.createAggConfig( + { + type: 'filters', + id: 'rootSerialization', + params: { + filters: baseResponse.rows.map((row, rowIndex) => { + const filter: Filter = { + meta: { + alias: null, + disabled: false, + negate: false, + }, + query: { + bool: { + filter: filterableAggs.map((a, i) => { + return a.getTimeShiftedFilter(timeShift, row[baseResponse.columns[i].id]); + }), + }, + }, + }; + return { + input: { + query: filter, + language: 'rawQuery', + }, + label: `serialized-row-${rowIndex}`, + id: `serialized-row-${rowIndex}`, + }; + }), + }, + enabled: true, + }, + { + addToAggConfigs: true, + } + ); + currentAggs.aggs = [ + rootFilterAgg, + ...currentAggs.aggs.filter((agg) => agg.schema === 'metric'), + ]; // Create a new search source that inherits the original search source // but has the appropriate timeRange applied via a filter. // This is a temporary solution until we properly pass down all required @@ -105,11 +264,11 @@ export const handleRequest = async ({ const currentTimeRange: TimeRange | undefined = timeRange ? { ...timeRange } : undefined; if (currentTimeRange) { - if (timeShift && currentTimeRange.from) { + if (currentTimeRange.from) { currentTimeRange.from = moment(currentTimeRange.from).subtract(timeShift).toISOString(); } - if (timeShift && currentTimeRange.from) { + if (currentTimeRange.from) { currentTimeRange.to = moment(currentTimeRange.to).subtract(timeShift).toISOString(); } } @@ -229,85 +388,64 @@ export const handleRequest = async ({ }; const tabifiedResponse = tabifyAggResponse(currentAggs, response, tabifyParams); - console.log(tabifiedResponse); return tabifiedResponse; }) ); // todo - do an outer join on all partial responses - if (partialResponses.length === 1) { - return partialResponses[0]; - } else { - const fullResponse = partialResponses[0]; - const fullResponseTimeShift = Object.values(timeShifts)[0]; - fullResponse.rows.forEach((row) => { - fullResponse.columns.forEach((column) => { - const columnAgg = aggs.aggs.find((a) => a.id === column.meta.sourceParams.id)!; - if ( - columnAgg.getTimeShift()?.asMilliseconds() !== fullResponseTimeShift?.asMilliseconds() - ) { - delete row[column.id]; - } - }); - }); - const joinAggs = aggs - .bySchemaName('bucket') - .filter((agg) => !timeFields || !timeFields.includes(agg.fieldName())); - const joinColumns = fullResponse.columns.filter( - (c) => - c.meta.sourceParams.schema !== 'metric' && - (!timeFields || !timeFields.includes(c.meta.sourceParams?.params?.field)) - ); - const timeJoinAggs = aggs - .bySchemaName('bucket') - .filter((agg) => timeFields && timeFields.includes(agg.fieldName())); - const timeJoinColumns = fullResponse.columns.filter( - (c) => - c.meta.sourceParams.schema !== 'metric' && - timeFields && - timeFields.includes(c.meta.sourceParams?.params?.field) - ); - partialResponses.shift(); + if (partialResponses.length > 0) { + // fullResponse.rows.forEach((row) => { + // fullResponse.columns.forEach((column) => { + // const columnAgg = aggs.aggs.find((a) => a.id === column.meta.sourceParams.id)!; + // if ( + // columnAgg.getTimeShift()?.asMilliseconds() !== fullResponseTimeShift?.asMilliseconds() + // ) { + // delete row[column.id]; + // } + // }); + // }); + // const joinAggs = aggs + // .bySchemaName('bucket') + // .filter((agg) => !timeFields || !timeFields.includes(agg.fieldName())); + // const joinColumns = fullResponse.columns.filter( + // (c) => + // c.meta.sourceParams.schema !== 'metric' && + // (!timeFields || !timeFields.includes(c.meta.sourceParams?.params?.field)) + // ); + // const timeJoinAggs = aggs + // .bySchemaName('bucket') + // .filter((agg) => timeFields && timeFields.includes(agg.fieldName())); + // const timeJoinColumns = fullResponse.columns.filter( + // (c) => + // c.meta.sourceParams.schema !== 'metric' && + // timeFields && + // timeFields.includes(c.meta.sourceParams?.params?.field) + // ); partialResponses.forEach((partialResponse, index) => { - const timeShift = Object.values(timeShifts)[index + 1]; - const missingCols: DatatableColumn[] = []; - partialResponse.columns.forEach((column) => { + const timeShift = Object.values(timeShifts)[index]; + const missingColIndexes: number[] = []; + partialResponse.columns.forEach((column, colIndex) => { + // skip the serialized filters agg + if (colIndex === 0) { + return; + } const columnAgg = aggs.aggs.find((a) => a.id === column.meta.sourceParams.id)!; if (columnAgg.getTimeShift()?.asMilliseconds() === timeShift?.asMilliseconds()) { - missingCols.push(column); + missingColIndexes.push(colIndex - 1 + filterableAggLength); } }); - partialResponse.rows.forEach((row) => { - const targetRow = getColumnIdentifier(joinColumns, row, timeJoinColumns, timeShift); - const targetRowIndex = fullResponse.rows.findIndex((r) => { - return ( - getColumnIdentifier(joinColumns, r, timeJoinColumns, moment.duration(0, 'ms')) === - targetRow - ); + partialResponse.rows.forEach((row, i2) => { + const targetRowIndex = i2; + missingColIndexes.forEach((baseResponseIndex) => { + baseResponse.rows[targetRowIndex][baseResponse.columns[baseResponseIndex].id] = + row[partialResponse.columns[baseResponseIndex + 1 - filterableAggLength].id]; }); - if (targetRowIndex !== -1) { - missingCols.forEach((c) => { - fullResponse.rows[targetRowIndex][c.id] = row[c.id]; - }); - } else { - // add it to the bottom - this might be confusing in some cases - // can we insert it at the right place? - const updatedRow: DatatableRow = {}; - joinColumns.forEach((c) => { - updatedRow[c.id] = row[c.id]; - }); - timeJoinColumns.forEach((c) => { - updatedRow[c.id] = moment(row[c.id]).add(timeShift).valueOf(); - }); - missingCols.forEach((c) => { - updatedRow[c.id] = row[c.id]; - }); - fullResponse.rows.push(updatedRow); - } }); }); - return fullResponse; + return baseResponse; + } else { + return baseResponse; } }; function getColumnIdentifier(