Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live: support streaming results out-of-the-box #32821

Merged
merged 11 commits into from Apr 9, 2021
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -45,7 +45,7 @@ require (
github.com/grafana/grafana-aws-sdk v0.4.0
github.com/grafana/grafana-live-sdk v0.0.4
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.91.0
github.com/grafana/grafana-plugin-sdk-go v0.91.1-0.20210408195819-266fa63c3826
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is needed so that meta.channel is parsed

github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2
github.com/hashicorp/go-hclog v0.15.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Expand Up @@ -840,8 +840,9 @@ github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SP
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To=
github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60=
github.com/grafana/grafana-plugin-sdk-go v0.88.0/go.mod h1:PTALh0lz+Y7k0+OMczAABTpeocL63aw6FVOBptp5GVo=
github.com/grafana/grafana-plugin-sdk-go v0.91.0 h1:kiPS3NqR+KOvHrc32EkX7D40JON3+GYZ6Nm2WOtCElQ=
github.com/grafana/grafana-plugin-sdk-go v0.91.0/go.mod h1:Ot3k7nY7P6DXmUsDgKvNB7oG1v7PRyTdmnYVoS554bU=
github.com/grafana/grafana-plugin-sdk-go v0.91.1-0.20210408195819-266fa63c3826 h1:2jafeRCAkBtM3QWQKZ+/0HwUGKl7V4xjTxiTeXcl5nk=
github.com/grafana/grafana-plugin-sdk-go v0.91.1-0.20210408195819-266fa63c3826/go.mod h1:Ot3k7nY7P6DXmUsDgKvNB7oG1v7PRyTdmnYVoS554bU=
github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 h1:iwcM8lkYJ3EhytGLJ2BvRSwutb0QWoI7EWbYv3yJRsY=
github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387/go.mod h1:jHA1OHnPsuj3LLgMXmFopsKDt4ARHHUhrmT3JrGf71g=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
Expand Down
3 changes: 3 additions & 0 deletions packages/grafana-data/src/types/data.ts
Expand Up @@ -42,6 +42,9 @@ export interface QueryResultMeta {
/** Currently used to show results in Explore only in preferred visualisation option */
preferredVisualisationType?: PreferredVisualisationType;

/** The path for live stream updates for this frame */
channel?: string;

/**
* Optionally identify which topic the frame should be assigned to.
* A value specified in the response will override what the request asked for.
Expand Down
2 changes: 1 addition & 1 deletion packages/grafana-runtime/src/index.ts
Expand Up @@ -6,7 +6,7 @@
export * from './services';
export * from './config';
export * from './types';
export * from './measurement';
export * from './utils/liveQuery';
export { loadPluginCss, SystemJS, PluginCssOptions } from './utils/plugin';
export { reportMetaAnalytics } from './utils/analytics';
export { logInfo, logDebug, logWarning, logError } from './utils/logging';
Expand Down
1 change: 0 additions & 1 deletion packages/grafana-runtime/src/measurement/index.ts

This file was deleted.

@@ -1,6 +1,13 @@
import { BackendSrv, BackendSrvRequest } from 'src/services';
import { DataSourceWithBackend } from './DataSourceWithBackend';
import { DataSourceJsonData, DataQuery, DataSourceInstanceSettings, DataQueryRequest } from '@grafana/data';
import { DataSourceWithBackend, toStreamingDataResponse } from './DataSourceWithBackend';
import {
DataSourceJsonData,
DataQuery,
DataSourceInstanceSettings,
DataQueryRequest,
DataQueryResponseData,
MutableDataFrame,
} from '@grafana/data';
import { of } from 'rxjs';

class MyDataSource extends DataSourceWithBackend<DataQuery, DataSourceJsonData> {
Expand Down Expand Up @@ -73,4 +80,26 @@ describe('DataSourceWithBackend', () => {
}
`);
});

test('it converts results with channels to streaming queries', () => {
const request: DataQueryRequest = {
intervalMs: 100,
} as DataQueryRequest;

const rsp: DataQueryResponseData = {
data: [],
};

// Simple empty query
let obs = toStreamingDataResponse(request, rsp);
expect(obs).toBeDefined();

let frame = new MutableDataFrame();
frame.meta = {
channel: 'a/b/c',
};
rsp.data = [frame];
obs = toStreamingDataResponse(request, rsp);
expect(obs).toBeDefined();
});
});
55 changes: 51 additions & 4 deletions packages/grafana-runtime/src/utils/DataSourceWithBackend.ts
Expand Up @@ -7,11 +7,15 @@ import {
DataSourceJsonData,
ScopedVars,
makeClassES5Compatible,
DataFrame,
parseLiveChannelAddress,
StreamingFrameOptions,
} from '@grafana/data';
import { Observable, of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
import { merge, Observable, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';
import { getBackendSrv, getDataSourceSrv } from '../services';
import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse';
import { getLiveDataStream } from './liveQuery';

const ExpressionDatasourceID = '__expr__';

Expand Down Expand Up @@ -132,8 +136,13 @@ class DataSourceWithBackend<
requestId,
})
.pipe(
map((rsp) => {
return toDataQueryResponse(rsp, queries as DataQuery[]);
switchMap((raw) => {
const rsp = toDataQueryResponse(raw, queries as DataQuery[]);
// Check if any response should subscribe to a live stream
if (rsp.data?.length && rsp.data.find((f: DataFrame) => f.meta?.channel)) {
return toStreamingDataResponse(request, rsp);
}
return of(rsp);
}),
catchError((err) => {
return of(toDataQueryResponse(err));
Expand Down Expand Up @@ -209,6 +218,44 @@ class DataSourceWithBackend<
}
}

export function toStreamingDataResponse(
request: DataQueryRequest,
rsp: DataQueryResponse
): Observable<DataQueryResponse> {
const buffer: StreamingFrameOptions = {
maxLength: request.maxDataPoints ?? 500,
};

// For recent queries, clamp to the current time range
if (request.rangeRaw?.to === 'now') {
buffer.maxDelta = request.range.to.valueOf() - request.range.from.valueOf();
}

const staticdata: DataFrame[] = [];
const streams: Array<Observable<DataQueryResponse>> = [];
for (const frame of rsp.data) {
const addr = parseLiveChannelAddress(frame.meta?.channel);
if (addr) {
streams.push(
getLiveDataStream({
addr,
buffer,
frame: frame as DataFrame,
})
);
} else {
staticdata.push(frame);
}
}
if (staticdata.length) {
streams.push(of({ ...rsp, data: staticdata }));
}
if (streams.length === 1) {
return streams[0]; // avoid merge wrapper
}
return merge(...streams);
}

//@ts-ignore
DataSourceWithBackend = makeClassES5Compatible(DataSourceWithBackend);

Expand Down
@@ -1,6 +1,7 @@
import {
DataFrame,
DataFrameJSON,
dataFrameToJSON,
DataQueryResponse,
isLiveChannelMessageEvent,
isLiveChannelStatusEvent,
Expand All @@ -15,7 +16,7 @@ import {
import { getGrafanaLiveSrv } from '../services/live';

import { Observable, of } from 'rxjs';
import { toDataQueryError } from '../utils/queryResponse';
import { toDataQueryError } from './queryResponse';
import { perf } from './perf';

export interface LiveDataFilter {
Expand All @@ -28,6 +29,7 @@ export interface LiveDataFilter {
export interface LiveDataStreamOptions {
key?: string;
addr: LiveChannelAddress;
frame?: DataFrame; // initial results
buffer?: StreamingFrameOptions;
filter?: LiveDataFilter;
}
Expand All @@ -39,8 +41,13 @@ export interface LiveDataStreamOptions {
*/
export function getLiveDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
if (!isValidLiveChannelAddress(options.addr)) {
return of({ error: toDataQueryError('invalid address'), data: [] });
return of({
error: toDataQueryError(`invalid channel address: ${JSON.stringify(options.addr)}`),
state: LoadingState.Error,
data: options.frame ? [options.frame] : [],
});
}

const live = getGrafanaLiveSrv();
if (!live) {
return of({ error: toDataQueryError('grafana live is not initalized'), data: [] });
Expand All @@ -50,8 +57,16 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
let data: StreamingDataFrame | undefined = undefined;
let filtered: DataFrame | undefined = undefined;
let state = LoadingState.Loading;
const { key, filter } = options;
let { key } = options;
let last = perf.last;
if (options.frame) {
const msg = dataFrameToJSON(options.frame);
data = new StreamingDataFrame(msg, options.buffer);
state = LoadingState.Streaming;
}
if (!key) {
key = `xstr/${streamCounter++}`;
}

const process = (msg: DataFrameJSON) => {
if (!data) {
Expand All @@ -61,14 +76,17 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
}
state = LoadingState.Streaming;

// Select the fields we are actually looking at
// Filter out fields
if (!filtered || msg.schema) {
filtered = data;
if (filter?.fields?.length) {
filtered = {
...data,
fields: data.fields.filter((f) => filter.fields!.includes(f.name)),
};
if (options.filter) {
const { fields } = options.filter;
if (fields?.length) {
filtered = {
...data,
fields: data.fields.filter((f) => fields.includes(f.name)),
};
}
}
}

Expand All @@ -85,15 +103,17 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
.getStream()
.subscribe({
error: (err: any) => {
console.log('LiveQuery [error]', { err }, options.addr);
state = LoadingState.Error;
subscriber.next({ state, data: [data], key });
subscriber.next({ state, data: [data], key, error: toDataQueryError(err) });
sub.unsubscribe(); // close after error
},
complete: () => {
console.log('LiveQuery [complete]', options.addr);
if (state !== LoadingState.Error) {
state = LoadingState.Done;
}
subscriber.next({ state, data: [data], key });
// or track errors? subscriber.next({ state, data: [data], key });
subscriber.complete();
sub.unsubscribe();
},
Expand All @@ -103,14 +123,19 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
return;
}
if (isLiveChannelStatusEvent(evt)) {
if (
if (evt.error) {
let error = toDataQueryError(evt.error);
error.message = `Streaming channel error: ${error.message}`;
state = LoadingState.Error;
subscriber.next({ state, data: [data], key, error });
return;
} else if (
evt.state === LiveChannelConnectionState.Connected ||
evt.state === LiveChannelConnectionState.Pending
) {
if (evt.message) {
process(evt.message);
}
return;
}
console.log('ignore state', evt);
}
Expand All @@ -122,3 +147,6 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
};
});
}

// incremet the stream ids
let streamCounter = 10;
4 changes: 2 additions & 2 deletions pkg/services/live/features/plugin.go
Expand Up @@ -100,7 +100,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedI
Path: r.path,
})
if err != nil {
logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path)
logger.Error("Plugin OnSubscribe call error", "error", err, "path", r.path)
return models.SubscribeReply{}, 0, err
}
if resp.Status != backend.SubscribeStreamStatusOK {
Expand Down Expand Up @@ -144,7 +144,7 @@ func (r *PluginPathRunner) OnPublish(ctx context.Context, user *models.SignedInU
Data: e.Data,
})
if err != nil {
logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path)
logger.Error("Plugin OnPublish call error", "error", err, "path", r.path)
return models.PublishReply{}, 0, err
}
if resp.Status != backend.PublishStreamStatusOK {
Expand Down
10 changes: 9 additions & 1 deletion pkg/services/live/live.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -403,7 +404,14 @@ func (g *GrafanaLive) handlePushScope(_ *models.SignedInUser, namespace string)
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false)
if err != nil {
return nil, fmt.Errorf("error getting datasource: %w", err)
// the namespace may be an ID
Copy link
Contributor

@FZambia FZambia Apr 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a temporary or persistent change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove it when we fix #32853

id, _ := strconv.ParseInt(namespace, 10, 64)
if id > 0 {
ds, err = g.DatasourceCache.GetDatasource(id, user, false)
}
if err != nil {
return nil, fmt.Errorf("error getting datasource: %w", err)
}
}
streamHandler, err := g.getStreamPlugin(ds.Type)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions public/app/features/live/channel.ts
Expand Up @@ -178,6 +178,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li

