Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance: Limit data layer state updates #724

Merged
merged 6 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions packages/scenes/src/querying/DataLayersMerger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { filter, finalize, map, merge, mergeAll } from 'rxjs';
import { CancelActivationHandler, SceneDataLayerProvider, SceneDataProviderResult } from '../core/types';

export class DataLayersMerger {
private _resultsMap: Map<string, SceneDataProviderResult> = new Map();
private _prevLayers: SceneDataLayerProvider[] = [];

public getMergedStream(layers: SceneDataLayerProvider[]) {
if (areDifferentLayers(layers, this._prevLayers)) {
this._resultsMap = new Map();
this._prevLayers = layers;
}

const resultStreams = layers.map((l) => l.getResultsStream());
const deactivationHandlers: CancelActivationHandler[] = [];

for (const layer of layers) {
deactivationHandlers.push(layer.activate());
}

return merge(resultStreams).pipe(
mergeAll(),
filter((v) => {
return this._resultsMap.get(v.origin.state.key!) !== v;
}),
map((v) => {
// Is there a better, rxjs only way to combine multiple same-data-topic observables?
// Indexing by origin state key is to make sure we do not duplicate/overwrite data from the different origins
this._resultsMap.set(v.origin.state.key!, v);
return this._resultsMap.values();
}),
finalize(() => {
deactivationHandlers.forEach((handler) => handler());
})
);
}
}

function areDifferentLayers(a: SceneDataLayerProvider[], b: SceneDataLayerProvider[]) {
if (a.length !== b.length) {
return true;
}

for (let i = 0; i < a.length; i++) {
if (a[i] !== b[i]) {
return true;
}
}

return false;
}
7 changes: 4 additions & 3 deletions packages/scenes/src/querying/SceneDataLayerSet.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
SceneDataLayerProviderState,
SceneDataProviderResult,
} from '../core/types';
import { mergeMultipleDataLayers } from './mergeMultipleDataLayers';
import { DataLayersMerger } from './DataLayersMerger';
import { setBaseClassState } from '../utils/utils';

export abstract class SceneDataLayerSetBase<T extends SceneDataLayerProviderState>
Expand All @@ -27,11 +27,12 @@ export abstract class SceneDataLayerSetBase<T extends SceneDataLayerProviderStat
/**
* Subject to emit results to.
*/
private _results = new ReplaySubject<SceneDataProviderResult>();
private _results = new ReplaySubject<SceneDataProviderResult>(1);
private _dataLayersMerger = new DataLayersMerger();

