Skip to content

Commit

Permalink
feat: expose ttlDurationMapping as data module configuration (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
boweihan committed Dec 16, 2021
1 parent 1dad2b6 commit 88c0fcb
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 26 deletions.
96 changes: 90 additions & 6 deletions packages/core/src/data-module/IotAppKitDataModule.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { DataSource, DataSourceRequest } from './types.d';
import { DataPoint, DataStream, DataStreamInfo, Resolution } from '@synchro-charts/core';
import { Request } from './data-cache/requestTypes';
import { DataStreamsStore, DataStreamStore } from './data-cache/types';
import { EMPTY_CACHE } from './data-cache/caching/caching';
import * as caching from './data-cache/caching/caching';
import { createSiteWiseLegacyDataSource } from '../data-sources/site-wise-legacy/data-source';
import { MINUTE_IN_MS, MONTH_IN_MS, SECOND_IN_MS } from '../common/time';
import { HOUR_IN_MS, MINUTE_IN_MS, MONTH_IN_MS, SECOND_IN_MS } from '../common/time';
import { IotAppKitDataModule } from './IotAppKitDataModule';

import { SITEWISE_DATA_SOURCE } from '../data-sources/site-wise/data-source';
Expand All @@ -16,6 +16,8 @@ import { toDataStreamId, toSiteWiseAssetProperty } from '../data-sources/site-wi

import Mock = jest.Mock;

const { EMPTY_CACHE } = caching;

const { propertyId: PROPERTY_ID, assetId: ASSET_ID } = toSiteWiseAssetProperty(DATA_STREAM.id);

const DATA_STREAM_QUERY: SiteWiseDataStreamQuery = {
Expand Down Expand Up @@ -337,7 +339,7 @@ describe('error handling', () => {
it('provides a data stream which has an error associated with it on initial subscription', () => {
const customSource = createMockSiteWiseDataSource([DATA_STREAM]);

const dataModule = new IotAppKitDataModule(CACHE_WITH_ERROR);
const dataModule = new IotAppKitDataModule({ initialDataCache: CACHE_WITH_ERROR });
const dataStreamCallback = jest.fn();

dataModule.registerDataSource(customSource);
Expand All @@ -357,7 +359,7 @@ describe('error handling', () => {
it('does not re-request a data stream with an error associated with it', async () => {
const customSource = createMockSiteWiseDataSource([DATA_STREAM]);

const dataModule = new IotAppKitDataModule(CACHE_WITH_ERROR);
const dataModule = new IotAppKitDataModule({ initialDataCache: CACHE_WITH_ERROR });
const dataStreamCallback = jest.fn();

dataModule.registerDataSource(customSource);
Expand All @@ -378,7 +380,7 @@ describe('error handling', () => {
it('does request a data stream which has no error associated with it', () => {
const customSource = createMockSiteWiseDataSource([DATA_STREAM]);

const dataModule = new IotAppKitDataModule(CACHE_WITHOUT_ERROR);
const dataModule = new IotAppKitDataModule({ initialDataCache: CACHE_WITHOUT_ERROR });

const dataStreamCallback = jest.fn();

Expand All @@ -398,6 +400,14 @@ describe('error handling', () => {
});

describe('caching', () => {
beforeAll(() => {
jest.useFakeTimers('modern');
});

afterAll(() => {
jest.useRealTimers();
});

it('does not request already cached data', () => {
const dataModule = new IotAppKitDataModule();
const dataSource = createMockSiteWiseDataSource([DATA_STREAM]);
Expand Down Expand Up @@ -504,6 +514,81 @@ describe('caching', () => {
},
]);
});

it('requests already cached data if the default TTL has expired', async () => {
const dataModule = new IotAppKitDataModule();
const dataSource = createMockSiteWiseDataSource([DATA_STREAM]);
dataModule.registerDataSource(dataSource);

const END_1 = new Date();
const START_1 = new Date(END_1.getTime() - HOUR_IN_MS);

const dataStreamCallback = jest.fn();
const { update } = dataModule.subscribeToDataStreams(
{
query: DATA_STREAM_QUERY,
requestInfo: { viewport: { start: START_1, end: END_1 }, onlyFetchLatestValue: false },
},
dataStreamCallback
);

(dataSource.initiateRequest as Mock).mockClear();

jest.advanceTimersByTime(MINUTE_IN_MS);

update({ requestInfo: { viewport: { start: START_1, end: END_1 }, onlyFetchLatestValue: false } });

expect(dataSource.initiateRequest).toBeCalledWith(expect.any(Object), [
{
id: DATA_STREAM_INFO.id,
resolution: DATA_STREAM_INFO.resolution,
// 1 minute time advancement invalidates 3 minutes of cache by default, which is 2 minutes from END_1
start: new Date(END_1.getTime() - 2 * MINUTE_IN_MS),
end: END_1,
},
]);
});

it('requests already cached data if custom TTL has expired', async () => {
const customCacheSettings = {
ttlDurationMapping: {
[MINUTE_IN_MS]: 0,
[5 * MINUTE_IN_MS]: 30 * SECOND_IN_MS,
},
};

const dataModule = new IotAppKitDataModule({ cacheSettings: customCacheSettings });
const dataSource = createMockSiteWiseDataSource([DATA_STREAM]);
dataModule.registerDataSource(dataSource);

const END_1 = new Date();
const START_1 = new Date(END_1.getTime() - HOUR_IN_MS);

const dataStreamCallback = jest.fn();
const { update } = dataModule.subscribeToDataStreams(
{
query: DATA_STREAM_QUERY,
requestInfo: { viewport: { start: START_1, end: END_1 }, onlyFetchLatestValue: false },
},
dataStreamCallback
);

(dataSource.initiateRequest as Mock).mockClear();

jest.advanceTimersByTime(MINUTE_IN_MS);

update({ requestInfo: { viewport: { start: START_1, end: END_1 }, onlyFetchLatestValue: false } });

expect(dataSource.initiateRequest).toBeCalledWith(expect.any(Object), [
{
id: DATA_STREAM_INFO.id,
resolution: DATA_STREAM_INFO.resolution,
// 1 minute time advancement invalidates 5 minutes of cache with custom mapping, which is 4 minutes from END_1
start: new Date(END_1.getTime() - 4 * MINUTE_IN_MS),
end: END_1,
},
]);
});
});

describe('request scheduler', () => {
Expand Down Expand Up @@ -693,4 +778,3 @@ it('requests data range with buffer', () => {

unsubscribe();
});

27 changes: 25 additions & 2 deletions packages/core/src/data-module/IotAppKitDataModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,28 @@ import {
Subscription,
SubscriptionUpdate,
} from './types.d';
import { DataStreamsStore } from './data-cache/types';
import { DataStreamsStore, CacheSettings } from './data-cache/types';
import DataSourceStore from './data-source-store/dataSourceStore';
import { SubscriptionResponse } from '../data-sources/site-wise/types.d';
import { DataCache } from './data-cache/dataCacheWrapped';
import { Request } from './data-cache/requestTypes';
import { requestRange } from './data-cache/requestRange';
import { getDateRangesToRequest } from './data-cache/caching/caching';
import { viewportEndDate, viewportStartDate } from '../common/viewport';
import { MINUTE_IN_MS, SECOND_IN_MS } from '../common/time';

export const DEFAULT_CACHE_SETTINGS = {
ttlDurationMapping: {
[1.2 * MINUTE_IN_MS]: 0,
[3 * MINUTE_IN_MS]: 30 * SECOND_IN_MS,
[20 * MINUTE_IN_MS]: 5 * MINUTE_IN_MS,
},
};

interface IotAppKitDataModuleConfiguration {
initialDataCache?: DataStreamsStore;
cacheSettings?: Partial<CacheSettings>;
}

export class IotAppKitDataModule implements DataModule {
private dataCache: DataCache;
Expand All @@ -26,13 +40,21 @@ export class IotAppKitDataModule implements DataModule {

private dataSourceStore = new DataSourceStore();

private cacheSettings: CacheSettings;

/**
* Create a new data module, optionally with a pre-hydrated data cache.
*
*/
constructor(initialDataCache?: DataStreamsStore) {
constructor(configuration: IotAppKitDataModuleConfiguration = {}) {
const { initialDataCache, cacheSettings } = configuration;

this.dataCache = new DataCache(initialDataCache);
this.subscriptions = new SubscriptionStore({ dataSourceStore: this.dataSourceStore, dataCache: this.dataCache });
this.cacheSettings = {
...DEFAULT_CACHE_SETTINGS,
...cacheSettings,
};
}

public registerDataSource = this.dataSourceStore.registerDataSource;
Expand Down Expand Up @@ -81,6 +103,7 @@ export class IotAppKitDataModule implements DataModule {
end: adjustedEnd,
resolution,
dataStreamId: id,
cacheSettings: this.cacheSettings,
});

return {
Expand Down
21 changes: 21 additions & 0 deletions packages/core/src/data-module/data-cache/caching/caching.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
getDateRangesToRequest,
unexpiredCacheIntervals,
} from './caching';
import { DEFAULT_CACHE_SETTINGS } from '../../IotAppKitDataModule';
import { HOUR_IN_MS, MINUTE_IN_MS, SECOND_IN_MS } from '../../../common/time';
import { DataStreamsStore } from '../types';
import { IntervalStructure } from '../../../common/intervalStructure';
Expand Down Expand Up @@ -66,6 +67,7 @@ describe('getDateRangesToRequest', () => {
dataStreamId: STREAM_ID,
start: REQUESTED_START,
end: REQUESTED_END,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toEqual([
[REQUESTED_START, CACHED_START],
Expand Down Expand Up @@ -107,6 +109,7 @@ describe('getDateRangesToRequest', () => {
dataStreamId: STREAM_ID,
start: REQUESTED_START,
end: REQUESTED_END,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toEqual([
[REQUESTED_START, CACHED_START],
Expand Down Expand Up @@ -150,6 +153,7 @@ describe('getDateRangesToRequest', () => {
dataStreamId: STREAM_ID,
start: REQUESTED_START,
end: REQUESTED_END,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toEqual([[REQUESTED_START, REQUESTED_END]]);
});
Expand All @@ -165,6 +169,7 @@ describe('getDateRangesToRequest', () => {
dataStreamId: STREAM_ID,
start: START_DATE,
end: END_DATE,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toEqual([[START_DATE, END_DATE]]);
});
Expand Down Expand Up @@ -194,6 +199,7 @@ describe('getDateRangesToRequest', () => {
dataStreamId: STREAM_ID,
start: START_DATE,
end: END_DATE,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toEqual([[START_DATE, END_DATE]]);
});
Expand Down Expand Up @@ -221,6 +227,7 @@ describe('getDateRangesToRequest', () => {
start: new Date(2000, 0, 0),
end: new Date(2001, 0, 0),
resolution: 0,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toEqual([[new Date(2000, 0, 6), new Date(2001, 0, 0)]]);
});
Expand Down Expand Up @@ -248,6 +255,7 @@ describe('getDateRangesToRequest', () => {
start: new Date(2000, 0, 0),
end: new Date(2001, 0, 0),
resolution: 0,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toEqual([]);
});
Expand All @@ -262,6 +270,7 @@ describe('getDateRangesToRequest', () => {
start: START_DATE,
end: END_DATE,
resolution: 0,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toEqual([]);
});
Expand All @@ -276,6 +285,7 @@ describe('getDateRangesToRequest', () => {
start: START_DATE,
end: END_DATE,
resolution: 0,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toHaveLength(1);
});
Expand All @@ -290,6 +300,7 @@ describe('getDateRangesToRequest', () => {
start: START_DATE,
end: END_DATE,
resolution: 0,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).not.toEqual([]);
});
Expand Down Expand Up @@ -319,6 +330,7 @@ describe('getDateRangesToRequest', () => {
start: START_DATE,
end: END_DATE,
resolution: 0,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toEqual([]);
});
Expand Down Expand Up @@ -349,6 +361,7 @@ describe('getDateRangesToRequest', () => {
dataStreamId: STREAM_ID,
start: START_DATE,
end: END_DATE,
cacheSettings: DEFAULT_CACHE_SETTINGS,
})
).toEqual([[START_DATE, END_DATE]]);
});
Expand Down Expand Up @@ -921,6 +934,7 @@ describe('checkCacheForRecentPoint', () => {
dataStreamId: STREAM_ID,
resolution: RESOLUTION,
start: new Date(1621879612500),
cacheSettings: DEFAULT_CACHE_SETTINGS,
});
expect(presentInCache).toBeFalse();
});
Expand Down Expand Up @@ -971,6 +985,7 @@ describe('checkCacheForRecentPoint', () => {
dataStreamId: STREAM_ID,
resolution: RESOLUTION,
start: new Date(1621880261010),
cacheSettings: DEFAULT_CACHE_SETTINGS,
});

expect(presentInCache).toBeTrue();
Expand Down Expand Up @@ -1022,6 +1037,7 @@ describe('checkCacheForRecentPoint', () => {
dataStreamId: STREAM_ID,
resolution: RESOLUTION,
start: new Date(1621880261010),
cacheSettings: DEFAULT_CACHE_SETTINGS,
});

expect(presentInCache).toBeFalse();
Expand All @@ -1040,6 +1056,7 @@ describe('checkCacheForRecentPoint', () => {
dataStreamId: STREAM_ID,
resolution: RESOLUTION,
start: new Date(1621879662000),
cacheSettings: DEFAULT_CACHE_SETTINGS,
});

expect(presentInCache).toBeFalse();
Expand Down Expand Up @@ -1073,6 +1090,7 @@ describe('checkCacheForRecentPoint', () => {
dataStreamId: STREAM_ID,
resolution: RESOLUTION,
start: new Date(1621879661000),
cacheSettings: DEFAULT_CACHE_SETTINGS,
});

expect(presentInCache).toBeTrue();
Expand Down Expand Up @@ -1110,6 +1128,7 @@ describe('checkCacheForRecentPoint', () => {
dataStreamId: STREAM_ID,
resolution: RESOLUTION,
start: new Date(14),
cacheSettings: DEFAULT_CACHE_SETTINGS,
});

expect(presentInCache).toBeTrue();
Expand Down Expand Up @@ -1147,6 +1166,7 @@ describe('checkCacheForRecentPoint', () => {
dataStreamId: STREAM_ID,
resolution: RESOLUTION,
start: new Date(13),
cacheSettings: DEFAULT_CACHE_SETTINGS,
});

expect(presentInCache).toBeFalse();
Expand Down Expand Up @@ -1184,6 +1204,7 @@ describe('checkCacheForRecentPoint', () => {
dataStreamId: STREAM_ID,
resolution: RESOLUTION,
start: new Date(16),
cacheSettings: DEFAULT_CACHE_SETTINGS,
});

expect(presentInCache).toBeFalse();
Expand Down

0 comments on commit 88c0fcb

Please sign in to comment.