Skip to content

Commit

Permalink
[Lens] Random sampling feature (#143221)
Browse files Browse the repository at this point in the history
* ✨ First pass with UI for random sampling

* ✨ Initial working version

* 🔥 Remove unused stuff

* 🔧 Refactor layer settings panel

* 🐛 Fix terms other query and some refactor

* 🏷️ Fix types issues

* 🐛 Fix sampling for other terms agg

* 🐛 Fix issue with count operation

* ✅ Fix jest tests

* 🐛 fix test stability

* 🐛 fix test with newer params

* 💄 Add tech preview label

* ✅ Add new tests for sampling

* ✅ Add more tests

* ✅ Add new test for suggestions

* ✅ Add functional tests for layer actions and random sampling

* Update x-pack/plugins/lens/public/datasources/form_based/layer_settings.tsx

Co-authored-by: Michael Marcialis <michael@marcial.is>

* 👌 Integrated design feedback

Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
Co-authored-by: Michael Marcialis <michael@marcial.is>
  • Loading branch information
3 people committed Oct 24, 2022
1 parent b921a49 commit 9d819ba
Show file tree
Hide file tree
Showing 31 changed files with 1,494 additions and 656 deletions.
155 changes: 155 additions & 0 deletions src/plugins/data/common/search/aggs/agg_configs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ import type { DataView } from '@kbn/data-views-plugin/common';
import { stubIndexPattern } from '../../stubs';
import { IEsSearchResponse } from '..';

// Mute moment.tz warnings about not finding a mock timezone
jest.mock('../utils', () => {
const original = jest.requireActual('../utils');
return {
...original,
getUserTimeZone: jest.fn(() => 'US/Pacific'),
};
});

describe('AggConfigs', () => {
const indexPattern: DataView = stubIndexPattern;
let typesRegistry: AggTypesRegistryStart;
Expand Down Expand Up @@ -563,6 +572,82 @@ describe('AggConfigs', () => {
'1-bucket>_count'
);
});

it('prepends a sampling agg whenever sampling is enabled', () => {
const configStates = [
{
enabled: true,
id: '1',
type: 'avg_bucket',
schema: 'metric',
params: {
customBucket: {
id: '1-bucket',
type: 'date_histogram',
schema: 'bucketAgg',
params: {
field: '@timestamp',
interval: '10s',
},
},
customMetric: {
id: '1-metric',
type: 'count',
schema: 'metricAgg',
params: {},
},
},
},
{
enabled: true,
id: '2',
type: 'terms',
schema: 'bucket',
params: {
field: 'clientip',
},
},
{
enabled: true,
id: '3',
type: 'terms',
schema: 'bucket',
params: {
field: 'machine.os.raw',
},
},
];

const ac = new AggConfigs(
indexPattern,
configStates,
{ typesRegistry, hierarchical: true, probability: 0.5 },
jest.fn()
);
const topLevelDsl = ac.toDsl();

expect(Object.keys(topLevelDsl)).toContain('sampling');
expect(Object.keys(topLevelDsl.sampling)).toEqual(['random_sampler', 'aggs']);
expect(Object.keys(topLevelDsl.sampling.aggs)).toContain('2');
expect(Object.keys(topLevelDsl.sampling.aggs['2'].aggs)).toEqual(['1', '3', '1-bucket']);
});

it('should not prepend a sampling agg when no nested agg is avaialble', () => {
const ac = new AggConfigs(
indexPattern,
[
{
enabled: true,
type: 'count',
schema: 'metric',
},
],
{ typesRegistry, probability: 0.5 },
jest.fn()
);
const topLevelDsl = ac.toDsl();
expect(Object.keys(topLevelDsl)).not.toContain('sampling');
});
});

