Skip to content

Commit

Permalink
CloudWatch Logs: Disable query path using websockets (Live) feature (#…
Browse files Browse the repository at this point in the history
…39231)

* Use only non live branch for querying logs.

* Update tests

* fix lint
  • Loading branch information
aocenas committed Sep 15, 2021
1 parent 264946f commit 515d6af
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 158 deletions.
45 changes: 16 additions & 29 deletions public/app/plugins/datasource/cloudwatch/datasource.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { from, lastValueFrom, of } from 'rxjs';
import { setBackendSrv, setDataSourceSrv, setGrafanaLiveSrv } from '@grafana/runtime';
import { ArrayVector, dataFrameToJSON, dateTime, Field, MutableDataFrame } from '@grafana/data';
import { lastValueFrom, of } from 'rxjs';
import { setBackendSrv, setDataSourceSrv } from '@grafana/runtime';
import { ArrayVector, DataFrame, dataFrameToJSON, dateTime, Field, MutableDataFrame } from '@grafana/data';

import { TemplateSrv } from '../../../features/templating/template_srv';
import { CloudWatchDatasource } from './datasource';
import { toArray } from 'rxjs/operators';
import { CloudWatchLogsQueryStatus } from './types';

describe('datasource', () => {
describe('query', () => {
Expand Down Expand Up @@ -143,16 +144,15 @@ function setup({ data = [] }: { data?: any } = {}) {
}

function setupForLogs() {
const { datasource, fetchMock } = setup({
data: {
results: {
a: {
refId: 'a',
frames: [dataFrameToJSON(new MutableDataFrame({ fields: [], meta: { custom: { channelName: 'test' } } }))],
},
},
},
});
function envelope(frame: DataFrame) {
return { data: { results: { a: { refId: 'a', frames: [dataFrameToJSON(frame)] } } } };
}

const { datasource, fetchMock } = setup();

const startQueryFrame = new MutableDataFrame({ fields: [{ name: 'queryId', values: ['queryid'] }] });
fetchMock.mockReturnValueOnce(of(envelope(startQueryFrame)));

const logsFrame = new MutableDataFrame({
fields: [
{
Expand All @@ -168,23 +168,10 @@ function setupForLogs() {
values: new ArrayVector(['1-613f0d6b-3e7cb34375b60662359611bd']),
},
],
meta: { custom: { Status: CloudWatchLogsQueryStatus.Complete } },
});
setGrafanaLiveSrv({
getStream() {
return from([
{
type: 'message',
message: {
results: {
a: {
frames: [dataFrameToJSON(logsFrame)],
},
},
},
},
]);
},
} as any);

fetchMock.mockReturnValueOnce(of(envelope(logsFrame)));

setDataSourceSrv({
async get() {
Expand Down
160 changes: 31 additions & 129 deletions public/app/plugins/datasource/cloudwatch/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,8 @@ import React from 'react';
import angular from 'angular';
import { find, isEmpty, isString, set } from 'lodash';
import { from, lastValueFrom, merge, Observable, of, throwError, zip } from 'rxjs';
import {
catchError,
concatMap,
filter,
finalize,
map,
mergeMap,
repeat,
scan,
share,
takeWhile,
tap,
} from 'rxjs/operators';
import { DataSourceWithBackend, getBackendSrv, getGrafanaLiveSrv, toDataQueryResponse } from '@grafana/runtime';
import { catchError, concatMap, finalize, map, mergeMap, repeat, scan, share, takeWhile, tap } from 'rxjs/operators';
import { DataSourceWithBackend, getBackendSrv, toDataQueryResponse } from '@grafana/runtime';
import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContextProvider';
import {
DataFrame,
Expand All @@ -24,9 +12,6 @@ import {
DataQueryResponse,
DataSourceInstanceSettings,
dateMath,
LiveChannelEvent,
LiveChannelMessageEvent,
LiveChannelScope,
LoadingState,
LogRowModel,
rangeUtil,
Expand Down Expand Up @@ -64,7 +49,6 @@ import { CloudWatchLanguageProvider } from './language_provider';
import { VariableWithMultiSupport } from 'app/features/variables/types';
import { increasingInterval } from './utils/rxjs/increasingInterval';
import { toTestingStatus } from '@grafana/runtime/src/utils/queryResponse';
import config from 'app/core/config';
import { addDataLinksToLogsResponse } from './utils/datalinks';

const DS_QUERY_ENDPOINT = '/api/ds/query';
Expand Down Expand Up @@ -147,6 +131,12 @@ export class CloudWatchDatasource extends DataSourceWithBackend<CloudWatchQuery,
return merge(...dataQueryResponses);
}

/**
* Handle log query. The log query works by starting the query on the CloudWatch and then periodically polling for
* results.
* @param logQueries
* @param options
*/
handleLogQueries = (
logQueries: CloudWatchLogsQuery[],
options: DataQueryRequest<CloudWatchQuery>
Expand All @@ -161,116 +151,17 @@ export class CloudWatchDatasource extends DataSourceWithBackend<CloudWatchQuery,
return of({ data: [], state: LoadingState.Done });
}

const response = config.liveEnabled
? this.handleLiveLogQueries(validLogQueries, options)
: this.handleLegacyLogQueries(validLogQueries, options);

return response.pipe(
mergeMap((dataQueryResponse) => {
return from(
(async () => {
await addDataLinksToLogsResponse(
dataQueryResponse,
options,
this.timeSrv.timeRange(),
this.replace.bind(this),
this.getActualRegion.bind(this),
this.tracingDataSourceUid
);

return dataQueryResponse;
})()
);
})
);
};

/**
* Handle log query using grafana live feature. This means the backend will return a websocket channel name and it
* will listen on it for partial responses until it's terminated. This should give quicker partial data to the user
* as the log query can be long running. This requires that config.liveEnabled === true as that controls whether
* websocket connections can be made.
* @param logQueries
* @param options
*/
private handleLiveLogQueries = (
logQueries: CloudWatchLogsQuery[],
options: DataQueryRequest<CloudWatchQuery>
): Observable<DataQueryResponse> => {
const queryParams = logQueries.map((target: CloudWatchLogsQuery) => ({
intervalMs: 1, // dummy
maxDataPoints: 1, // dummy
datasourceId: this.id,
queryString: this.replace(target.expression, options.scopedVars, true),
refId: target.refId,
logGroupNames: target.logGroupNames?.map((logGroup) =>
this.replace(logGroup, options.scopedVars, true, 'log groups')
),
statsGroups: target.statsGroups,
region: this.getActualRegion(this.replace(target.region, options.scopedVars, true, 'region')),
type: 'liveLogAction',
}));

const range = this.timeSrv.timeRange();

const requestParams = {
from: range.from.valueOf().toString(),
to: range.to.valueOf().toString(),
queries: queryParams,
};

return this.awsRequest(DS_QUERY_ENDPOINT, requestParams).pipe(
mergeMap((response: TSDBResponse) => {
const dataQueryResponse = toDataQueryResponse({ data: response }, options.targets);
const channelName: string = dataQueryResponse.data[0].meta.custom.channelName;
return getGrafanaLiveSrv().getStream({
scope: LiveChannelScope.Plugin,
namespace: 'cloudwatch',
path: channelName,
});
}),
filter((e: LiveChannelEvent<any>) => e.type === 'message'),
map(({ message }: LiveChannelMessageEvent<TSDBResponse>) => {
const dataQueryResponse = toDataQueryResponse({
data: message,
});
dataQueryResponse.state = dataQueryResponse.data.every((dataFrame) =>
statusIsTerminated(dataFrame.meta?.custom?.['Status'])
)
? LoadingState.Done
: LoadingState.Loading;
dataQueryResponse.key = message.results[Object.keys(message.results)[0]].refId;
return dataQueryResponse;
}),
catchError((err) => {
if (err.data?.error) {
throw err.data.error;
}

throw err;
})
);
};

/**
* Handle query the old way (see handleLiveLogQueries) when websockets are not enabled. As enabling websockets is
* configurable we will have to be able to degrade gracefully for the time being.
* @param logQueries
* @param options
*/
private handleLegacyLogQueries = (
logQueries: CloudWatchLogsQuery[],
options: DataQueryRequest<CloudWatchQuery>
): Observable<DataQueryResponse> => {
const queryParams = logQueries.map((target: CloudWatchLogsQuery) => ({
queryString: target.expression,
refId: target.refId,
logGroupNames: target.logGroupNames,
region: this.replace(this.getActualRegion(target.region), options.scopedVars, true, 'region'),
}));

// This first starts the query which returns queryId which can be used to retrieve results.
return this.makeLogActionRequest('StartQuery', queryParams, options.scopedVars).pipe(
mergeMap((dataFrames) =>
// This queries for the results
this.logsQuery(
dataFrames.map((dataFrame) => ({
queryId: dataFrame.fields[0].values.get(0),
Expand All @@ -280,7 +171,23 @@ export class CloudWatchDatasource extends DataSourceWithBackend<CloudWatchQuery,
.statsGroups,
}))
)
)
),
mergeMap((dataQueryResponse) => {
return from(
(async () => {
await addDataLinksToLogsResponse(
dataQueryResponse,
options,
this.timeSrv.timeRange(),
this.replace.bind(this),
this.getActualRegion.bind(this),
this.tracingDataSourceUid
);

return dataQueryResponse;
})()
);
})
);
};

Expand Down Expand Up @@ -328,6 +235,10 @@ export class CloudWatchDatasource extends DataSourceWithBackend<CloudWatchQuery,
return this.performTimeSeriesQuery(request, options.range);
};

/**
* Checks progress and polls data of a started logs query with some retry logic.
* @param queryParams
*/
logsQuery(
queryParams: Array<{
queryId: string;
Expand Down Expand Up @@ -1041,12 +952,3 @@ function parseLogGroupName(logIdentifier: string): string {
const colonIndex = logIdentifier.lastIndexOf(':');
return logIdentifier.substr(colonIndex + 1);
}

function statusIsTerminated(status: string | CloudWatchLogsQueryStatus) {
return [
CloudWatchLogsQueryStatus.Complete,
CloudWatchLogsQueryStatus.Cancelled,
CloudWatchLogsQueryStatus.Failed,
CloudWatchLogsQueryStatus.Timeout,
].includes(status as CloudWatchLogsQueryStatus);
}

0 comments on commit 515d6af

Please sign in to comment.