Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explore: Use PanelQueryState to handle querying #18694

Merged
merged 12 commits into from Aug 28, 2019
27 changes: 13 additions & 14 deletions public/app/core/utils/explore.ts
Expand Up @@ -2,14 +2,23 @@
import _ from 'lodash';
import { from } from 'rxjs';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';

// Services & Utils
import { dateMath } from '@grafana/data';
import {
dateMath,
toUtc,
TimeRange,
RawTimeRange,
TimeZone,
IntervalValues,
TimeFragment,
LogRowModel,
LogsModel,
LogsDedupStrategy,
} from '@grafana/data';
import { renderUrl } from 'app/core/utils/url';
import kbn from 'app/core/utils/kbn';
import store from 'app/core/store';
import { getNextRefIdChar } from './query';

// Types
import {
DataQuery,
Expand All @@ -19,17 +28,6 @@ import {
DataQueryRequest,
DataStreamObserver,
} from '@grafana/ui';
import {
toUtc,
TimeRange,
RawTimeRange,
TimeZone,
IntervalValues,
TimeFragment,
LogRowModel,
LogsModel,
LogsDedupStrategy,
} from '@grafana/data';
import {
ExploreUrlState,
HistoryItem,
Expand Down Expand Up @@ -145,6 +143,7 @@ export function buildQueryTransaction(
panelId,
targets: configuredQueries, // Datasources rely on DataQueries being passed under the targets key.
range,
requestId: 'explore',
rangeRaw: range.raw,
scopedVars: {
__interval: { text: interval, value: interval },
Expand Down
4 changes: 1 addition & 3 deletions public/app/features/dashboard/state/PanelQueryState.ts
@@ -1,18 +1,16 @@
// Libraries
import { isArray, isEqual, isString } from 'lodash';

// Utils & Services
import { getBackendSrv } from 'app/core/services/backend_srv';
import { dateMath } from '@grafana/data';
import {
dateMath,
guessFieldTypes,
LoadingState,
toLegacyResponseData,
DataFrame,
toDataFrame,
isDataFrame,
} from '@grafana/data';

// Types
import {
DataSourceApi,
Expand Down
9 changes: 9 additions & 0 deletions public/app/features/explore/state/actionTypes.ts
Expand Up @@ -268,6 +268,11 @@ export interface UpdateTimeRangePayload {
absoluteRange?: AbsoluteTimeRange;
}

export interface ChangeLoadingStatePayload {
exploreId: ExploreId;
loadingState: LoadingState;
}

/**
* Adds a query row after the row with the given index.
*/
Expand Down Expand Up @@ -478,6 +483,10 @@ export const changeRangeAction = actionCreatorFactory<ChangeRangePayload>('explo

export const updateTimeRangeAction = actionCreatorFactory<UpdateTimeRangePayload>('explore/UPDATE_TIMERANGE').create();

export const changeLoadingStateAction = actionCreatorFactory<ChangeLoadingStatePayload>(
'changeLoadingStateAction'
).create();

export type HigherOrderAction =
| ActionOf<SplitCloseActionPayload>
| SplitOpenAction
Expand Down
Expand Up @@ -17,8 +17,7 @@ export const limitMessageRateEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState
latency: 0,
datasourceId,
loadingState: LoadingState.Streaming,
series: null,
delta: series,
series,
});
})
);
Expand Down
Expand Up @@ -34,7 +34,7 @@ export const processQueryResultsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreSt
}

const result = series || delta || [];
const replacePreviousResults = loadingState === LoadingState.Done && series && !delta ? true : false;
const replacePreviousResults = loadingState === LoadingState.Done && series ? true : false;
const resultProcessor = new ResultProcessor(state$.value.explore[exploreId], replacePreviousResults, result);
const graphResult = resultProcessor.getGraphResult();
const tableResult = resultProcessor.getTableResult();
Expand Down
205 changes: 205 additions & 0 deletions public/app/features/explore/state/epics/runRequestEpic.ts
@@ -0,0 +1,205 @@
import { Epic } from 'redux-observable';
import { Observable, from, Subscriber } from 'rxjs';
import { mergeMap, takeUntil, filter, catchError } from 'rxjs/operators';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';

