Skip to content

Commit

Permalink
feat(core): Support caching of dataType, name and other fields descri…
Browse files Browse the repository at this point in the history
…bing dataStreams
  • Loading branch information
diehbria committed Aug 31, 2022
1 parent c063d5c commit 355f57e
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 29 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"fix:eslint": "eslint --fix --ext .js,.ts,.tsx .",
"fix:stylelint": "stylelint '**/*.css' --fix",
"test": "npm-run-all -p test:unit test:eslint test:stylelint test:git",
"test:eslint": "eslint --ext .js,.ts,.tsx . --max-warnings=49",
"test:eslint": "eslint --ext .js,.ts,.tsx . --max-warnings=47",
"test:stylelint": "stylelint '**/*.css' --max-warnings 0",
"test:unit": "lerna run test --stream --concurrency 1",
"test:git": "git diff --exit-code",
Expand Down
53 changes: 53 additions & 0 deletions packages/core/src/data-module/TimeSeriesDataModule.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,59 @@ describe('initial request', () => {
});
});

it('passes back meta, name, and dataType information', async () => {
const REF_ID = 'ref-id';
const query: SiteWiseDataStreamQuery = {
assets: [
{
assetId: ASSET_ID,
properties: [{ propertyId: PROPERTY_ID, refId: REF_ID }],
},
],
};

const START = new Date(2000, 0, 0);
const END = new Date();

const someMetaStuff = { field: 'key value' };
const someName = 'cool name';
const someDataType = 'STRING';

const dataSource: DataSource = createMockSiteWiseDataSource({
dataStreams: [{ ...DATA_STREAM, meta: someMetaStuff, name: someName, dataType: someDataType }],
});
const dataModule = new TimeSeriesDataModule(dataSource);

const timeSeriesCallback = jest.fn();

dataModule.subscribeToDataStreams(
{
queries: [query],
request: { viewport: { start: START, end: END }, settings: { fetchFromStartToEnd: true } },
},
timeSeriesCallback
);
jest.advanceTimersByTime(100);
await flushPromises();
jest.advanceTimersByTime(100);
await flushPromises();

expect(timeSeriesCallback).toHaveBeenLastCalledWith({
dataStreams: [
expect.objectContaining({
id: DATA_STREAM.id,
name: someName,
dataType: someDataType,
meta: someMetaStuff,
}),
],
viewport: {
start: START,
end: END,
},
});
});

