Skip to content

Commit

Permalink
perf: increase batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
tracy-french authored and diehbria committed Dec 18, 2023
1 parent 38ef2be commit 1fefe81
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 151 deletions.
116 changes: 40 additions & 76 deletions packages/source-iotsitewise/src/time-series-data/client/batch.spec.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,32 @@
import {
createEntryBatches,
calculateNextBatchSize,
shouldFetchNextBatch,
MAX_BATCH_RESULTS,
MAX_BATCH_ENTRIES,
NO_LIMIT_BATCH,
} from './batch';
import { createEntryBatches, shouldFetchNextBatch } from './batch';

describe('createEntryBatches', () => {
it('buckets entries by maxResults for a given batch', () => {
const batches = createEntryBatches([
{
id: '1',
maxResults: undefined,
},
{
id: '2',
maxResults: 2000,
},
{
id: '3',
maxResults: 2000,
},
{
id: '4',
maxResults: 10,
},
{
id: '5',
maxResults: 2000,
},
]);
const batches = createEntryBatches(
[
{
id: '1',
maxResults: undefined,
},
{
id: '2',
maxResults: 2000,
},
{
id: '3',
maxResults: 2000,
},
{
id: '4',
maxResults: 10,
},
{
id: '5',
maxResults: 2000,
},
],
16
);

expect(batches).toEqual(
expect.arrayContaining([
Expand All @@ -41,7 +37,7 @@ describe('createEntryBatches', () => {
maxResults: undefined,
},
],
NO_LIMIT_BATCH,
-1,
],
[
[
Expand All @@ -58,7 +54,7 @@ describe('createEntryBatches', () => {
maxResults: 2000,
},
],
6000,
2000,
],
[
[
Expand All @@ -73,11 +69,12 @@ describe('createEntryBatches', () => {
);
});

it('chunks batches that exceed max entry size (16)', () => {
it('chunks batches that exceed max batch size', () => {
const entrySize = 2000;
const batchSize = 16;

const entries = [
...[...Array(MAX_BATCH_ENTRIES * 3)].map((_args, index) => ({
...[...Array(batchSize * 3)].map((_args, index) => ({
id: String(index),
maxResults: entrySize,
})),
Expand All @@ -87,63 +84,30 @@ describe('createEntryBatches', () => {
},
];

const batches = createEntryBatches(entries);
const batches = createEntryBatches(entries, 16);

expect(batches).toEqual(
expect.arrayContaining([
[entries.slice(0, MAX_BATCH_ENTRIES), MAX_BATCH_ENTRIES * entrySize],
[entries.slice(MAX_BATCH_ENTRIES, 2 * MAX_BATCH_ENTRIES), MAX_BATCH_ENTRIES * entrySize],
[entries.slice(MAX_BATCH_ENTRIES * 2, MAX_BATCH_ENTRIES * 3), MAX_BATCH_ENTRIES * entrySize],
[[entries[MAX_BATCH_ENTRIES * 3]], 10],
[entries.slice(0, batchSize), entrySize],
[entries.slice(batchSize, 2 * batchSize), entrySize],
[entries.slice(batchSize * 2, batchSize * 3), entrySize],
[[entries[batchSize * 3]], 10],
])
);
});

it('handles empty input', () => {
const batches = createEntryBatches([]);
const batches = createEntryBatches([], 16);
expect(batches).toEqual([]);
});
});

describe('calculateNextBatchSize', () => {
it('returns the correct max batch size for no limit batches', () => {
expect(calculateNextBatchSize({ maxResults: NO_LIMIT_BATCH, dataPointsFetched: 0 })).toBe(MAX_BATCH_RESULTS);
expect(calculateNextBatchSize({ maxResults: NO_LIMIT_BATCH, dataPointsFetched: 100000 })).toBe(MAX_BATCH_RESULTS);
});

it('returns the correct max batch size when specified and need to fetch more than MAX_BATCH_SIZE', () => {
expect(calculateNextBatchSize({ maxResults: MAX_BATCH_RESULTS * 3, dataPointsFetched: 0 })).toBe(MAX_BATCH_RESULTS);
});

it('returns the correct max batch size when specified and need to fetch less than MAX_BATCH SIZE', () => {
expect(calculateNextBatchSize({ maxResults: MAX_BATCH_RESULTS / 2, dataPointsFetched: 0 })).toBe(
MAX_BATCH_RESULTS / 2
);
expect(calculateNextBatchSize({ maxResults: MAX_BATCH_RESULTS, dataPointsFetched: MAX_BATCH_RESULTS / 2 })).toBe(
MAX_BATCH_RESULTS / 2
);
});
});

describe('shouldFetchNextBatch', () => {
it('returns true if next token exists and batch has no limit', () => {
expect(shouldFetchNextBatch({ nextToken: '123', maxResults: NO_LIMIT_BATCH, dataPointsFetched: 0 })).toBe(true);
expect(shouldFetchNextBatch({ nextToken: '123', maxResults: NO_LIMIT_BATCH, dataPointsFetched: 500000 })).toBe(
true
);
});

it('returns true if next token exists and there is still data that needs to be fetched', () => {
expect(shouldFetchNextBatch({ nextToken: '123', maxResults: 3000, dataPointsFetched: 0 })).toBe(true);
expect(shouldFetchNextBatch({ nextToken: '123', maxResults: 10000, dataPointsFetched: 9999 })).toBe(true);
});

it('returns false if next token exists but data points have already been fetched', () => {
expect(shouldFetchNextBatch({ nextToken: '123', maxResults: 3000, dataPointsFetched: 3000 })).toBe(false);
expect(shouldFetchNextBatch({ nextToken: '123', maxResults: 0, dataPointsFetched: 0 })).toBe(false);
it('returns true if next token exists', () => {
expect(shouldFetchNextBatch({ nextToken: '123' })).toBe(true);
});

it('returns false if next token does not exist', () => {
expect(shouldFetchNextBatch({ nextToken: undefined, maxResults: 3000, dataPointsFetched: 0 })).toBe(false);
expect(shouldFetchNextBatch({ nextToken: undefined })).toBe(false);
});
});
45 changes: 11 additions & 34 deletions packages/source-iotsitewise/src/time-series-data/client/batch.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// current maximum batch size when using batch APIs
export const MAX_BATCH_RESULTS = 4000;
// export const MAX_BATCH_RESULTS = 4000;

// current batch API entry limit
export const MAX_BATCH_ENTRIES = 16;
// export const MAX_BATCH_ENTRIES = 16;

// use -1 to represent a batch with no max result limit
export const NO_LIMIT_BATCH = -1;
Expand All @@ -14,7 +14,10 @@ export const NO_LIMIT_BATCH = -1;
 * @param entries - entries to bucket by their maxResults value
 * @param batchSize - maximum number of entries allowed per batch request
 * @returns buckets: [T[], number][] — pairs of [chunk of entries, maxResults for that chunk]
 */
export const createEntryBatches = <T extends { maxResults?: number }>(entries: T[]): [T[], number][] => {
export const createEntryBatches = <T extends { maxResults?: number }>(
entries: T[],
batchSize: number
): [T[], number][] => {
const buckets: { [key: number]: T[] } = {};

entries.forEach((entry) => {
Expand All @@ -27,55 +30,29 @@ export const createEntryBatches = <T extends { maxResults?: number }>(entries: T
}
});

// chunk buckets that are larger than the requested batchSize
return Object.keys(buckets)
.map((key) => {
const maxEntryResults = Number(key);
const bucket = buckets[maxEntryResults];

return chunkBatch(bucket).map((chunk): [T[], number] => [
chunk,
maxEntryResults === NO_LIMIT_BATCH ? NO_LIMIT_BATCH : chunk.length * maxEntryResults,
]);
return chunkBatch(bucket, batchSize).map((chunk): [T[], number] => [chunk, maxEntryResults]);
})
.flat();
};

/**
* calculate the required size of the next batch
*/
export const calculateNextBatchSize = ({
maxResults,
dataPointsFetched,
}: {
maxResults: number;
dataPointsFetched: number;
}) => (maxResults === NO_LIMIT_BATCH ? MAX_BATCH_RESULTS : Math.min(maxResults - dataPointsFetched, MAX_BATCH_RESULTS));

/**
* check if batch still needs to be paginated.
*/
export const shouldFetchNextBatch = ({
nextToken,
maxResults,
dataPointsFetched,
}: {
nextToken: string | undefined;
maxResults: number;
dataPointsFetched?: number;
}) =>
!!nextToken &&
(maxResults === NO_LIMIT_BATCH ||
(dataPointsFetched !== null && dataPointsFetched !== undefined && dataPointsFetched < maxResults));
export const shouldFetchNextBatch = ({ nextToken }: { nextToken: string | undefined }) => !!nextToken;

/**
 * chunk a batch into sub-arrays of at most batchSize entries
 */
const chunkBatch = <T>(batch: T[]): T[][] => {
const chunkBatch = <T>(batch: T[], batchSize: number): T[][] => {
const chunks = [];

for (let i = 0; i < batch.length; i += MAX_BATCH_ENTRIES) {
chunks.push(batch.slice(i, i + MAX_BATCH_ENTRIES));
for (let i = 0; i < batch.length; i += batchSize) {
chunks.push(batch.slice(i, i + batchSize));
}

return chunks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { dataStreamFromSiteWise } from '../dataStreamFromSiteWise';
import { parseDuration } from '@iot-app-kit/core';
import { fromId } from '../util/dataStreamId';
import { isDefined } from '../../common/predicates';
import { createEntryBatches, calculateNextBatchSize, shouldFetchNextBatch } from './batch';
import { createEntryBatches, shouldFetchNextBatch } from './batch';
import { RESOLUTION_TO_MS_MAPPING } from '../util/resolution';
import { deduplicateBatch } from '../util/deduplication';
import type {
Expand All @@ -35,27 +35,26 @@ type BatchEntryCallbackCache = {
};
};

const BATCH_SIZE = 16;
const ENTRY_SIZE = 4_000;

const sendRequest = ({
client,
batch,
maxResults,
requestIndex, // used to create and regenerate (for paginating) a unique entryId
nextToken: prevToken,
dataPointsFetched = 0, // track number of data points fetched so far
}: {
client: IoTSiteWiseClient;
batch: BatchAggregatedEntry[];
maxResults: number;
requestIndex: number;
nextToken?: string;
dataPointsFetched?: number;
}) => {
// callback cache makes it convenient to capture request data in a closure.
// the cache exposes methods that only require batch response entry as an argument.
const callbackCache: BatchEntryCallbackCache = {};

const batchSize = calculateNextBatchSize({ maxResults, dataPointsFetched });

client
.send(
new BatchGetAssetPropertyAggregatesCommand({
Expand Down Expand Up @@ -109,7 +108,7 @@ const sendRequest = ({
timeOrdering: TimeOrdering.DESCENDING,
};
}),
maxResults: batchSize,
maxResults,
nextToken: prevToken,
})
)
Expand All @@ -122,17 +121,13 @@ const sendRequest = ({
errorEntries?.forEach((entry) => entry.entryId && callbackCache[entry.entryId]?.onError(entry));
successEntries?.forEach((entry) => entry.entryId && callbackCache[entry.entryId]?.onSuccess(entry));

// increment number of data points fetched
dataPointsFetched += batchSize;

if (shouldFetchNextBatch({ nextToken, maxResults, dataPointsFetched })) {
if (shouldFetchNextBatch({ nextToken })) {
sendRequest({
client,
batch,
maxResults,
requestIndex,
nextToken,
dataPointsFetched,
});
}
});
Expand All @@ -145,7 +140,7 @@ const batchGetAggregatedPropertyDataPointsForProperty = ({
client: IoTSiteWiseClient;
entries: BatchAggregatedEntry[];
}) =>
createEntryBatches<BatchAggregatedEntry>(entries)
createEntryBatches<BatchAggregatedEntry>(entries, BATCH_SIZE)
.filter((batch) => batch.length > 0) // filter out empty batches
.map(([batch, maxResults], requestIndex) => sendRequest({ client, batch, maxResults, requestIndex }));

Expand All @@ -159,15 +154,15 @@ export const batchGetAggregatedPropertyDataPoints = ({
const entries: BatchAggregatedEntry[] = [];

// fan out params into individual entries, handling fetchMostRecentBeforeStart
params.forEach(({ requestInformations, maxResults, onSuccess, onError }) => {
params.forEach(({ requestInformations, onSuccess, onError }) => {
requestInformations
.filter(({ resolution }) => resolution !== '0')
.forEach((requestInformation) => {
const { fetchMostRecentBeforeStart, start, end } = requestInformation;

entries.push({
requestInformation,
maxResults: fetchMostRecentBeforeStart ? 1 : maxResults,
maxResults: fetchMostRecentBeforeStart ? 1 : ENTRY_SIZE,
onSuccess,
onError,
requestStart: fetchMostRecentBeforeStart ? new Date(0, 0, 0) : start,
Expand Down

0 comments on commit 1fefe81

Please sign in to comment.