import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import {
clearQueriesAction,
resetExploreAction,
updateDatasourceInstanceAction,
changeRefreshIntervalAction,
runQueriesBatchAction,
RunQueriesBatchPayload,
queryStartAction,
historyUpdatedAction,
processQueryResultsAction,
stateSaveAction,
processQueryErrorsAction,
changeRangeAction,
limitMessageRatePayloadAction,
changeLoadingStateAction,
} from '../actionTypes';
import { buildQueryTransaction, updateHistory } from '../../../../core/utils/explore';
import { PanelQueryState } from '../../../dashboard/state/PanelQueryState';
import { DataQueryResponseData } from '@grafana/ui/src/types/datasource';
import { ExploreId, ExploreItemState } from 'app/types';
import { LoadingState, DataFrame, AbsoluteTimeRange, dateMath } from '@grafana/data';
import { PanelData } from '@grafana/ui/src/types/panel';
import { isString } from 'lodash';

const publishActions = (outerObservable: Subscriber<unknown>, actions: Array<ActionOf<any>>) => {
hugohaggmark marked this conversation as resolved.
Show resolved Hide resolved
for (const action of actions) {
outerObservable.next(action);
}
};

interface ProcessResponseConfig {
exploreId: ExploreId;
exploreItemState: ExploreItemState;
datasourceId: string;
now: number;
loadingState: LoadingState;
series?: DataQueryResponseData[];
delta?: DataFrame[];
}

const processResponse = (config: ProcessResponseConfig) => {
const { exploreId, exploreItemState, datasourceId, now, loadingState, series, delta } = config;
const { queries, history } = exploreItemState;
const latency = Date.now() - now;

// Side-effect: Saving history in localstorage
const nextHistory = updateHistory(history, datasourceId, queries);
return [
historyUpdatedAction({ exploreId, history: nextHistory }),
processQueryResultsAction({ exploreId, latency, datasourceId, loadingState, series, delta }),
stateSaveAction(),
];
};

interface ProcessErrorConfig {
exploreId: ExploreId;
datasourceId: string;
error: any;
}

const processError = (config: ProcessErrorConfig) => {
const { exploreId, datasourceId, error } = config;

return [processQueryErrorsAction({ exploreId, response: error, datasourceId })];
};