describe('#postFlightTransform', () => {
Expand Down Expand Up @@ -854,4 +939,74 @@ describe('AggConfigs', () => {
`);
});
});

describe('isSamplingEnabled', () => {
it('should return false if probability is 1', () => {
const ac = new AggConfigs(
indexPattern,
[{ enabled: true, type: 'avg', schema: 'metric', params: { field: 'bytes' } }],
{ typesRegistry, probability: 1 },
jest.fn()
);

expect(ac.isSamplingEnabled()).toBeFalsy();
});

it('should return true if probability is less than 1', () => {
const ac = new AggConfigs(
indexPattern,
[{ enabled: true, type: 'avg', schema: 'metric', params: { field: 'bytes' } }],
{ typesRegistry, probability: 0.1 },
jest.fn()
);

expect(ac.isSamplingEnabled()).toBeTruthy();
});

it('should return false when all aggs have hasNoDsl flag enabled', () => {
const ac = new AggConfigs(
indexPattern,
[
{
enabled: true,
type: 'count',
schema: 'metric',
},
],
{ typesRegistry, probability: 1 },
jest.fn()
);

expect(ac.isSamplingEnabled()).toBeFalsy();
});

it('should return false when no nested aggs are avaialble', () => {
const ac = new AggConfigs(
indexPattern,
[{ enabled: false, type: 'avg', schema: 'metric', params: { field: 'bytes' } }],
{ typesRegistry, probability: 1 },
jest.fn()
);

expect(ac.isSamplingEnabled()).toBeFalsy();
});

it('should return true if at least one nested agg is available and probability < 1', () => {
const ac = new AggConfigs(
indexPattern,
[
{
enabled: true,
type: 'count',
schema: 'metric',
},
{ enabled: true, type: 'avg', schema: 'metric', params: { field: 'bytes' } },
],
{ typesRegistry, probability: 0.1 },
jest.fn()
);

expect(ac.isSamplingEnabled()).toBeTruthy();
});
});
});
36 changes: 34 additions & 2 deletions src/plugins/data/common/search/aggs/agg_configs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { AggTypesDependencies, GetConfigFn, getUserTimeZone } from '../..';
import { getTime, calculateBounds } from '../..';
import type { IBucketAggConfig } from './buckets';
import { insertTimeShiftSplit, mergeTimeShifts } from './utils/time_splits';
import { createSamplerAgg, isSamplingEnabled } from './utils/sampler';

function removeParentAggs(obj: any) {
for (const prop in obj) {
Expand Down Expand Up @@ -55,6 +56,8 @@ export interface AggConfigsOptions {
hierarchical?: boolean;
aggExecutionContext?: AggTypesDependencies['aggExecutionContext'];
partialRows?: boolean;
probability?: number;
samplerSeed?: number;
}

export type CreateAggConfigParams = Assign<AggConfigSerialized, { type: string | IAggType }>;
Expand Down Expand Up @@ -107,6 +110,17 @@ export class AggConfigs {
return this.opts.partialRows ?? false;
}

public get samplerConfig() {
return { probability: this.opts.probability ?? 1, seed: this.opts.samplerSeed };
}

isSamplingEnabled() {
return (
isSamplingEnabled(this.opts.probability) &&
this.getRequestAggs().filter((agg) => !agg.type.hasNoDsl).length > 0
);
}

setTimeFields(timeFields: string[] | undefined) {
this.timeFields = timeFields;
}
Expand Down Expand Up @@ -225,7 +239,7 @@ export class AggConfigs {
}

toDsl(): Record<string, any> {
const dslTopLvl = {};
const dslTopLvl: Record<string, any> = {};
let dslLvlCursor: Record<string, any>;
let nestedMetrics: Array<{ config: AggConfig; dsl: Record<string, any> }> | [];

Expand Down Expand Up @@ -254,10 +268,21 @@ export class AggConfigs {
(config) => 'splitForTimeShift' in config.type && config.type.splitForTimeShift(config, this)
);

if (this.isSamplingEnabled()) {
dslTopLvl.sampling = createSamplerAgg({
probability: this.opts.probability ?? 1,
seed: this.opts.samplerSeed,
});
}

requestAggs.forEach((config: AggConfig, i: number, list) => {
if (!dslLvlCursor) {
// start at the top level
dslLvlCursor = dslTopLvl;
// when sampling jump directly to the aggs
if (this.isSamplingEnabled()) {
dslLvlCursor = dslLvlCursor.sampling.aggs;
}
} else {
const prevConfig: AggConfig = list[i - 1];
const prevDsl = dslLvlCursor[prevConfig.id];
Expand Down Expand Up @@ -452,7 +477,12 @@ export class AggConfigs {
doc_count: response.rawResponse.hits?.total as estypes.AggregationsAggregate,
};
}
const aggCursor = transformedRawResponse.aggregations!;
const aggCursor = this.isSamplingEnabled()
? (transformedRawResponse.aggregations!.sampling! as Record<
string,
estypes.AggregationsAggregate
>)
: transformedRawResponse.aggregations!;

mergeTimeShifts(this, aggCursor);
return {
Expand Down Expand Up @@ -531,6 +561,8 @@ export class AggConfigs {
metricsAtAllLevels: this.hierarchical,
partialRows: this.partialRows,
aggs: this.aggs.map((agg) => buildExpression(agg.toExpressionAst())),
probability: this.opts.probability,
samplerSeed: this.opts.samplerSeed,
}),
]).toAst();
}
Expand Down
Loading

0 comments on commit 9d819ba

Please sign in to comment.