Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explore: Support mixed data sources for supplementary query #63036

Merged
merged 64 commits into from Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
0e94112
Consolidate logs volume logic (full range and limited)
ifrost Jan 24, 2023
f4de1ea
Merge branch 'main' into ifrost/consolidate-logs-volume-logic
ifrost Jan 30, 2023
76d14fa
Fix showing limited histogram message
ifrost Jan 30, 2023
012d9f2
Test passing meta data to logs volume provider
ifrost Jan 30, 2023
571fe12
Merge branch 'main' into ifrost/consolidate-logs-volume-logic
ifrost Feb 1, 2023
1d604df
Improve readability
ifrost Feb 1, 2023
f4190d1
Clean up types
ifrost Feb 1, 2023
aec436c
Add basic support for multiple log volumes
ifrost Feb 3, 2023
12ec6fb
Merge branch 'main' into ifrost/consolidate-logs-volume-logic
ifrost Feb 3, 2023
7da84bd
Move the comment back to the right place
ifrost Feb 3, 2023
7ce6677
Improve readability
ifrost Feb 3, 2023
4d620e6
Merge branch 'ifrost/consolidate-logs-volume-logic' into ifrost/mixed…
ifrost Feb 3, 2023
6db9023
Clean up the logic to support Logs Samples
ifrost Feb 3, 2023
8837186
Update docs
ifrost Feb 3, 2023
8484aa1
Sort log volumes
ifrost Feb 3, 2023
94fd719
Provide title to logs volume panel
ifrost Feb 3, 2023
942e103
Merge branch 'main' into ifrost/mixed-data-source-and-supplementary-q…
ifrost Feb 6, 2023
85078e2
Merge branch 'main' into ifrost/consolidate-logs-volume-logic
ifrost Feb 6, 2023
e1fcf3d
Merge branch 'ifrost/consolidate-logs-volume-logic' into ifrost/mixed…
ifrost Feb 6, 2023
4917b1a
Move logs volume cache to the provider factory
ifrost Feb 7, 2023
9f88a91
Add helper functions
ifrost Feb 7, 2023
7c7fd15
Merge branch 'main' into ifrost/mixed-data-source-and-supplementary-q…
ifrost Feb 7, 2023
d26f5ec
Reuse only if queries are the same
ifrost Feb 7, 2023
3c729b2
Fix alphabetical sorting
ifrost Feb 7, 2023
e44540f
Move caching out of the provider
ifrost Feb 8, 2023
d73125d
Support errors and loading state
ifrost Feb 8, 2023
060f9f7
Remove unused code
ifrost Feb 8, 2023
6cbcec2
Consolidate supplementary query utils
ifrost Feb 9, 2023
b195bd1
Add tests for supplementaryQueries
ifrost Feb 9, 2023
3088e45
Update tests
ifrost Feb 9, 2023
5b87e70
Merge branch 'main' into ifrost/mixed-data-source-and-supplementary-q…
ifrost Feb 9, 2023
6a95285
Simplify logs volume extra info
ifrost Feb 9, 2023
9683748
Update tests
ifrost Feb 9, 2023
aeb3041
Remove comment
ifrost Feb 9, 2023
ab41f33
Update tests
ifrost Feb 9, 2023
b14bdb1
Fix hiding the histogram for hidden queries
ifrost Feb 10, 2023
0812579
Simplify loading message
ifrost Feb 10, 2023
b483d13
Update tests
ifrost Feb 10, 2023
945b07f
Merge branch 'main' into ifrost/mixed-data-source-and-supplementary-q…
ifrost Feb 10, 2023
cf638a5
Wait for full fallback histogram to load before showing it
ifrost Feb 10, 2023
994b4d6
Fix a typo
ifrost Feb 10, 2023
fe1833f
Add feedback comments
ifrost Feb 10, 2023
c74bec5
Move feedback comments to github
ifrost Feb 20, 2023
ade2e2f
Do not filter out hidden queries as they may be used as references in…
ifrost Feb 20, 2023
d52650e
Merge branch 'main' into ifrost/mixed-data-source-and-supplementary-q…
ifrost Feb 20, 2023
89bd095
Group log volume by refId
ifrost Feb 21, 2023
f7ed140
Support showing fallback histograms per query to avoid duplicates
ifrost Feb 21, 2023
020a035
Merge branch 'main' into ifrost/mixed-data-source-and-supplementary-q…
ifrost Feb 21, 2023
589a978
Improve type-checking
ifrost Feb 21, 2023
14dda0b
Fix supplementaryQueries.test.ts
ifrost Feb 22, 2023
d930c2e
Fix logsModel.test.ts
ifrost Feb 22, 2023
c6d8eec
Fix loading fallback results
ifrost Feb 22, 2023
ceb2117
Fix unit tests
ifrost Feb 22, 2023
e6c4e8f
Merge branch 'main' into ifrost/mixed-data-source-and-supplementary-q…
ifrost Feb 23, 2023
6c70ef7
Merge branch 'main' into ifrost/mixed-data-source-and-supplementary-q…
ifrost Feb 24, 2023
c66d958
WIP
ifrost Feb 24, 2023
46ef4b1
Merge branch 'main' into ifrost/mixed-data-source-and-supplementary-q…
ifrost Mar 2, 2023
d74a32b
Update deprecated styles
ifrost Mar 2, 2023
836895f
Simplify test
ifrost Mar 2, 2023
b74adfb
Simplify rendering zoom info
ifrost Mar 2, 2023
559805c
Update deprecated styles
ifrost Mar 2, 2023
7ab13b6
Simplify getLogsVolumeDataSourceInfo
ifrost Mar 3, 2023
32e1e50
Simplify isLogsVolumeLimited()
ifrost Mar 3, 2023
34a67ff
Simplify rendering zoom info
ifrost Mar 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 0 additions & 4 deletions .betterer.results
Expand Up @@ -2278,10 +2278,6 @@ exports[`better eslint`] = {
"public/app/core/history/richHistoryLocalStorageUtils.ts:5381": [
[0, 0, 0, "Unexpected any. Specify a different type.", "0"]
],
"public/app/core/logsModel.ts:5381": [
[0, 0, 0, "Do not use any type assertions.", "0"],
[0, 0, 0, "Do not use any type assertions.", "1"]
],
ifrost marked this conversation as resolved.
Show resolved Hide resolved
"public/app/core/navigation/GrafanaRoute.test.tsx:5381": [
[0, 0, 0, "Unexpected any. Specify a different type.", "0"],
[0, 0, 0, "Unexpected any. Specify a different type.", "1"]
Expand Down
30 changes: 30 additions & 0 deletions packages/grafana-data/src/types/logs.ts
Expand Up @@ -91,6 +91,7 @@ export interface LogsModel {
// visibleRange is time range for histogram created from log results
visibleRange?: AbsoluteTimeRange;
queries?: DataQuery[];
bucketSize?: number;
}

export interface LogSearchMatch {
Expand Down Expand Up @@ -205,6 +206,35 @@ export enum LogsVolumeType {
Limited = 'Limited',
}

/**
* Custom meta information required by Logs Volume responses
*/
export type LogsVolumeCustomMetaData = {
absoluteRange: AbsoluteTimeRange;
logsVolumeType: LogsVolumeType;
datasourceName: string;
sourceQuery: DataQuery;
};

export const getLogsVolumeAbsoluteRange = (
dataFrames: DataFrame[],
defaultRange: AbsoluteTimeRange
): AbsoluteTimeRange => {
return dataFrames[0].meta?.custom?.absoluteRange || defaultRange;
};

export const getLogsVolumeDataSourceInfo = (dataFrames: DataFrame[]): { uid: string; name: string; refId: string } => {
ifrost marked this conversation as resolved.
Show resolved Hide resolved
return {
uid: dataFrames[0]?.meta?.custom?.datasourceUid || '',
name: dataFrames[0]?.meta?.custom?.datasourceName || '',
refId: dataFrames[0]?.meta?.custom?.sourceQuery?.refId || '',
};
};

export const isLogsVolumeLimited = (dataFrames: DataFrame[]) => {
return dataFrames.length && dataFrames[0].meta?.custom?.logsVolumeType === LogsVolumeType.Limited;
ifrost marked this conversation as resolved.
Show resolved Hide resolved
};

/**
* Data sources that support supplementary queries in Explore.
* This will enable users to see additional data when running original queries.
Expand Down
74 changes: 21 additions & 53 deletions public/app/core/logsModel.test.ts
Expand Up @@ -28,8 +28,8 @@ import {
getSeriesProperties,
LIMIT_LABEL,
logSeriesToLogsModel,
queryLogsVolume,
queryLogsSample,
queryLogsVolume,
} from './logsModel';

const FROM = dateTimeParse('2021-06-17 00:00:00', { timeZone: 'utc' });
Expand Down Expand Up @@ -1122,8 +1122,9 @@ describe('logs volume', () => {
datasource: MockObservableDataSourceApi,
request: DataQueryRequest<TestDataQuery>;

function createFrame(labels: object, timestamps: number[], values: number[]) {
function createFrame(labels: object, timestamps: number[], values: number[], refId: string) {
return toDataFrame({
refId,
fields: [
{ name: 'Time', type: FieldType.time, values: timestamps },
{
Expand All @@ -1136,20 +1137,13 @@ describe('logs volume', () => {
});
}

function createExpectedFields(levelName: string) {
return [
expect.objectContaining({ name: 'Time' }),
expect.objectContaining({
name: 'Value',
config: expect.objectContaining({ displayNameFromDS: levelName }),
}),
];
}

function setup(datasourceSetup: () => void) {
datasourceSetup();
request = {
targets: [{ target: 'volume query 1' }, { target: 'volume query 2' }],
targets: [
{ refId: 'A', target: 'volume query 1' },
{ refId: 'B', target: 'volume query 2' },
],
scopedVars: {},
} as unknown as DataQueryRequest<TestDataQuery>;
volumeProvider = queryLogsVolume(datasource, request, {
Expand All @@ -1167,37 +1161,39 @@ describe('logs volume', () => {

function setupMultipleResults() {
// level=unknown
const resultAFrame1 = createFrame({ app: 'app01' }, [100, 200, 300], [5, 5, 5]);
const resultAFrame1 = createFrame({ app: 'app01' }, [100, 200, 300], [5, 5, 5], 'A');
// level=error
const resultAFrame2 = createFrame({ app: 'app01', level: 'error' }, [100, 200, 300], [0, 1, 0]);
const resultAFrame2 = createFrame({ app: 'app01', level: 'error' }, [100, 200, 300], [0, 1, 0], 'B');
// level=unknown
const resultBFrame1 = createFrame({ app: 'app02' }, [100, 200, 300], [1, 2, 3]);
const resultBFrame1 = createFrame({ app: 'app02' }, [100, 200, 300], [1, 2, 3], 'A');
// level=error
const resultBFrame2 = createFrame({ app: 'app02', level: 'error' }, [100, 200, 300], [1, 1, 1]);
const resultBFrame2 = createFrame({ app: 'app02', level: 'error' }, [100, 200, 300], [1, 1, 1], 'B');

datasource = new MockObservableDataSourceApi('loki', [
{
state: LoadingState.Loading,
data: [resultAFrame1, resultAFrame2],
},
{
state: LoadingState.Done,
data: [resultBFrame1, resultBFrame2],
},
]);
}

function setupMultipleResultsStreaming() {
// level=unknown
const resultAFrame1 = createFrame({ app: 'app01' }, [100, 200, 300], [5, 5, 5]);
const resultAFrame1 = createFrame({ app: 'app01' }, [100, 200, 300], [5, 5, 5], 'A');
// level=error
const resultAFrame2 = createFrame({ app: 'app01', level: 'error' }, [100, 200, 300], [0, 1, 0]);
const resultAFrame2 = createFrame({ app: 'app01', level: 'error' }, [100, 200, 300], [0, 1, 0], 'B');

datasource = new MockObservableDataSourceApi('loki', [
{
state: LoadingState.Streaming,
data: [resultAFrame1],
},
{
state: LoadingState.Streaming,
state: LoadingState.Done,
data: [resultAFrame1, resultAFrame2],
},
]);
Expand All @@ -1220,14 +1216,8 @@ describe('logs volume', () => {
fields: expect.anything(),
meta: {
custom: {
targets: [
{
target: 'volume query 1',
},
{
target: 'volume query 2',
},
],
sourceQuery: { refId: 'A', target: 'volume query 1' },
datasourceName: 'loki',
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: {
from: FROM.valueOf(),
Expand All @@ -1242,7 +1232,7 @@ describe('logs volume', () => {
});
});

it('applies correct meta datya when streaming', async () => {
it('applies correct meta data when streaming', async () => {
setup(setupMultipleResultsStreaming);

await expect(volumeProvider).toEmitValuesWith((received) => {
Expand All @@ -1255,14 +1245,8 @@ describe('logs volume', () => {
fields: expect.anything(),
meta: {
custom: {
targets: [
{
target: 'volume query 1',
},
{
target: 'volume query 2',
},
],
sourceQuery: { refId: 'A', target: 'volume query 1' },
datasourceName: 'loki',
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: {
from: FROM.valueOf(),
Expand All @@ -1277,22 +1261,6 @@ describe('logs volume', () => {
});
});

it('aggregates data frames by level', async () => {
setup(setupMultipleResults);

await expect(volumeProvider).toEmitValuesWith((received) => {
expect(received).toContainEqual({
state: LoadingState.Done,
error: undefined,
data: expect.arrayContaining([
expect.objectContaining({
fields: expect.arrayContaining(createExpectedFields('error')),
}),
]),
});
});
});

it('returns error', async () => {
setup(setupErrorResponse);

Expand Down
124 changes: 41 additions & 83 deletions public/app/core/logsModel.ts
@@ -1,4 +1,4 @@
import { size } from 'lodash';
import { groupBy, size } from 'lodash';
import { from, isObservable, Observable } from 'rxjs';

import {
Expand All @@ -13,7 +13,6 @@ import {
dateTimeFormatTimeAgo,
FieldCache,
FieldColorModeId,
FieldConfig,
FieldType,
FieldWithIndex,
findCommonLabels,
Expand All @@ -26,8 +25,8 @@ import {
LogsMetaItem,
LogsMetaKind,
LogsModel,
LogsVolumeCustomMetaData,
LogsVolumeType,
MutableDataFrame,
rangeUtil,
ScopedVars,
textUtil,
Expand Down Expand Up @@ -222,6 +221,7 @@ export function dataFrameToLogsModel(
absoluteRange
);
logsModel.visibleRange = visibleRange;
logsModel.bucketSize = bucketSize;
logsModel.series = makeDataFramesForLogs(sortedRows, bucketSize);

if (logsModel.meta) {
Expand Down Expand Up @@ -602,69 +602,22 @@ function getLogVolumeFieldConfig(level: LogLevel, oneLevelDetected: boolean) {
};
}

/**
* Take multiple data frames, sum up values and group by level.
* Return a list of data frames, each representing single level.
*/
export function aggregateRawLogsVolume(
rawLogsVolume: DataFrame[],
extractLevel: (dataFrame: DataFrame) => LogLevel
): DataFrame[] {
const logsVolumeByLevelMap: Partial<Record<LogLevel, DataFrame[]>> = {};

rawLogsVolume.forEach((dataFrame) => {
const level = extractLevel(dataFrame);
if (!logsVolumeByLevelMap[level]) {
logsVolumeByLevelMap[level] = [];
const updateLogsVolumeConfig = (
dataFrame: DataFrame,
extractLevel: (dataFrame: DataFrame) => LogLevel,
oneLevelDetected: boolean
): DataFrame => {
dataFrame.fields = dataFrame.fields.map((field) => {
if (field.type === FieldType.number) {
field.config = {
...field.config,
...getLogVolumeFieldConfig(extractLevel(dataFrame), oneLevelDetected),
};
}
logsVolumeByLevelMap[level]!.push(dataFrame);
});

return Object.keys(logsVolumeByLevelMap).map((level: string) => {
return aggregateFields(
logsVolumeByLevelMap[level as LogLevel]!,
getLogVolumeFieldConfig(level as LogLevel, Object.keys(logsVolumeByLevelMap).length === 1)
);
return field;
});
}

/**
* Aggregate multiple data frames into a single data frame by adding values.
* Multiple data frames for the same level are passed here to get a single
* data frame for a given level. Aggregation by level happens in aggregateRawLogsVolume()
*/
function aggregateFields(dataFrames: DataFrame[], config: FieldConfig): DataFrame {
const aggregatedDataFrame = new MutableDataFrame();
if (!dataFrames.length) {
return aggregatedDataFrame;
}

const totalLength = dataFrames[0].length;
const timeField = new FieldCache(dataFrames[0]).getFirstFieldOfType(FieldType.time);

if (!timeField) {
return aggregatedDataFrame;
}

aggregatedDataFrame.addField({ name: 'Time', type: FieldType.time }, totalLength);
aggregatedDataFrame.addField({ name: 'Value', type: FieldType.number, config }, totalLength);

dataFrames.forEach((dataFrame) => {
dataFrame.fields.forEach((field) => {
if (field.type === FieldType.number) {
for (let pointIndex = 0; pointIndex < totalLength; pointIndex++) {
const currentValue = aggregatedDataFrame.get(pointIndex).Value;
const valueToAdd = field.values.get(pointIndex);
const totalValue =
currentValue === null && valueToAdd === null ? null : (currentValue || 0) + (valueToAdd || 0);
aggregatedDataFrame.set(pointIndex, { Value: totalValue, Time: timeField.values.get(pointIndex) });
}
}
});
});

return aggregatedDataFrame;
}
return dataFrame;
};

type LogsVolumeQueryOptions<T extends DataQuery> = {
extractLevel: (dataFrame: DataFrame) => LogLevel;
Expand Down Expand Up @@ -694,7 +647,7 @@ export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataS
logsVolumeRequest.hideFromInspector = true;

return new Observable((observer) => {
let rawLogsVolume: DataFrame[] = [];
let logsVolumeData: DataFrame[] = [];
observer.next({
state: LoadingState.Loading,
error: undefined,
Expand All @@ -706,11 +659,6 @@ export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataS

const subscription = queryObservable.subscribe({
complete: () => {
observer.next({
state: LoadingState.Done,
error: undefined,
data: rawLogsVolume,
});
observer.complete();
},
next: (dataQueryResponse: DataQueryResponse) => {
Expand All @@ -723,24 +671,34 @@ export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataS
});
observer.error(error);
} else {
const aggregatedLogsVolume = aggregateRawLogsVolume(
dataQueryResponse.data.map(toDataFrame),
options.extractLevel
);
if (aggregatedLogsVolume[0]) {
aggregatedLogsVolume[0].meta = {
const framesByRefId = groupBy(dataQueryResponse.data, 'refId');
logsVolumeData = dataQueryResponse.data.map((dataFrame) => {
let sourceRefId = dataFrame.refId || '';
if (sourceRefId.startsWith('log-volume-')) {
sourceRefId = sourceRefId.substr('log-volume-'.length);
ifrost marked this conversation as resolved.
Show resolved Hide resolved
}

const logsVolumeCustomMetaData: LogsVolumeCustomMetaData = {
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: { from: options.range.from.valueOf(), to: options.range.to.valueOf() },
datasourceName: datasource.name,
sourceQuery: options.targets.find((dataQuery) => dataQuery.refId === sourceRefId)!,
};

dataFrame.meta = {
...dataFrame.meta,
custom: {
targets: options.targets,
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: { from: options.range.from.valueOf(), to: options.range.to.valueOf() },
...dataFrame.meta?.custom,
...logsVolumeCustomMetaData,
},
};
}
rawLogsVolume = aggregatedLogsVolume;
return updateLogsVolumeConfig(dataFrame, options.extractLevel, framesByRefId[dataFrame.refId].length === 1);
});

observer.next({
state: dataQueryResponse.state ?? LoadingState.Streaming,
state: dataQueryResponse.state,
error: undefined,
data: rawLogsVolume,
data: logsVolumeData,
});
}
},
Expand Down