export const runRequestEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (
action$,
state$,
{ getQueryResponse }
) => {
return action$.ofType(runQueriesBatchAction.type).pipe(
mergeMap((action: ActionOf<RunQueriesBatchPayload>) => {
const { exploreId, queryOptions } = action.payload;
const exploreItemState = state$.value.explore[exploreId];
const { datasourceInstance, queries, queryIntervals, range, scanning } = exploreItemState;

// Create an observable per run queries action
// Within the observable create two subscriptions
// First subscription: 'querySubscription' subscribes to the call to query method on datasourceinstance
// Second subscription: 'streamSubscription' subscribes to events from the query methods observer callback
const observable: Observable<ActionOf<any>> = new Observable(outerObservable => {
hugohaggmark marked this conversation as resolved.
Show resolved Hide resolved
outerObservable.next(queryStartAction({ exploreId }));

const datasourceId = datasourceInstance.meta.id;
const now = Date.now();
const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning);
const queryState = new PanelQueryState();
queryState.onStreamingDataUpdated = () => {
const data = queryState.validateStreamsAndGetPanelData();
const { state, error, request, series } = data;
if (!data && !error) {
return;
}

if (state === LoadingState.Error) {
const actions = processError({ exploreId, datasourceId, error });
publishActions(outerObservable, actions);
}

if (state === LoadingState.Streaming) {
if (request && request.range) {
let newRange = request.range;
let absoluteRange: AbsoluteTimeRange = {
from: newRange.from.valueOf(),
to: newRange.to.valueOf(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to do any of this part... it already happened in validateStreamsAndGetPanelData()

you can just use the request.range

};
if (isString(newRange.raw.from)) {
newRange = {
from: dateMath.parse(newRange.raw.from, false),
to: dateMath.parse(newRange.raw.to, true),
raw: newRange.raw,
};
absoluteRange = {
from: newRange.from.valueOf(),
to: newRange.to.valueOf(),
};
}
outerObservable.next(changeRangeAction({ exploreId, range: newRange, absoluteRange }));
}

outerObservable.next(
limitMessageRatePayloadAction({
exploreId,
series,
datasourceId,
})
);
}

if (state === LoadingState.Done) {
outerObservable.next(changeLoadingStateAction({ exploreId, loadingState: state }));
}
};

from(queryState.execute(datasourceInstance, transaction.options))
.pipe(
mergeMap((response: PanelData) => {
return processResponse({
exploreId,
exploreItemState,
datasourceId,
now,
loadingState: response.state,
series: response && response.series ? response.series : [],
delta: null,
});
}),
catchError(error => {
return processError({ exploreId, datasourceId, error });
})
)
.subscribe({ next: (action: ActionOf<any>) => outerObservable.next(action) });

const unsubscribe = () => {
queryState.closeStreams(true);
outerObservable.unsubscribe();
};

return unsubscribe;
});

return observable.pipe(
takeUntil(
action$
.ofType(
runQueriesBatchAction.type,
resetExploreAction.type,
updateDatasourceInstanceAction.type,
changeRefreshIntervalAction.type,
clearQueriesAction.type
)
.pipe(
filter(action => {
if (action.type === resetExploreAction.type) {
return true; // stops all subscriptions if user navigates away
}

if (action.type === updateDatasourceInstanceAction.type && action.payload.exploreId === exploreId) {
return true; // stops subscriptions if user changes data source
}

if (action.type === changeRefreshIntervalAction.type && action.payload.exploreId === exploreId) {
return !isLive(action.payload.refreshInterval); // stops subscriptions if user changes refresh interval away from 'Live'
}

if (action.type === clearQueriesAction.type && action.payload.exploreId === exploreId) {
return true; // stops subscriptions if user clears all queries
}

return action.payload.exploreId === exploreId;
})
)
)
);
})
);
};
17 changes: 14 additions & 3 deletions public/app/features/explore/state/reducers.ts
Expand Up @@ -30,9 +30,6 @@ import {
queryStartAction,
runQueriesAction,
changeRangeAction,
} from './actionTypes';
import { reducerFactory } from 'app/core/redux';
import {
addQueryRowAction,
changeQueryAction,
changeSizeAction,
Expand All @@ -52,11 +49,14 @@ import {
queriesImportedAction,
updateUIStateAction,
toggleLogLevelAction,
changeLoadingStateAction,
} from './actionTypes';
import { reducerFactory } from 'app/core/redux';
import { updateLocation } from 'app/core/actions/location';
import { LocationUpdate } from '@grafana/runtime';
import TableModel from 'app/core/table_model';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
import { PanelQueryState } from '../../dashboard/state/PanelQueryState';

export const DEFAULT_RANGE = {
from: 'now-6h',
Expand Down Expand Up @@ -113,6 +113,7 @@ export const makeExploreItemState = (): ExploreItemState => ({
mode: null,
isLive: false,
urlReplaced: false,
queryState: new PanelQueryState(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this. I feel like this should not be accessible by the react and also does not drive rendering in any. We do not change it and it is also mutable. So I wonder if we shouldn't just move it outside of the redux store and handle it as external dependency kind of thing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think this is fine for now, it is mutable so not ideal to have in redux but not forbidden either. Think before we do more complex work figure out the proper redux solution around this we can move ahead with this. I think the coming rewrite to how queries are executed to use observable patterns, that will fundamentally change things so good to wait to do more work around the current API & responsibility of PanelQueryState as I think that will change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good points! But I'll let it be for now.

});

/**
Expand Down Expand Up @@ -575,6 +576,16 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
};
},
})
.addMapper({
filter: changeLoadingStateAction,
mapper: (state, action): ExploreItemState => {
const { loadingState } = action.payload;
return {
...state,
loadingState,
};
},
})
.create();

export const updateChildRefreshState = (
Expand Down