Skip to content

Commit

Permalink
fix(source-iotsitewise): deduplicate batch requests (#629)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjuranek committed Mar 3, 2023
1 parent f5ca005 commit 0a5e8a1
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 6 deletions.
34 changes: 34 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions packages/source-iotsitewise/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,16 @@
"@synchro-charts/core": "7.2.0",
"dataloader": "^2.1.0",
"flush-promises": "^1.0.2",
"lodash.isequal": "^4.5.0",
"lodash.merge": "^4.6.2",
"lodash.uniqwith": "^4.5.0",
"rxjs": "^7.4.0",
"typescript": "4.4.4"
},
"devDependencies": {
"@types/jest": "^27.4.0",
"@types/lodash.merge": "^4.6.7",
"@types/lodash.uniqwith": "^4.5.7",
"jest": "^27.5.1",
"jest-extended": "^2.0.0",
"npm-watch": "^0.11.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import { isDefined } from '../../common/predicates';
import { AggregatedPropertyParams } from './client';
import { createEntryBatches, calculateNextBatchSize, shouldFetchNextBatch } from './batch';
import { RESOLUTION_TO_MS_MAPPING } from '../util/resolution';
import { deduplicateBatch } from '../util/deduplication';

type BatchAggregatedEntry = {
export type BatchAggregatedEntry = {
requestInformation: RequestInformationAndRange;
aggregateTypes: AggregateType[];
maxResults?: number;
Expand Down Expand Up @@ -56,7 +57,7 @@ const sendRequest = ({
client
.send(
new BatchGetAssetPropertyAggregatesCommand({
entries: batch.map((entry, entryIndex) => {
entries: deduplicateBatch(batch).map((entry, entryIndex) => {
const { requestInformation, aggregateTypes, onError, onSuccess, requestStart, requestEnd } = entry;
const { id, resolution } = requestInformation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import { toSiteWiseAssetProperty } from '../util/dataStreamId';
import { isDefined } from '../../common/predicates';
import { HistoricalPropertyParams } from './client';
import { createEntryBatches, calculateNextBatchSize, shouldFetchNextBatch } from './batch';
import { deduplicateBatch } from '../util/deduplication';

type BatchHistoricalEntry = {
export type BatchHistoricalEntry = {
requestInformation: RequestInformationAndRange;
maxResults?: number;
onError: ErrorCallback;
Expand Down Expand Up @@ -53,7 +54,7 @@ const sendRequest = ({
client
.send(
new BatchGetAssetPropertyValueHistoryCommand({
entries: batch.map((entry, entryIndex) => {
entries: deduplicateBatch(batch).map((entry, entryIndex) => {
const { requestInformation, onError, onSuccess, requestStart, requestEnd } = entry;
const { id } = requestInformation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import { toSiteWiseAssetProperty } from '../util/dataStreamId';
import { isDefined } from '../../common/predicates';
import { LatestPropertyParams } from './client';
import { createEntryBatches, shouldFetchNextBatch, NO_LIMIT_BATCH } from './batch';
import { deduplicateBatch } from '../util/deduplication';

type BatchLatestEntry = {
export type BatchLatestEntry = {
requestInformation: RequestInformationAndRange;
onError: ErrorCallback;
onSuccess: OnSuccessCallback;
Expand Down Expand Up @@ -51,7 +52,7 @@ const sendRequest = ({
client
.send(
new BatchGetAssetPropertyValueCommand({
entries: batch.map((entry, entryIndex) => {
entries: deduplicateBatch(batch).map((entry, entryIndex) => {
const { requestInformation, onError, onSuccess, requestStart, requestEnd } = entry;
const { id } = requestInformation;

Expand Down
120 changes: 120 additions & 0 deletions packages/source-iotsitewise/src/time-series-data/client/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -682,3 +682,123 @@ describe('batch duration', () => {
expect(batchGetAssetPropertyValue).toBeCalledTimes(2);
});
});

describe('batch deduplication', () => {
beforeAll(() => {
jest.useFakeTimers('modern');
});

afterAll(() => {
jest.useRealTimers();
});

it('deduplicates duplicate requests', async () => {
const batchGetAssetPropertyValue = jest.fn().mockResolvedValue(BATCH_ASSET_PROPERTY_DOUBLE_VALUE);
const assetId = 'some-asset-id';
const propertyId = 'some-property-id';

const onSuccess = jest.fn();
const onError = jest.fn();

const client = new SiteWiseClient(createMockSiteWiseSDK({ batchGetAssetPropertyValue }));

const startDate = new Date(2000, 0, 0);
const endDate = new Date(2001, 0, 0);
const resolution = '0';

const requestInformation = {
id: toId({ assetId, propertyId }),
start: startDate,
end: endDate,
resolution,
fetchMostRecentBeforeEnd: true,
};

// queue two requests in the same batch
client.getLatestPropertyDataPoint({
requestInformations: [requestInformation],
onSuccess,
onError,
});
client.getLatestPropertyDataPoint({
requestInformations: [requestInformation],
onSuccess,
onError,
});

// clear promise queue
await flushPromises();

// ensure latest requests are enqueued
jest.advanceTimersByTime(0);

// process the batch
expect(batchGetAssetPropertyValue).toBeCalledTimes(1);

// assert batch only had one entry
expect(batchGetAssetPropertyValue).toBeCalledWith({
entries: [{ assetId: 'some-asset-id', entryId: '0-0', propertyId: 'some-property-id' }],
nextToken: undefined,
});
});

it('does not deduplicate non-duplicate requests', async () => {
const batchGetAssetPropertyValue = jest.fn().mockResolvedValue(BATCH_ASSET_PROPERTY_DOUBLE_VALUE);

const onSuccess = jest.fn();
const onError = jest.fn();

const client = new SiteWiseClient(createMockSiteWiseSDK({ batchGetAssetPropertyValue }));

const startDate = new Date(2000, 0, 0);
const endDate = new Date(2001, 0, 0);
const resolution = '0';

// queue two requests in the same batch
client.getLatestPropertyDataPoint({
requestInformations: [
{
id: toId({ assetId: '1', propertyId: '1' }),
start: startDate,
end: endDate,
resolution,
fetchMostRecentBeforeEnd: true,
},
],
onSuccess,
onError,
});

client.getLatestPropertyDataPoint({
requestInformations: [
{
id: toId({ assetId: '2', propertyId: '2' }),
start: startDate,
end: endDate,
resolution,
fetchMostRecentBeforeEnd: true,
},
],
onSuccess,
onError,
});

// clear promise queue
await flushPromises();

// ensure latest requests are enqueued
jest.advanceTimersByTime(0);

// process the batch
expect(batchGetAssetPropertyValue).toBeCalledTimes(1);

// assert batch only had both entries
expect(batchGetAssetPropertyValue).toBeCalledWith({
entries: [
{ assetId: '1', entryId: '0-0', propertyId: '1' },
{ assetId: '2', entryId: '0-1', propertyId: '2' },
],
nextToken: undefined,
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { deduplicateBatch } from './deduplication';

describe('deduplicateBatch', () => {
it('removes duplicate entries from a given batch', () => {
const entry = {
onError: jest.fn(),
onSuccess: jest.fn(),
requestEnd: new Date(2001, 0, 0),
requestStart: new Date(2000, 0, 0),
requestInformation: {
id: 'asset-property',
start: new Date(2001, 0, 0),
end: new Date(2000, 0, 0),
resolution: '0',
fetchMostRecentBeforeEnd: true,
},
};

const originalBatch = [entry, entry];
const deduplicatedBatch = [entry];

expect(deduplicateBatch(originalBatch)).toEqual(deduplicatedBatch);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import isEqual from 'lodash.isequal';
import uniqWith from 'lodash.uniqwith';
import { BatchAggregatedEntry } from '../client/batchGetAggregatedPropertyDataPoints';
import { BatchHistoricalEntry } from '../client/batchGetHistoricalPropertyDataPoints';
import { BatchLatestEntry } from '../client/batchGetLatestPropertyDataPoints';

export type Entry = BatchAggregatedEntry | BatchHistoricalEntry | BatchLatestEntry;

/**
* Given a batch, or array of entries, deduplicate the batch.
*/
export function deduplicateBatch<T extends Entry>(batch: T[]): T[] {
return uniqWith(batch, compareEntryUniqueness);
}

/**
* Compares one entry to another, determining if the first is unique.
*/
function compareEntryUniqueness(a: Entry, b: Entry) {
return isEqual(getEntryRequestInformation(a), getEntryRequestInformation(b));
}

/**
* Gets relevant request information off the entry.
*/
function getEntryRequestInformation(entry: Entry) {
const {
refId: _refId,
cacheSettings: _cacheSettings,
...requestInformationWithoutOmittedFields
} = entry.requestInformation;

return requestInformationWithoutOmittedFields;
}

0 comments on commit 0a5e8a1

Please sign in to comment.