shutdownWithError(err: string) {
this.currentStatus.error = err;
this.sendStatus();
this.disconnect();
}
}
Expand Down
9 changes: 6 additions & 3 deletions public/app/features/live/live.ts
@@ -1,7 +1,7 @@
import Centrifuge from 'centrifuge/dist/centrifuge';
import { GrafanaLiveSrv, setGrafanaLiveSrv, getGrafanaLiveSrv, config } from '@grafana/runtime';
import { BehaviorSubject } from 'rxjs';
import { LiveChannel, LiveChannelScope, LiveChannelAddress } from '@grafana/data';
import { LiveChannel, LiveChannelScope, LiveChannelAddress, LiveChannelConnectionState } from '@grafana/data';
import { CentrifugeLiveChannel, getErrorChannel } from './channel';
import {
GrafanaLiveScope,
Expand Down Expand Up @@ -104,7 +104,10 @@ export class CentrifugeSrv implements GrafanaLiveSrv {

// Initialize the channel in the background
this.initChannel(scope, channel).catch((err) => {
channel?.shutdownWithError(err);
if (channel) {
channel.currentStatus.state = LiveChannelConnectionState.Invalid;
channel.shutdownWithError(err);
}
this.open.delete(id);
});

Expand All @@ -116,7 +119,7 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
const { addr } = channel;
const support = await scope.getChannelSupport(addr.namespace);
if (!support) {
throw new Error(channel.addr.namespace + 'does not support streaming');
throw new Error(channel.addr.namespace + ' does not support streaming');
}
const config = support.getChannelConfig(addr.path);
if (!config) {
Expand Down