Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Infra UI] Convert terms aggregation to composite for field selection filtering #47500

Merged
merged 9 commits into from
Oct 18, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,27 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { startsWith, uniq } from 'lodash';
import { InfraBackendFrameworkAdapter, InfraFrameworkRequest } from '../framework';
import { set, startsWith, uniq, first } from 'lodash';
import {
InfraBackendFrameworkAdapter,
InfraFrameworkRequest,
InfraDatabaseSearchResponse,
} from '../framework';
import { FieldsAdapter, IndexFieldDescriptor } from './adapter_types';
import { getAllowedListForPrefix } from '../../../../common/ecs_allowed_list';
import { getAllCompositeData } from '../../../utils/get_all_composite_data';

interface Bucket {
key: string;
key: { dataset: string };
doc_count: number;
}

interface DataSetResponse {
modules: {
buckets: Bucket[];
};
dataSets: {
datasets: {
buckets: Bucket[];
after_key: {
dataset: string;
};
};
}

Expand Down Expand Up @@ -59,34 +64,59 @@ export class FrameworkFieldsAdapter implements FieldsAdapter {
allowNoIndices: true,
ignoreUnavailable: true,
body: {
size: 0,
aggs: {
modules: {
terms: {
field: 'event.modules',
size: 1000,
},
},
dataSets: {
terms: {
field: 'event.dataset',
size: 1000,
datasets: {
composite: {
sources: [
{
dataset: {
terms: {
field: 'event.dataset',
},
},
},
],
},
},
},
},
};
const response = await this.framework.callWithRequest<{}, DataSetResponse>(

const bucketSelector = (response: InfraDatabaseSearchResponse<{}, DataSetResponse>) =>
(response.aggregations && response.aggregations.datasets.buckets) || [];

const handleAfterKey = (
options: object,
response: InfraDatabaseSearchResponse<{}, DataSetResponse>
) => {
if (!response.aggregations) {
return options;
}
const newOptions = { ...options };
set(
newOptions,
'body.aggs.datasets.composite.after',
response.aggregations.datasets.after_key
);
return newOptions;
};

const buckets = await getAllCompositeData<DataSetResponse, Bucket>(
this.framework,
request,
'search',
params
params,
bucketSelector,
handleAfterKey
);
if (!response.aggregations) {
return { dataSets: [], modules: [] };
}
const { modules, dataSets } = response.aggregations;
return {
modules: modules.buckets.map(bucket => bucket.key),
dataSets: dataSets.buckets.map(bucket => bucket.key),
};
const dataSets = buckets.map(bucket => bucket.key.dataset);
const modules = dataSets.reduce(
(acc, dataset) => {
const module = first(dataset.split(/\./));
return module ? uniq([...acc, module]) : acc;
},
[] as string[]
);
return { modules, dataSets };
}
}
85 changes: 33 additions & 52 deletions x-pack/legacy/plugins/infra/server/lib/snapshot/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { set } from 'lodash';
import {
InfraSnapshotGroupbyInput,
InfraSnapshotMetricInput,
Expand All @@ -12,7 +13,11 @@ import {
InfraNodeType,
InfraSourceConfiguration,
} from '../../graphql/types';
import { InfraBackendFrameworkAdapter, InfraFrameworkRequest } from '../adapters/framework';
import {
InfraBackendFrameworkAdapter,
InfraFrameworkRequest,
InfraDatabaseSearchResponse,
} from '../adapters/framework';
import { InfraSources } from '../sources';

import { JsonObject } from '../../../common/typed_json';
Expand All @@ -31,6 +36,7 @@ import {
InfraSnapshotNodeMetricsBucket,
} from './response_helpers';
import { IP_FIELDS } from '../constants';
import { getAllCompositeData } from '../../utils/get_all_composite_data';

export interface InfraSnapshotRequestOptions {
nodeType: InfraNodeType;
Expand Down Expand Up @@ -63,6 +69,24 @@ export class InfraSnapshot {
}
}

// This is used by the getAllCompositeData to select the bucket to return.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love comments, but I'm not sure we need these given the parameters for getAllCompositeData() are fully typed and explain themselves?

const bucketSelector = (
response: InfraDatabaseSearchResponse<{}, InfraSnapshotAggregationResponse>
) => (response.aggregations && response.aggregations.nodes.buckets) || [];

// This is used by getAllCompositeData to add the after key to the subsequent requests
const handleAfterKey = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the two uses of handleAfterKey in this PR are very, very similar - with only the actual keys and portions of the response to be used changing - is it worth making this into a helper function too? I imagine the

  if (!response.aggregations) {
    return options;
  }

check and the merging of the options will always be the same.

options: object,
response: InfraDatabaseSearchResponse<{}, InfraSnapshotAggregationResponse>
) => {
if (!response.aggregations) {
return options;
}
const newOptions = { ...options };
set(newOptions, 'body.aggregations.nodes.composite.after', response.aggregations.nodes.after_key);
return newOptions;
};

const requestGroupedNodes = async (
request: InfraFrameworkRequest,
options: InfraSnapshotRequestOptions,
Expand Down Expand Up @@ -112,11 +136,10 @@ const requestGroupedNodes = async (
},
};

return await getAllCompositeAggregationData<InfraSnapshotNodeGroupByBucket>(
framework,
request,
query
);
return await getAllCompositeData<
InfraSnapshotAggregationResponse,
InfraSnapshotNodeGroupByBucket
>(framework, request, query, bucketSelector, handleAfterKey);
};

const requestNodeMetrics = async (
Expand Down Expand Up @@ -174,12 +197,10 @@ const requestNodeMetrics = async (
},
},
};

