-
Notifications
You must be signed in to change notification settings - Fork 11.8k
/
processQueryResultsEpic.ts
82 lines (73 loc) · 2.82 KB
/
processQueryResultsEpic.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import _ from 'lodash';
import { Epic } from 'redux-observable';
import { mergeMap } from 'rxjs/operators';
import { NEVER } from 'rxjs';
import { LoadingState } from '@grafana/ui';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { getRefIds } from 'app/core/utils/explore';
import {
processQueryResultsAction,
ProcessQueryResultsPayload,
querySuccessAction,
resetQueryErrorAction,
scanStopAction,
updateTimeRangeAction,
runQueriesAction,
} from '../actionTypes';
import { ResultProcessor } from '../../utils/ResultProcessor';
export const processQueryResultsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (
action$,
state$,
{ getTimeZone, getShiftedTimeRange }
) => {
return action$.ofType(processQueryResultsAction.type).pipe(
mergeMap((action: ActionOf<ProcessQueryResultsPayload>) => {
const { exploreId, datasourceId, latency, loadingState, series, delta } = action.payload;
const { datasourceInstance, scanning, eventBridge } = state$.value.explore[exploreId];
// If datasource already changed, results do not matter
if (datasourceInstance.meta.id !== datasourceId) {
return NEVER;
}
const result = series || delta || [];
const replacePreviousResults = loadingState === LoadingState.Done && series && !delta ? true : false;
const resultProcessor = new ResultProcessor(state$.value.explore[exploreId], replacePreviousResults, result);
const graphResult = resultProcessor.getGraphResult();
const tableResult = resultProcessor.getTableResult();
const logsResult = resultProcessor.getLogsResult();
const refIds = getRefIds(result);
const actions: Array<ActionOf<any>> = [];
// For Angular editors
eventBridge.emit('data-received', resultProcessor.getRawData());
// Clears any previous errors that now have a successful query, important so Angular editors are updated correctly
actions.push(
resetQueryErrorAction({
exploreId,
refIds,
})
);
actions.push(
querySuccessAction({
exploreId,
latency,
loadingState,
graphResult,
tableResult,
logsResult,
})
);
// Keep scanning for results if this was the last scanning transaction
if (scanning) {
if (_.size(result) === 0) {
const range = getShiftedTimeRange(-1, state$.value.explore[exploreId].range, getTimeZone(state$.value.user));
actions.push(updateTimeRangeAction({ exploreId, absoluteRange: range }));
actions.push(runQueriesAction({ exploreId }));
} else {
// We can stop scanning if we have a result
actions.push(scanStopAction({ exploreId }));
}
}
return actions;
})
);
};