New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Live: support streaming results out-of-the-box #32821
Conversation
go.mod
Outdated
@@ -45,7 +45,7 @@ require ( | |||
github.com/grafana/grafana-aws-sdk v0.4.0 | |||
github.com/grafana/grafana-live-sdk v0.0.4 | |||
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 | |||
github.com/grafana/grafana-plugin-sdk-go v0.91.0 | |||
github.com/grafana/grafana-plugin-sdk-go v0.91.1-0.20210408195819-266fa63c3826 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is needed so that meta.channel
is parsed
if (!dsConfig) { | ||
return Promise.reject({ message: `Datasource named ${name} was not found` }); | ||
dsConfig = this.settingsMapById[name]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@torkelo any reason this may be a bad idea? The issue I have is that backend plugins do not have access to uid :( and we need something to pass forward that is consistent.
Long term, we should likely replace all use of id
with uid
, this change can help with that migration -- at least I do not think it will hurt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dam I thought the backend was already using uid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When/where do you need to lookup ds by id?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The channel response from a query is:
ds/${id}/path
Ideally that could be: ds/${uid}/path
, BUT backend plugin API does not pass in the UID so it can not be returned from the query easily. This is something we should fix, but is a bigger issue.
For V8, perhaps we should stop returning the internal id entirely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very interesting progress with this, splendid work 🎉
switchMap((raw) => { | ||
const rsp = toDataQueryResponse(raw, queries as DataQuery[]); | ||
// Check if any response should subscribe to a live stream | ||
if (rsp.data?.length && rsp.data.find((f: DataFrame) => f.meta?.channel)) { | ||
const buffer: StreamingFrameOptions = { | ||
maxLength: request.maxDataPoints ?? 500, | ||
}; | ||
|
||
// For recent queries, clamp to the current time range | ||
if (request.rangeRaw?.to === 'now') { | ||
buffer.maxDelta = request.range.to.valueOf() - request.range.from.valueOf(); | ||
} | ||
|
||
const staticdata: DataFrame[] = []; | ||
const streams: Array<Observable<DataQueryResponse>> = []; | ||
for (const frame of rsp.data) { | ||
const addr = parseLiveChannelAddress(frame.meta?.channel); | ||
if (addr) { | ||
streams.push( | ||
getLiveDataStream({ | ||
addr, | ||
buffer, | ||
frame: frame as DataFrame, | ||
}) | ||
); | ||
} else { | ||
staticdata.push(frame); | ||
} | ||
} | ||
if (staticdata.length) { | ||
streams.push(of({ ...rsp, data: staticdata })); | ||
} | ||
if (streams.length === 1) { | ||
return streams[0]; // avoid merge wrapper | ||
} | ||
return merge(...streams); | ||
} | ||
return of(rsp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there anyway we can pull this apart so it becomes testable? And maybe add tests too?
@@ -403,7 +404,14 @@ func (g *GrafanaLive) handlePushScope(_ *models.SignedInUser, namespace string) | |||
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { | |||
ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false) | |||
if err != nil { | |||
return nil, fmt.Errorf("error getting datasource: %w", err) | |||
// the namespace may be an ID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a temporary or persistent change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove it when we fix #32853
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
This works with the pre-release/WIP:
https://github.com/grafana/signal-generator-datasource
We have many streaming implementations where the data source needs to construct its own rxjs magic to keep things up-to-date. This PR applies the streaming logic by default for
DataSourceWithBackend.ts
The front-end implementation for signal generator is now: