Commit 8f24580

Authored by marcog83, gobbimar, TkDodo, and autofix-ci[bot]
feat(query-core): add custom reducer support to streamedQuery (#9532)
* feat(query-core): add custom reducer support to streamedQuery

  Replace the maxChunks parameter with a flexible reducer function that delegates data aggregation to consumer code. This provides full control over how streamed chunks are combined into the final data structure. Add support for custom placeholderData that works seamlessly with the reducer function, allowing initialization of complex data types beyond simple arrays. #9065

  BREAKING CHANGE: The maxChunks parameter has been removed from streamedQuery. Use a custom reducer function to control data aggregation behavior instead.

* ci: apply automated fixes

* feat(query-core): require initialValue when using custom reducer in streamedQuery

  Add type safety by making initialValue mandatory when providing a custom reducer function. This prevents runtime errors and ensures proper data initialization for custom data structures beyond simple arrays. Use conditional types to enforce the relationship between reducer and initialValue parameters, maintaining backward compatibility for simple array-based streaming while requiring explicit initialization for custom reducers.

  BREAKING CHANGE: When using a custom reducer function with streamedQuery, the initialValue parameter is now required and must be provided.

* removed personal vscode workspace file

* updated documentation

* fix(docs): clarify reducer function description in streamedQuery documentation

* ci: apply automated fixes

* fix(tests): Code Review :: update streamedQuery tests to use correct initialValue type

Co-authored-by: gobbimar <marco.gobbi@adidas.com>
Co-authored-by: Dominik Dorfmeister <office@dorfmeister.cc>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 43049c5 commit 8f24580
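Since the BREAKING CHANGE removes `maxChunks`, existing callers need a replacement. A minimal migration sketch, not taken from the commit: a hypothetical `keepLast` reducer factory reproduces the old keep-the-newest-N behavior, with a plain `Array.prototype.reduce` standing in for the streaming loop.

```typescript
type Chunk = number

// Keeps at most `max` chunks, dropping the oldest first
// (the behavior the removed `maxChunks` option provided).
const keepLast =
  (max: number) =>
  (acc: Array<Chunk>, chunk: Chunk): Array<Chunk> => {
    const next = [...acc, chunk]
    return next.length > max ? next.slice(next.length - max) : next
  }

// Simulate chunks being folded in one at a time, as streamedQuery does internally.
const chunks: Array<Chunk> = [0, 1, 2]
const windowed = chunks.reduce(keepLast(2), [])
// windowed is [1, 2]: chunk 0 was evicted once the window exceeded two entries.
```

Passing `keepLast(2)` as the `reducer` option (with `initialValue: []`) would then take the place of the old `maxChunks: 2`.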

File tree: 3 files changed (+84, -103 lines)


docs/reference/streamedQuery.md (9 additions, 5 deletions)

@@ -33,9 +33,13 @@ const query = queryOptions({
   - When set to `'reset'`, the query will erase all data and go back into `pending` state.
   - When set to `'append'`, data will be appended to existing data.
   - When set to `'replace'`, all data will be written to the cache once the stream ends.
-- `maxChunks?: number`
+- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
   - Optional
-  - The maximum number of chunks to keep in the cache.
-  - Defaults to `undefined`, meaning all chunks will be kept.
-  - If `undefined` or `0`, the number of chunks is unlimited.
-  - If the number of chunks exceeds this number, the oldest chunk will be removed.
+  - Reduces streamed chunks (`TQueryFnData`) into the final data shape (`TData`).
+  - Default: appends each chunk to the end of the accumulator when `TData` is an array.
+  - If `TData` is not an array, you must provide a custom `reducer`.
+- `initialValue?: TData = TQueryFnData`
+  - Optional
+  - Defines the initial data to be used while the first chunk is being fetched.
+  - It is mandatory when a custom `reducer` is provided.
+  - Defaults to an empty array.
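The documented contract can be illustrated in isolation. Below is a sketch of how a custom `reducer` and `initialValue` combine chunks into a non-array `TData`, with a plain array standing in for the `AsyncIterable` and hypothetical `Chunk`/`Data` types; nothing here is imported from `@tanstack/query-core`.

```typescript
// Hypothetical chunk and data shapes for illustration.
type Chunk = { id: number; name: string }
type Data = Record<number, string>

// The reducer folds each streamed chunk into the accumulated data.
const reducer = (acc: Data, chunk: Chunk): Data => ({
  ...acc,
  [chunk.id]: chunk.name,
})

// Seeds the accumulator before the first chunk arrives.
const initialValue: Data = {}

// Chunks as they would arrive from the AsyncIterable returned by queryFn:
const chunks: Array<Chunk> = [
  { id: 1, name: 'first' },
  { id: 2, name: 'second' },
]

const data = chunks.reduce(reducer, initialValue)
// data is { 1: 'first', 2: 'second' }: a non-array TData, which is why a
// custom reducer (and a mandatory initialValue) is required here.
```

In actual usage, `reducer` and `initialValue` would be passed as options to `streamedQuery` and the folding would happen chunk-by-chunk as the stream yields.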

packages/query-core/src/__tests__/streamedQuery.test.tsx (30 additions, 77 deletions)

@@ -391,13 +391,18 @@ describe('streamedQuery', () => {
     })
   })

-  test('should support maxChunks', async () => {
+  test('should support custom reducer', async () => {
     const key = queryKey()
+
     const observer = new QueryObserver(queryClient, {
       queryKey: key,
       queryFn: streamedQuery({
-        queryFn: () => createAsyncNumberGenerator(3),
-        maxChunks: 2,
+        queryFn: () => createAsyncNumberGenerator(2),
+        reducer: (acc, chunk) => ({
+          ...acc,
+          [chunk]: true,
+        }),
+        initialValue: {} as Record<number, boolean>,
       }),
     })

@@ -409,41 +414,34 @@ describe('streamedQuery', () => {
       data: undefined,
     })

-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [0],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [0, 1],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
+    await vi.advanceTimersByTimeAsync(100)

     expect(observer.getCurrentResult()).toMatchObject({
       status: 'success',
       fetchStatus: 'idle',
-      data: [1, 2],
+      data: {
+        0: true,
+        1: true,
+      },
     })

     unsubscribe()
   })

-  test('maxChunks with append refetch', async () => {
+  test('should support custom reducer with initialValue', async () => {
     const key = queryKey()
     const observer = new QueryObserver(queryClient, {
       queryKey: key,
       queryFn: streamedQuery({
-        queryFn: () => createAsyncNumberGenerator(3),
-        maxChunks: 2,
-        refetchMode: 'append',
+        queryFn: () => createAsyncNumberGenerator(2),
+        reducer: (acc, chunk) => ({
+          ...acc,
+          [chunk]: true,
+        }),
+        initialValue: {
+          10: true,
+          11: true,
+        } as Record<number, boolean>,
       }),
     })

@@ -455,62 +453,17 @@ describe('streamedQuery', () => {
       data: undefined,
     })

-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [0],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [0, 1],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'idle',
-      data: [1, 2],
-    })
-
-    void observer.refetch()
-
-    await vi.advanceTimersByTimeAsync(10)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [1, 2],
-    })
-
-    await vi.advanceTimersByTimeAsync(40)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [2, 0],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [0, 1],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
+    await vi.advanceTimersByTimeAsync(100)

     expect(observer.getCurrentResult()).toMatchObject({
       status: 'success',
       fetchStatus: 'idle',
-      data: [1, 2],
+      data: {
+        10: true,
+        11: true,
+        0: true,
+        1: true,
+      },
     })

     unsubscribe()
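The tests rely on a `createAsyncNumberGenerator` helper defined elsewhere in the suite. A plausible minimal version is sketched below (an assumption; the real helper also delays each chunk, which is why the tests advance fake timers by 50ms per chunk), together with a hypothetical `aggregate` function that folds the stream through the reducer the way `streamedQuery` does internally.

```typescript
// Assumed shape of the test helper: yields 0, 1, ..., amount - 1.
async function* createAsyncNumberGenerator(
  amount: number,
): AsyncGenerator<number> {
  for (let i = 0; i < amount; i++) {
    yield i
  }
}

// Consume the stream the way streamedQuery does: fold every chunk
// through the reducer, starting from initialValue.
async function aggregate<TChunk, TData>(
  stream: AsyncIterable<TChunk>,
  reducer: (acc: TData, chunk: TChunk) => TData,
  initialValue: TData,
): Promise<TData> {
  let result = initialValue
  for await (const chunk of stream) {
    result = reducer(result, chunk)
  }
  return result
}

const resultPromise = aggregate(
  createAsyncNumberGenerator(2),
  (acc: Record<number, boolean>, chunk: number) => ({ ...acc, [chunk]: true }),
  { 10: true, 11: true },
)
// Resolves to { 10: true, 11: true, 0: true, 1: true }, matching the
// expectation in the second test above.
```

This mirrors why the second test expects the `initialValue` entries (`10`, `11`) to survive alongside the streamed chunks (`0`, `1`).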

packages/query-core/src/streamedQuery.ts (45 additions, 21 deletions)

@@ -1,6 +1,34 @@
 import { addToEnd } from './utils'
 import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'

+type BaseStreamedQueryParams<TQueryFnData, TQueryKey extends QueryKey> = {
+  queryFn: (
+    context: QueryFunctionContext<TQueryKey>,
+  ) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
+  refetchMode?: 'append' | 'reset' | 'replace'
+}
+
+type SimpleStreamedQueryParams<
+  TQueryFnData,
+  TQueryKey extends QueryKey,
+> = BaseStreamedQueryParams<TQueryFnData, TQueryKey> & {
+  reducer?: never
+  initialValue?: never
+}
+
+type ReducibleStreamedQueryParams<
+  TQueryFnData,
+  TData,
+  TQueryKey extends QueryKey,
+> = BaseStreamedQueryParams<TQueryFnData, TQueryKey> & {
+  reducer: (acc: TData, chunk: TQueryFnData) => TData
+  initialValue: TData
+}
+
+type StreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> =
+  | SimpleStreamedQueryParams<TQueryFnData, TQueryKey>
+  | ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey>
+
 /**
  * This is a helper function to create a query function that streams data from an AsyncIterable.
  * Data will be an Array of all the chunks received.
@@ -11,31 +39,29 @@ import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
  * Defaults to `'reset'`, erases all data and puts the query back into `pending` state.
  * Set to `'append'` to append new data to the existing data.
  * Set to `'replace'` to write all data to the cache once the stream ends.
- * @param maxChunks - The maximum number of chunks to keep in the cache.
- * Defaults to `undefined`, meaning all chunks will be kept.
- * If `undefined` or `0`, the number of chunks is unlimited.
- * If the number of chunks exceeds this number, the oldest chunk will be removed.
+ * @param reducer - A function to reduce the streamed chunks into the final data.
+ * Defaults to a function that appends chunks to the end of the array.
+ * @param initialValue - Initial value to be used while the first chunk is being fetched.
  */
 export function streamedQuery<
   TQueryFnData = unknown,
+  TData = Array<TQueryFnData>,
   TQueryKey extends QueryKey = QueryKey,
 >({
   queryFn,
   refetchMode = 'reset',
-  maxChunks,
-}: {
-  queryFn: (
-    context: QueryFunctionContext<TQueryKey>,
-  ) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
-  refetchMode?: 'append' | 'reset' | 'replace'
-  maxChunks?: number
-}): QueryFunction<Array<TQueryFnData>, TQueryKey> {
+  reducer = (items, chunk) =>
+    addToEnd(items as Array<TQueryFnData>, chunk) as TData,
+  initialValue = [] as TData,
+}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<
+  TData,
+  TQueryKey
+> {
   return async (context) => {
     const query = context.client
       .getQueryCache()
       .find({ queryKey: context.queryKey, exact: true })
     const isRefetch = !!query && query.state.data !== undefined
-
     if (isRefetch && refetchMode === 'reset') {
       query.setState({
         status: 'pending',
@@ -45,7 +71,8 @@ export function streamedQuery<
       })
     }

-    let result: Array<TQueryFnData> = []
+    let result = initialValue
+
     const stream = await queryFn(context)

     for await (const chunk of stream) {
@@ -55,19 +82,16 @@ export function streamedQuery<

       // don't append to the cache directly when replace-refetching
       if (!isRefetch || refetchMode !== 'replace') {
-        context.client.setQueryData<Array<TQueryFnData>>(
-          context.queryKey,
-          (prev = []) => {
-            return addToEnd(prev, chunk, maxChunks)
-          },
+        context.client.setQueryData<TData>(context.queryKey, (prev) =>
+          reducer(prev === undefined ? initialValue : prev, chunk),
         )
       }
-      result = addToEnd(result, chunk, maxChunks)
+      result = reducer(result, chunk)
     }

     // finalize result: replace-refetching needs to write to the cache
     if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) {
-      context.client.setQueryData<Array<TQueryFnData>>(context.queryKey, result)
+      context.client.setQueryData<TData>(context.queryKey, result)
     }

     return context.client.getQueryData(context.queryKey)!
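The union of `SimpleStreamedQueryParams` and `ReducibleStreamedQueryParams` above is what makes `initialValue` mandatory exactly when a `reducer` is supplied: the `reducer?: never` / `initialValue?: never` branch forbids them together unless both are present. A standalone sketch of the same technique, using a hypothetical `fold` function over a plain array instead of the real query function:

```typescript
// Re-declared for illustration; not imported from query-core.
type SimpleParams<TChunk> = {
  stream: Array<TChunk>
  reducer?: never
  initialValue?: never
}
type ReducibleParams<TChunk, TData> = {
  stream: Array<TChunk>
  reducer: (acc: TData, chunk: TChunk) => TData
  initialValue: TData
}
type Params<TChunk, TData> =
  | SimpleParams<TChunk>
  | ReducibleParams<TChunk, TData>

function fold<TChunk, TData = Array<TChunk>>({
  stream,
  // Same default trick as streamedQuery: append chunks when no reducer is given.
  reducer = (acc, chunk) => [...(acc as Array<TChunk>), chunk] as TData,
  initialValue = [] as TData,
}: Params<TChunk, TData>): TData {
  return stream.reduce(reducer, initialValue)
}

// OK: no reducer, so TData defaults to Array<number>.
const asArray = fold({ stream: [1, 2, 3] })

// OK: custom reducer plus its mandatory initialValue.
const asSum = fold({
  stream: [1, 2, 3],
  reducer: (acc: number, chunk: number) => acc + chunk,
  initialValue: 0,
})

// Compile-time error (not shown): fold({ stream: [1], reducer: (a: number, c: number) => a + c })
// fails because `initialValue` is missing, which the union rejects.
```

The `never`-typed optional properties are what let the compiler distinguish the two call shapes without a runtime discriminant.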