return await getAllCompositeAggregationData<InfraSnapshotNodeMetricsBucket>(
framework,
request,
query
);
return await getAllCompositeData<
InfraSnapshotAggregationResponse,
InfraSnapshotNodeMetricsBucket
>(framework, request, query, bucketSelector, handleAfterKey);
};

// buckets can be InfraSnapshotNodeGroupByBucket[] or InfraSnapshotNodeMetricsBucket[]
Expand All @@ -191,46 +212,6 @@ interface InfraSnapshotAggregationResponse {
};
}

const getAllCompositeAggregationData = async <BucketType>(
framework: InfraBackendFrameworkAdapter,
request: InfraFrameworkRequest,
query: any,
previousBuckets: BucketType[] = []
): Promise<BucketType[]> => {
const response = await framework.callWithRequest<{}, InfraSnapshotAggregationResponse>(
request,
'search',
query
);

// Nothing available, return the previous buckets.
if (response.hits.total.value === 0) {
return previousBuckets;
}

// if ES doesn't return an aggregations key, something went seriously wrong.
if (!response.aggregations) {
throw new Error('Whoops!, `aggregations` key must always be returned.');
}

const currentBuckets = response.aggregations.nodes.buckets;

// if there are no currentBuckets then we are finished paginating through the results
if (currentBuckets.length === 0) {
return previousBuckets;
}

// There is possibly more data, concat previous and current buckets and call ourselves recursively.
const newQuery = { ...query };
newQuery.body.aggregations.nodes.composite.after = response.aggregations.nodes.after_key;
return getAllCompositeAggregationData(
framework,
request,
query,
previousBuckets.concat(currentBuckets)
);
};

const mergeNodeBuckets = (
nodeGroupByBuckets: InfraSnapshotNodeGroupByBucket[],
nodeMetricsBuckets: InfraSnapshotNodeMetricsBucket[],
Expand Down
54 changes: 54 additions & 0 deletions x-pack/legacy/plugins/infra/server/utils/get_all_composite_data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import {
InfraBackendFrameworkAdapter,
InfraFrameworkRequest,
InfraDatabaseSearchResponse,
} from '../lib/adapters/framework';

export const getAllCompositeData = async <
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍

Aggregation = undefined,
Bucket = {},
Options extends object = {}
>(
framework: InfraBackendFrameworkAdapter,
request: InfraFrameworkRequest,
options: Options,
bucketSelector: (response: InfraDatabaseSearchResponse<{}, Aggregation>) => Bucket[],
onAfterKey: (options: Options, response: InfraDatabaseSearchResponse<{}, Aggregation>) => Options,
previousBuckets: Bucket[] = []
): Promise<Bucket[]> => {
const response = await framework.callWithRequest<{}, Aggregation>(request, 'search', options);

// Nothing available, return the previous buckets.
if (response.hits.total.value === 0) {
return previousBuckets;
}

// if ES doesn't return an aggregations key, something went seriously wrong.
if (!response.aggregations) {
throw new Error('Whoops!, `aggregations` key must always be returned.');
}

const currentBuckets = bucketSelector(response);

// if there are no currentBuckets then we are finished paginating through the results
if (currentBuckets.length === 0) {
return previousBuckets;
}

// There is possibly more data, concat previous and current buckets and call ourselves recursively.
const newOptions = onAfterKey(options, response);
return getAllCompositeData(
framework,
request,
newOptions,
bucketSelector,
onAfterKey,
previousBuckets.concat(currentBuckets)
);
};