Skip to content

Commit

Permalink
perf: improve raw batching
Browse files Browse the repository at this point in the history
  • Loading branch information
chejimmy authored and diehbria committed Jan 5, 2024
1 parent 743ca50 commit 263282d
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import type {
BatchGetAssetPropertyValueHistoryErrorEntry,
BatchGetAssetPropertyValueHistorySuccessEntry,
} from '@aws-sdk/client-iotsitewise';
import type { OnSuccessCallback, ErrorCallback, RequestInformationAndRange } from '@iot-app-kit/core';
import { type OnSuccessCallback, type ErrorCallback, type RequestInformationAndRange } from '@iot-app-kit/core';
import type { HistoricalPropertyParams } from './client';
import { withinLatestPropertyDataThreshold } from './withinLatestPropertyDataThreshold';

export type BatchHistoricalEntry = {
requestInformation: RequestInformationAndRange;
Expand Down Expand Up @@ -141,6 +142,21 @@ const batchGetHistoricalPropertyDataPointsForProperty = ({
.filter((batch) => batch.length > 0) // filter out empty batches
.map(([batch, maxResults], requestIndex) => sendRequest({ client, batch, maxResults, requestIndex }));

const shouldAcceptRequest = ({
end,
fetchFromStartToEnd,
fetchMostRecentBeforeStart,
fetchMostRecentBeforeEnd,
resolution,
}: RequestInformationAndRange) => {
return (
resolution === '0' &&
((!withinLatestPropertyDataThreshold(end) && fetchMostRecentBeforeEnd) ||
fetchFromStartToEnd ||
fetchMostRecentBeforeStart)
);
};

export const batchGetHistoricalPropertyDataPoints = ({
params,
client,
Expand All @@ -152,20 +168,20 @@ export const batchGetHistoricalPropertyDataPoints = ({

// fan out params into individual entries, handling fetchMostRecentBeforeStart
params.forEach(({ requestInformations, maxResults, onSuccess, onError }) => {
requestInformations
.filter(({ resolution }) => resolution === '0')
.forEach((requestInformation) => {
const { fetchMostRecentBeforeStart, start, end } = requestInformation;

entries.push({
requestInformation,
maxResults: fetchMostRecentBeforeStart ? 1 : maxResults,
onSuccess,
onError,
requestStart: fetchMostRecentBeforeStart ? new Date(0, 0, 0) : start,
requestEnd: fetchMostRecentBeforeStart ? start : end,
});
requestInformations.filter(shouldAcceptRequest).forEach((requestInformation) => {
const { fetchMostRecentBeforeStart, fetchMostRecentBeforeEnd, start, end } = requestInformation;

const isMostRecent = fetchMostRecentBeforeStart || fetchMostRecentBeforeEnd;

entries.push({
requestInformation,
maxResults: isMostRecent ? 1 : maxResults,
onSuccess,
onError,
requestStart: isMostRecent ? new Date(0, 0, 0) : start,
requestEnd: fetchMostRecentBeforeStart ? start : end,
});
});
});

// sort entries to ensure earliest data is fetched first because batch API has a property limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import type {
BatchGetAssetPropertyValueErrorEntry,
BatchGetAssetPropertyValueSuccessEntry,
} from '@aws-sdk/client-iotsitewise';
import type { OnSuccessCallback, ErrorCallback, RequestInformationAndRange } from '@iot-app-kit/core';
import { type OnSuccessCallback, type ErrorCallback, type RequestInformationAndRange } from '@iot-app-kit/core';
import type { LatestPropertyParams } from './client';
import { withinLatestPropertyDataThreshold } from './withinLatestPropertyDataThreshold';

export type BatchLatestEntry = {
requestInformation: RequestInformationAndRange;
Expand Down Expand Up @@ -133,7 +134,10 @@ export const batchGetLatestPropertyDataPoints = ({
// fan out params into individual entries, handling fetchMostRecentBeforeStart
params.forEach(({ requestInformations, onSuccess, onError }) => {
requestInformations
.filter(({ resolution, fetchMostRecentBeforeEnd }) => resolution === '0' && fetchMostRecentBeforeEnd)
.filter(
({ end, fetchMostRecentBeforeEnd, resolution }) =>
resolution === '0' && withinLatestPropertyDataThreshold(end) && fetchMostRecentBeforeEnd
)
.forEach((requestInformation) => {
const { end } = requestInformation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ describe('getLatestPropertyDataPoint', () => {
const client = new SiteWiseClient(createMockSiteWiseSDK({ batchGetAssetPropertyValue }));

const startDate = new Date(2000, 0, 0);
const endDate = new Date(2001, 0, 0);
const endDate = new Date();
const resolution = '0';

const requestInformation1 = {
Expand Down Expand Up @@ -433,7 +433,7 @@ describe('getLatestPropertyDataPoint', () => {
const client = new SiteWiseClient(createMockSiteWiseSDK({ batchGetAssetPropertyValue }));

const startDate = new Date(2000, 0, 0);
const endDate = new Date(2001, 0, 0);
const endDate = new Date();
const resolution = '0';

const requestInformation1 = {
Expand Down Expand Up @@ -795,7 +795,7 @@ describe('batch duration', () => {
const client = new SiteWiseClient(createMockSiteWiseSDK({ batchGetAssetPropertyValue }));

const startDate = new Date(2000, 0, 0);
const endDate = new Date(2001, 0, 0);
const endDate = new Date();
const resolution = '0';

const requestInformation = {
Expand Down Expand Up @@ -859,7 +859,7 @@ describe('batch duration', () => {
const client = new SiteWiseClient(createMockSiteWiseSDK({ batchGetAssetPropertyValue }), { batchDuration: 100 });

const startDate = new Date(2000, 0, 0);
const endDate = new Date(2001, 0, 0);
const endDate = new Date();
const resolution = '0';

const requestInformation = {
Expand Down Expand Up @@ -938,7 +938,7 @@ describe('batch deduplication', () => {
const client = new SiteWiseClient(createMockSiteWiseSDK({ batchGetAssetPropertyValue }));

const startDate = new Date(2000, 0, 0);
const endDate = new Date(2001, 0, 0);
const endDate = new Date();
const resolution = '0';

const requestInformation = {
Expand Down Expand Up @@ -986,7 +986,7 @@ describe('batch deduplication', () => {
const client = new SiteWiseClient(createMockSiteWiseSDK({ batchGetAssetPropertyValue }));

const startDate = new Date(2000, 0, 0);
const endDate = new Date(2001, 0, 0);
const endDate = new Date();
const resolution = '0';

// queue two requests in the same batch
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { RAW_DATA_RECENCY_THRESHOLD, withinLatestPropertyDataThreshold } from './withinLatestPropertyDataThreshold';

describe('withinLatestPropertyDataThreshold', () => {
test('returns true if the date is within the threshold', () => {
const date = new Date();

expect(withinLatestPropertyDataThreshold(date)).toBeTruthy();
});

test('returns false if the date is out of the threshold', () => {
const date = new Date();
date.setSeconds(date.getSeconds() + RAW_DATA_RECENCY_THRESHOLD);

expect(withinLatestPropertyDataThreshold(date)).toBeTruthy();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { SECOND_IN_MS } from '@iot-app-kit/core';

export const RAW_DATA_RECENCY_THRESHOLD = 10; // in seconds

/**
* Whether the given date is within the threshold to consider as a end date for a latest property request.
* @param date the date to consider
* @returns Whether the given date is within the threshold
*/
export const withinLatestPropertyDataThreshold = (date: Date) =>
Date.now() - date.getTime() < RAW_DATA_RECENCY_THRESHOLD * SECOND_IN_MS;
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ describe('initiateRequest', () => {
});

describe('fetch latest before end', () => {
const start = new Date(2000, 0, 0);
const end = new Date(2001, 0, 0);

it('gets latest value for multiple properties', async () => {
const batchGetAssetPropertyValueHistory = jest.fn().mockResolvedValue(BATCH_ASSET_PROPERTY_VALUE_HISTORY);
const batchGetAssetPropertyValue = jest.fn().mockResolvedValue(BATCH_ASSET_PROPERTY_DOUBLE_VALUE);
Expand All @@ -116,15 +119,15 @@ describe('initiateRequest', () => {
[
{
id: toId({ assetId: ASSET_ID, propertyId: PROPERTY_1 }),
start: new Date(),
end: new Date(),
start,
end,
resolution: '0',
fetchMostRecentBeforeEnd: true,
},
{
id: toId({ assetId: ASSET_ID, propertyId: PROPERTY_2 }),
start: new Date(),
end: new Date(),
start,
end,
resolution: '0',
fetchMostRecentBeforeEnd: true,
},
Expand All @@ -149,23 +152,6 @@ describe('initiateRequest', () => {
]),
})
);

expect(batchGetAssetPropertyValue).toBeCalledTimes(1);

expect(batchGetAssetPropertyValue).toBeCalledWith(
expect.objectContaining({
entries: expect.arrayContaining([
expect.objectContaining({
assetId: ASSET_ID,
propertyId: PROPERTY_2,
}),
expect.objectContaining({
assetId: ASSET_ID,
propertyId: PROPERTY_2,
}),
]),
})
);
});

it('gets latest value for multiple assets', async () => {
Expand Down Expand Up @@ -198,15 +184,15 @@ describe('initiateRequest', () => {
[
{
id: toId({ assetId: ASSET_1, propertyId: PROPERTY_1 }),
start: new Date(),
end: new Date(),
start,
end,
resolution: '0',
fetchMostRecentBeforeEnd: true,
},
{
id: toId({ assetId: ASSET_2, propertyId: PROPERTY_2 }),
start: new Date(),
end: new Date(),
start,
end,
resolution: '0',
fetchMostRecentBeforeEnd: true,
},
Expand All @@ -231,23 +217,6 @@ describe('initiateRequest', () => {
]),
})
);

expect(batchGetAssetPropertyValue).toBeCalledTimes(1);

expect(batchGetAssetPropertyValue).toBeCalledWith(
expect.objectContaining({
entries: expect.arrayContaining([
expect.objectContaining({
assetId: ASSET_1,
propertyId: PROPERTY_1,
}),
expect.objectContaining({
assetId: ASSET_2,
propertyId: PROPERTY_2,
}),
]),
})
);
});
});

Expand Down Expand Up @@ -595,10 +564,9 @@ describe('e2e through data-module', () => {

describe('fetching latest value', () => {
it('reports error occurred on request initiation', async () => {
const batchGetAssetPropertyValueHistory = jest.fn().mockResolvedValue(BATCH_ASSET_PROPERTY_VALUE_HISTORY);
const batchGetAssetPropertyValue = jest.fn().mockResolvedValue(BATCH_ASSET_PROPERTY_ERROR);

const mockSDK = createMockSiteWiseSDK({ batchGetAssetPropertyValueHistory, batchGetAssetPropertyValue });
const mockSDK = createMockSiteWiseSDK({ batchGetAssetPropertyValue });
const dataSource = createDataSource(mockSDK);
const dataModule = new TimeSeriesDataModule(dataSource);

Expand All @@ -614,7 +582,7 @@ describe('e2e through data-module', () => {
} as SiteWiseDataStreamQuery,
],
request: {
viewport: { start: new Date(2000, 0, 0), end: new Date(2000, 0, 0, 1) },
viewport: { start: new Date(2000, 0, 0), end: new Date() },
settings: { fetchMostRecentBeforeEnd: true, resolution: '0' },
},
},
Expand All @@ -623,7 +591,7 @@ describe('e2e through data-module', () => {

await flushPromises();

expect(timeSeriesCallback).toBeCalledTimes(3);
expect(timeSeriesCallback).toBeCalledTimes(2);
expect(timeSeriesCallback).toHaveBeenCalledWith(
expect.objectContaining({
dataStreams: [
Expand Down

0 comments on commit 263282d

Please sign in to comment.