protected subscribeToAllLayers(layers: SceneDataLayerProvider[]) {
if (layers.length > 0) {
this.querySub = mergeMultipleDataLayers(layers).subscribe(this._onLayerUpdateReceived.bind(this));
this.querySub = this._dataLayersMerger.getMergedStream(layers).subscribe(this._onLayerUpdateReceived.bind(this));
} else {
this._results.next({ origin: this, data: emptyPanelData });
this.setStateHelper({ data: emptyPanelData });
Expand Down
27 changes: 26 additions & 1 deletion packages/scenes/src/querying/SceneQueryRunner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {

import { SceneTimeRange } from '../core/SceneTimeRange';

import { SceneQueryRunner } from './SceneQueryRunner';
import { QueryRunnerState, SceneQueryRunner } from './SceneQueryRunner';
import { SceneFlexItem, SceneFlexLayout } from '../components/layout/SceneFlexLayout';
import { SceneVariableSet } from '../variables/sets/SceneVariableSet';
import { TestVariable } from '../variables/variants/TestVariable';
Expand Down Expand Up @@ -1365,6 +1365,31 @@ describe('SceneQueryRunner', () => {
expect(queryRunner.state.data?.annotations).toHaveLength(1);
});

it('should not cause unnessaray state updates', async () => {
const layer = new TestAnnotationsDataLayer({ name: 'Layer 1' });
const queryRunner = new SceneQueryRunner({
queries: [{ refId: 'A' }],
$timeRange: new SceneTimeRange(),
$data: new SceneDataLayerSet({ layers: [layer] }),
});

expect(queryRunner.state.data).toBeUndefined();

queryRunner.activate();

const stateUpdates: QueryRunnerState[] = [];
queryRunner.subscribeToState((state) => stateUpdates.push(state));

await new Promise((r) => setTimeout(r, 1));

expect(queryRunner.state.data?.annotations).toHaveLength(0);
expect(queryRunner.state.data?.series).toBeDefined();

layer.completeEmpty();

expect(stateUpdates).toHaveLength(1);
});

describe('canceling queries', () => {
it('should unsubscribe from data layers when query is canceled', async () => {
const layer1 = new TestAnnotationsDataLayer({ name: 'Layer 1' });
Expand Down
36 changes: 31 additions & 5 deletions packages/scenes/src/querying/SceneQueryRunner.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { cloneDeep } from 'lodash';
import { cloneDeep, isEqual } from 'lodash';
import { forkJoin, ReplaySubject, Unsubscribable } from 'rxjs';

import { DataQuery, DataSourceRef, LoadingState } from '@grafana/schema';
Expand Down Expand Up @@ -44,7 +44,7 @@ import { findActiveGroupByVariablesByUid } from '../variables/groupby/findActive
import { GroupByVariable } from '../variables/groupby/GroupByVariable';
import { AdHocFiltersVariable, isFilterComplete } from '../variables/adhoc/AdHocFiltersVariable';
import { SceneVariable } from '../variables/types';
import { mergeMultipleDataLayers } from './mergeMultipleDataLayers';
import { DataLayersMerger } from './DataLayersMerger';

let counter = 100;

Expand Down Expand Up @@ -78,6 +78,7 @@ export interface DataQueryExtended extends DataQuery {
export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implements SceneDataProvider {
private _querySub?: Unsubscribable;
private _dataLayersSub?: Unsubscribable;
private _dataLayersMerger = new DataLayersMerger();
private _timeSub?: Unsubscribable;
private _timeSubRange?: SceneTimeRangeLike;
private _containerWidth?: number;
Expand Down Expand Up @@ -141,7 +142,9 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
return;
}

this._dataLayersSub = mergeMultipleDataLayers(dataLayers).subscribe(this._onLayersReceived.bind(this));
this._dataLayersSub = this._dataLayersMerger
.getMergedStream(dataLayers)
.subscribe(this._onLayersReceived.bind(this));
}

private _onLayersReceived(results: Iterable<SceneDataProviderResult>) {
Expand Down Expand Up @@ -182,9 +185,19 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
}
}

const baseStateUpdate = this.state.data ? this.state.data : { ...emptyPanelData, timeRange: timeRange.state.value };
// Skip unnessary state updates
if (
allFramesEmpty(annotations) &&
allFramesEmpty(this._layerAnnotations) &&
isEqual(alertState, this.state.data?.alertState)
) {
return;
}

this._layerAnnotations = annotations;

const baseStateUpdate = this.state.data ? this.state.data : { ...emptyPanelData, timeRange: timeRange.state.value };

this.setState({
data: {
...baseStateUpdate,
Expand Down Expand Up @@ -558,7 +571,6 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
}

this.setState({ data: dataWithLayersApplied, _hasFetchedData: hasFetchedData });

this._results.next({ origin: this, data: dataWithLayersApplied });
};

Expand Down Expand Up @@ -639,3 +651,17 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
export function findFirstDatasource(targets: DataQuery[]): DataSourceRef | undefined {
return targets.find((t) => t.datasource !== null)?.datasource ?? undefined;
}

function allFramesEmpty(frames?: DataFrame[]) {
if (!frames) {
return true;
}

for (let i = 0; i < frames.length; i++) {
if (frames[i].length > 0) {
return false;
}
}

return true;
}
2 changes: 1 addition & 1 deletion packages/scenes/src/querying/layers/SceneDataLayerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export abstract class SceneDataLayerBase<T extends SceneDataLayerProviderState>
/**
* Subject to emit results to.
*/
private _results = new ReplaySubject<SceneDataProviderResult>();
private _results = new ReplaySubject<SceneDataProviderResult>(1);

/**
* Implement logic for enabling the layer. This is called when layer is enabled or when layer is enabled when activated.
Expand Down
5 changes: 5 additions & 0 deletions packages/scenes/src/querying/layers/TestDataLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ export class TestAnnotationsDataLayer
public startRun() {
this.publishResults({ ...emptyPanelData, state: LoadingState.Loading });
}

public completeRun() {
this.publishResults(this.getResults());
}

public completeEmpty() {
this.publishResults({ ...emptyPanelData });
}

public completeRunWithError() {
this.publishResults({ ...emptyPanelData, state: LoadingState.Error });
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { arrayToDataFrame, DataTopic, AnnotationQuery, ScopedVars } from '@grafana/data';
import { arrayToDataFrame, DataTopic, AnnotationQuery, ScopedVars, PanelData } from '@grafana/data';
import { LoadingState } from '@grafana/schema';
import React from 'react';
import { map, Unsubscribable } from 'rxjs';
Expand Down Expand Up @@ -108,7 +108,7 @@ export class AnnotationsDataLayer
return await getDataSource(query.datasource || undefined, this._scopedVars);
}

protected processEvents(query: AnnotationQuery, events: AnnotationQueryResults) {
protected processEvents(query: AnnotationQuery, events: AnnotationQueryResults): PanelData {
let processedEvents = postProcessQueryResult(query, events.events || []);
processedEvents = dedupAnnotations(processedEvents);

Expand Down
28 changes: 0 additions & 28 deletions packages/scenes/src/querying/mergeMultipleDataLayers.ts

This file was deleted.

Loading