Skip to content

Commit

Permalink
feat: async fetchTimeSeriesData
Browse files Browse the repository at this point in the history
  • Loading branch information
corteggiano authored and mnischay committed Jan 18, 2024
1 parent 712c1a7 commit 2b776cc
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 110 deletions.
30 changes: 30 additions & 0 deletions packages/core/src/data-module/TimeSeriesDataModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { MINUTE_IN_MS, parseDuration, SECOND_IN_MS } from '../common/time';
import type {
DataModuleSubscription,
DataSource,
DataStream,
DataStreamQuery,
RequestInformation,
RequestInformationAndRange,
Expand Down Expand Up @@ -141,6 +142,35 @@ export class TimeSeriesDataModule<Query extends DataStreamQuery> {
return { start, end };
};

public getCachedDataStreams = async ({
viewport,
queries,
emitDataStreams,
}: {
viewport: Viewport;
queries: Query[];
emitDataStreams: (dataStreams: DataStream[]) => void;
}) => {
const requestedStreams = await this.dataSourceStore.getRequestsFromQueries({
queries,
request: { viewport },
});

// create request information on every dataStream requested
// so they can be used to get cached data
const requestInformations = requestedStreams.map((stream) => ({
start: viewportStartDate(viewport),
end: viewportEndDate(viewport),
...stream,
})) as RequestInformationAndRange[];

const unsubscribe = this.dataCache.getCachedDataForRange(
requestInformations,
emitDataStreams
);
return unsubscribe;
};

public subscribeToDataStreams = (
{ queries, request }: DataModuleSubscription<Query>,
callback: (data: TimeSeriesData) => void
Expand Down
47 changes: 46 additions & 1 deletion packages/core/src/data-module/data-cache/dataCacheWrapped.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { configureStore } from './createStore';
import { onErrorAction, onRequestAction, onSuccessAction } from './dataActions';
import { getDataStreamStore } from './getDataStreamStore';
import { Observable, map, startWith, pairwise, from } from 'rxjs';
import { filter } from 'rxjs/operators';
import { delay, filter } from 'rxjs/operators';
import { toDataStreams } from './toDataStreams';
import type { Store } from 'redux';
import type {
Expand All @@ -14,6 +14,7 @@ import type {
} from '../types';
import type { DataStreamsStore } from './types';
import type { ErrorDetails } from '../../common/types';
import { hasIntervalForRange } from './dateUtils';

type StoreChange = {
prevDataCache: DataStreamsStore;
Expand Down Expand Up @@ -122,6 +123,50 @@ export class DataCache {

public getState = (): DataStreamsStore => this.dataCache.getState();

// emits cached data
public getCachedDataForRange = (
requestInfos: RequestInformationAndRange[],
emit: (dataStreams: DataStream[]) => void
) => {
const subscription = this.observableStore
.pipe(delay(0))
.subscribe(({ currDataCache }) => {
const hasLoadedFullIntervalData = requestInfos.every((request) => {
const { id, resolution, aggregationType, start, end } = request;
const associatedStore = getDataStreamStore(
id,
resolution,
currDataCache,
aggregationType
);

// if no stores are found, then data is uncached
if (!associatedStore) return false;

// check if cache has correct interval and is not loading more data
const hasLoadedData =
!associatedStore.isLoading && !associatedStore.isRefreshing;
const hasLoadedFullInterval = hasIntervalForRange(
associatedStore.dataCache.intervals,
{ start, end }
);

return hasLoadedData && hasLoadedFullInterval;
});

// only emit data streams if all request informations have loaded dataStreams for the required time range
if (hasLoadedFullIntervalData) {
const dataStreams = toDataStreams({
requestInformations: requestInfos,
dataStreamsStores: currDataCache,
});

subscription.unsubscribe();
emit(dataStreams);
}
});
};

/**
* data-cache bindings
*
Expand Down
20 changes: 20 additions & 0 deletions packages/core/src/data-module/data-cache/dateUtils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Interval } from '../../common/intervalStructure';
import { isHistoricalViewport } from '../../common/predicates';
import { parseDuration } from '../../common/time';
import type { DateInterval, Viewport } from './requestTypes';
Expand All @@ -18,3 +19,22 @@ export const getDateInterval = (viewport: Viewport): DateInterval => {
end,
};
};

export const hasIntervalForRange = (
intervals: Interval[],
range: { start: Date; end: Date }
) => {
if (!intervals) return false;

return intervals?.some((interval) => {
const intervalStart = new Date(interval[0]);
const intervalEnd = new Date(interval[1]);

const isStartWithinInterval =
range.start >= intervalStart && range.start <= intervalEnd;
const isEndWithinInterval =
range.end >= intervalStart && range.end <= intervalEnd;

return isStartWithinInterval && isEndWithinInterval;
});
};
1 change: 1 addition & 0 deletions packages/dashboard/testing/siteWiseQueries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export const mockQuery = (
): SiteWiseQuery => {
const { updateViewport = noop, unsubscribe = noop } = overrides || {};
return {
fetchTimeSeriesData: (_input) => new Promise(() => {}),
timeSeriesData: () => ({
toQueryString: () => JSON.stringify(timeSeriesData),
build: () => ({
Expand Down
28 changes: 27 additions & 1 deletion packages/source-iotsitewise/src/initialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
TimeSeriesDataRequest,
TimeSeriesDataQuery,
TimeSeriesDataModule,
Viewport,
DataStream,
} from '@iot-app-kit/core';
import { IoTEventsClient } from '@aws-sdk/client-iot-events';
import { IoTSiteWiseClient } from '@aws-sdk/client-iotsitewise';
Expand All @@ -33,6 +35,8 @@ import type {
SiteWiseAssetTreeNode,
SiteWiseAssetTreeQueryArguments,
} from './asset-modules';
import { fetchTimeSeriesData } from './time-series-data/fetchTimeSeriesData';
import { CreateTimeSeriesDataStore } from './time-series-data/store';

const SOURCE = 'iotsitewise';

Expand All @@ -47,6 +51,13 @@ export type SiteWiseDataSourceInitInputs = {
};

export type SiteWiseQuery = {
fetchTimeSeriesData: ({
query,
viewport,
}: {
query: SiteWiseDataStreamQuery;
viewport: Viewport;
}) => Promise<DataStream[]>;
timeSeriesData: (query: SiteWiseDataStreamQuery) => TimeSeriesDataQuery;
assetTree: {
fromRoot: (
Expand All @@ -67,6 +78,16 @@ export type SiteWiseQuery = {
export const initialize = (input: SiteWiseDataSourceInitInputs) => {
const siteWiseClient = getSiteWiseClient(input);
const iotEventsClient = getIotEventsClient(input);
const store = new CreateTimeSeriesDataStore({
initialState: {
modeledDataStreams: [],
dataStreams: [],
thresholds: [],
assetModels: {},
alarms: {},
errors: {},
},
});

const assetDataSource: SiteWiseAssetDataSource =
createSiteWiseAssetDataSource(siteWiseClient);
Expand All @@ -81,6 +102,10 @@ export const initialize = (input: SiteWiseDataSourceInitInputs) => {

return {
query: {
fetchTimeSeriesData: fetchTimeSeriesData(
siteWiseTimeSeriesModule,
store.getState()
),
timeSeriesData: (
query: SiteWiseDataStreamQuery
): TimeSeriesDataQuery => ({
Expand All @@ -101,7 +126,8 @@ export const initialize = (input: SiteWiseDataSourceInitInputs) => {
{
queries: [query],
request: params,
}
},
store
),
}),

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { TimeSeriesDataModule, Viewport, DataStream } from '@iot-app-kit/core';
import { SiteWiseDataStreamQuery } from './types';
import { TimeSeriesDataStore } from './store';
import { completeDataStreams } from '../completeDataStreams';

// creates async function which requests cachedDataStreams until all the data for viewport is present
export const fetchTimeSeriesData =
(
siteWiseTimeSeriesModule: TimeSeriesDataModule<SiteWiseDataStreamQuery>,
store: TimeSeriesDataStore
) =>
async ({
query,
viewport,
}: {
query: SiteWiseDataStreamQuery;
viewport: Viewport;
}): Promise<DataStream[]> => {
return new Promise((resolve) => {
siteWiseTimeSeriesModule.getCachedDataStreams({
queries: [query],
viewport,
emitDataStreams: (ds: DataStream[]) =>
resolve(
completeDataStreams({
modeledDataStreams: store.modeledDataStreams,
dataStreams: ds,
assetModels: store.assetModels,
alarms: store.alarms,
})
),
});
});
};
36 changes: 30 additions & 6 deletions packages/source-iotsitewise/src/time-series-data/provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
createMockIoTEventsSDK,
createMockSiteWiseSDK,
} from '@iot-app-kit/testing-util';
import { CreateTimeSeriesDataStore } from './store';

const DATA_STREAM: DataStream<number> = {
id: 'some-asset-id---some-property-id',
Expand All @@ -30,6 +31,23 @@ const DATA_STREAM: DataStream<number> = {

const AGGREGATE_TYPE = AggregateType.AVERAGE;

const createMockTimeSeriesStore = () => {
const cb = jest.fn();
const store = new CreateTimeSeriesDataStore({
initialState: {
modeledDataStreams: [],
dataStreams: [],
thresholds: [],
assetModels: {},
alarms: {},
errors: {},
},
callback: cb,
});

return { store, cb };
};

const createMockSource = (
dataStreams: DataStream[]
): DataSource<SiteWiseDataStreamQuery> => ({
Expand Down Expand Up @@ -88,13 +106,19 @@ it.skip('subscribes, updates, and unsubscribes to time series data by delegating

const refreshRate = MINUTE_IN_MS;

const provider = new SiteWiseTimeSeriesDataProvider(componentSession, {
queries: [{ assets: [] }],
request: {
viewport: { start: START_1, end: END_1 },
settings: { fetchFromStartToEnd: true, refreshRate },
const { store } = createMockTimeSeriesStore();

const provider = new SiteWiseTimeSeriesDataProvider(
componentSession,
{
queries: [{ assets: [] }],
request: {
viewport: { start: START_1, end: END_1 },
settings: { fetchFromStartToEnd: true, refreshRate },
},
},
});
store
);

const timeSeriesCallback = jest.fn();

Expand Down
19 changes: 13 additions & 6 deletions packages/source-iotsitewise/src/time-series-data/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type {
SubscriptionUpdate,
} from '@iot-app-kit/core';
import type { SiteWiseDataStreamQuery } from './types';
import { CreateTimeSeriesDataStore } from './store';

/**
* Provider for SiteWise time series data
Expand All @@ -26,25 +27,31 @@ export class SiteWiseTimeSeriesDataProvider
) => void = () => {};
public session: SiteWiseComponentSession;
public input: DataModuleSubscription<SiteWiseDataStreamQuery>;
private store: CreateTimeSeriesDataStore;

constructor(
session: SiteWiseComponentSession,
input: DataModuleSubscription<SiteWiseDataStreamQuery>
input: DataModuleSubscription<SiteWiseDataStreamQuery>,
store: CreateTimeSeriesDataStore
) {
this.session = session;
this.input = input;
this.store = store;
}

subscribe(observer: ProviderObserver<TimeSeriesData[]>) {
const { session } = this;
const { session, store } = this;

store.setCallback((timeSeriesData: TimeSeriesData) =>
observer.next([timeSeriesData])
);

const { update, unsubscribe } = subscribeToTimeSeriesData(
timeSeriesDataSession(session),
assetSession(session),
alarmsSession(session)
)(this.input, (timeSeriesData: TimeSeriesData) =>
observer.next([timeSeriesData])
);
alarmsSession(session),
store
)(this.input);

this.update = update;

Expand Down
10 changes: 8 additions & 2 deletions packages/source-iotsitewise/src/time-series-data/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@ export type TimeSeriesDataStore = {

export class CreateTimeSeriesDataStore {
private readonly state: TimeSeriesDataStore;
private readonly callback: (data: TimeSeriesData) => void;
private callback: ((data: TimeSeriesData) => void) | undefined;

constructor({
initialState,
callback,
}: {
initialState: Partial<TimeSeriesDataStore>;
callback: (data: TimeSeriesData) => void;
callback?: (data: TimeSeriesData) => void;
}) {
this.callback = callback;
this.state = { ...initialState } as TimeSeriesDataStore;
}

update() {
if (!this.callback) return;

const {
thresholds,
viewport,
Expand Down Expand Up @@ -97,4 +99,8 @@ export class CreateTimeSeriesDataStore {
getState() {
return this.state;
}

setCallback(callback: (data: TimeSeriesData) => void) {
this.callback = callback;
}
}

0 comments on commit 2b776cc

Please sign in to comment.