it('initiates a request for a data stream', () => {
const START = new Date(2000, 0, 0);
const END = new Date();
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/data-module/data-cache/dataReducer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,8 @@ it('merges data into existing data cache', () => {
isLoading: false,
isRefreshing: false,
id: ID,
dataType: 'NUMBER',
name: 'some name',
error: undefined,
dataCache: {
intervals: [[DATE_ONE, DATE_FOUR]],
Expand Down Expand Up @@ -855,6 +857,8 @@ describe('requests to different resolutions', () => {
isLoading: false,
isRefreshing: false,
error: undefined,
dataType: 'NUMBER',
name: 'some name',
requestHistory: [
{
start: NEW_FIRST_DATE,
Expand Down
14 changes: 8 additions & 6 deletions packages/core/src/data-module/data-cache/dataReducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ export const dataReducer: Reducer<DataStreamsStore, AsyncActions> = (
}

case SUCCESS: {
const { id, data, first, last, requestInformation } = action.payload;
const streamStore = getDataStreamStore(id, data.resolution, state);
const { id, data: dataStream, first, last, requestInformation } = action.payload;
const streamStore = getDataStreamStore(id, dataStream.resolution, state);
// Updating request cache is a hack to deal with latest value update
// TODO: clean this to one single source of truth cache
const requestCache = streamStore != null ? streamStore.requestCache : EMPTY_CACHE;

// We always want data in ascending order in the cache
const sortedData = getDataPoints(data, data.resolution).sort((a, b) => a.x - b.x);
const sortedData = getDataPoints(dataStream, dataStream.resolution).sort((a, b) => a.x - b.x);

/**
* Based on the type of request, determine the actual range requested.
Expand Down Expand Up @@ -98,14 +98,16 @@ export const dataReducer: Reducer<DataStreamsStore, AsyncActions> = (

const existingRequestHistory = streamStore ? streamStore.requestHistory : [];

// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { data, aggregates, ...restOfDataStream } = dataStream;

return {
...state,
[id]: {
...state[id],
[data.resolution]: {
[dataStream.resolution]: {
...streamStore,
resolution: data.resolution,
id,
...restOfDataStream,
requestHistory: mergeHistoricalRequests(existingRequestHistory, {
start: intervalStart,
end: last,
Expand Down
14 changes: 14 additions & 0 deletions packages/core/src/data-module/data-cache/toDataStreams.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ const rawStore = {
}),
requestCache: EMPTY_CACHE,
requestHistory: [],
meta: { key: 1000 },
name: 'somedatastreamname',
dataType: 'NUMBER',
isLoading: false,
isRefreshing: false,
};
Expand Down Expand Up @@ -112,6 +115,17 @@ it('returns a single data stream containing all the available resolutions', () =
expect(stream.aggregates![MINUTE_IN_MS]).toEqual(NUMBER_STREAM_1.data);
});

it('appends additional information about dataStream that is cached', () => {
const [stream] = toDataStreams({
requestInformations: [{ ...ALARM_STREAM_INFO, resolution: '0' }],
dataStreamsStores: STORE_WITH_NUMBERS_ONLY,
});

expect(stream.dataType).toEqual(rawStore.dataType);
expect(stream.name).toEqual(rawStore.name);
expect(stream.meta).toEqual(rawStore.meta);
});

it('appends the refId from the request information', () => {
const REF_ID = 'some-ref-id';
const [stream] = toDataStreams({
Expand Down
12 changes: 7 additions & 5 deletions packages/core/src/data-module/data-cache/toDataStreams.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { DataPoint } from '@synchro-charts/core';
import { DataStreamsStore } from './types';
import { DataStreamsStore, DataStreamStore } from './types';
import { isDefined } from '../../common/predicates';
import { DataStream, RequestInformation } from '../types';
import { parseDuration } from '../../common/time';
Expand Down Expand Up @@ -32,17 +32,19 @@ export const toDataStreams = ({
{}
);

const activeStore = streamsResolutions[parseDuration(info.resolution)];
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { dataCache, requestCache, requestHistory, ...restOfStream } = streamsResolutions[
parseDuration(info.resolution)
] as DataStreamStore;

const rawData: DataPoint[] = streamsResolutions[0] ? streamsResolutions[0].dataCache.items.flat() : [];

// Create new data stream for the corresponding info
return {
...restOfStream,
id: info.id,
refId: info.refId,
resolution: parseDuration(info.resolution),
isLoading: activeStore ? activeStore.isLoading : false,
isRefreshing: activeStore ? activeStore.isRefreshing : false,
error: activeStore ? activeStore.error : undefined,
data: rawData,
aggregates,
};
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/data-module/data-cache/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { DataPoint, Primitive } from '@synchro-charts/core';
import { IntervalStructure } from '../../common/intervalStructure';
import { ErrorDetails } from '../../common/types';
import { DataStream } from '../types';

type TTL = number;
export type TTLDurationMapping = {
Expand All @@ -27,7 +28,7 @@ export type DataStreamStore = {
// When data is being requested, whether or not data has been previously requested
isRefreshing: boolean;
error?: ErrorDetails;
};
} & Omit<DataStream, 'data' | 'aggregates'>;

export type DataStreamsStore = {
[dataStreamId: string]:
Expand Down
14 changes: 0 additions & 14 deletions packages/core/src/data-module/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,6 @@ export type DataSourceRequest<Query extends DataStreamQuery> = {
onError: ErrorCallback;
};

/**
* Subscribe to data streams
*
* Adds a subscription to the data-module.
* The data-module will ensure that the requested data is provided to the subscriber.
*/
type SubscribeToDataStreams<Query extends DataStreamQuery> = (
{ queries, request }: DataModuleSubscription<Query>,
callback: (data: TimeSeriesData) => void
) => {
unsubscribe: () => void;
update: (subscriptionUpdate: SubscriptionUpdate<Query>) => void;
};

export type StyleSettingsMap = { [refId: string]: BaseStyleSettings };

// Style settings sharable by all components
Expand Down
13 changes: 11 additions & 2 deletions packages/source-iotsitewise/src/time-series-data/provider.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { SiteWiseTimeSeriesDataProvider } from './provider';
import { TimeSeriesDataModule, DataSource, DataStream, MINUTE_IN_MS, DATA_STREAM } from '@iot-app-kit/core';
import {
TimeSeriesDataModule,
DataSource,
DataStream,
MINUTE_IN_MS,
DATA_STREAM,
OnSuccessCallback,
} from '@iot-app-kit/core';
import { createSiteWiseAssetDataSource } from '../asset-modules/asset-data-source';
import { DESCRIBE_ASSET_RESPONSE } from '../__mocks__/asset';
import { SiteWiseComponentSession } from '../component-session';
Expand All @@ -8,7 +15,9 @@ import { createMockSiteWiseSDK } from '../__mocks__/iotsitewiseSDK';
import { SiteWiseAssetModule } from '../asset-modules';

const createMockSource = (dataStreams: DataStream[]): DataSource<SiteWiseDataStreamQuery> => ({
initiateRequest: jest.fn(({ onSuccess }: { onSuccess: any }) => onSuccess(dataStreams)),
initiateRequest: jest.fn(({ onSuccess }: { onSuccess: OnSuccessCallback }) =>
onSuccess(dataStreams, { start: new Date(), resolution: '1m', end: new Date(), id: '123' }, new Date(), new Date())
),
getRequestsFromQuery: () => dataStreams.map((dataStream) => ({ id: dataStream.id, resolution: '0' })),
});

Expand Down

0 comments on commit 355f57e

Please sign in to comment.