diff --git a/api/src/DuckDBResult.ts b/api/src/DuckDBResult.ts index 2f749fb3..52c5c892 100644 --- a/api/src/DuckDBResult.ts +++ b/api/src/DuckDBResult.ts @@ -239,4 +239,71 @@ export class DuckDBResult { public async getRowObjectsJson(): Promise[]> { return this.convertRowObjects(JsonDuckDBValueConverter); } + + public async *[Symbol.asyncIterator](): AsyncIterableIterator { + while (true) { + const chunk = await this.fetchChunk(); + if (chunk && chunk.rowCount > 0) { + yield chunk; + } else { + break; + } + } + } + + public async *yieldRows(): AsyncIterableIterator { + for await (const chunk of this) { + yield getRowsFromChunks([chunk]); + } + } + + public async *yieldRowObjects(): AsyncIterableIterator< + Record[] + > { + const deduplicatedColumnNames = this.deduplicatedColumnNames(); + for await (const chunk of this) { + yield getRowObjectsFromChunks([chunk], deduplicatedColumnNames); + } + } + + public async *yieldConvertedRows( + converter: DuckDBValueConverter + ): AsyncIterableIterator<(T | null)[][]> { + for await (const chunk of this) { + yield convertRowsFromChunks([chunk], converter); + } + } + + public async *yieldConvertedRowObjects( + converter: DuckDBValueConverter + ): AsyncIterableIterator[]> { + const deduplicatedColumnNames = this.deduplicatedColumnNames(); + for await (const chunk of this) { + yield convertRowObjectsFromChunks( + [chunk], + deduplicatedColumnNames, + converter, + ); + } + } + + public yieldRowsJs(): AsyncIterableIterator { + return this.yieldConvertedRows(JSDuckDBValueConverter); + } + + public yieldRowsJson(): AsyncIterableIterator { + return this.yieldConvertedRows(JsonDuckDBValueConverter); + } + + public yieldRowObjectJs(): AsyncIterableIterator< + Record[] + > { + return this.yieldConvertedRowObjects(JSDuckDBValueConverter); + } + + public yieldRowObjectJson(): AsyncIterableIterator< + Record[] + > { + return this.yieldConvertedRowObjects(JsonDuckDBValueConverter); + } } diff --git a/api/test/api.test.ts b/api/test/api.test.ts index 9accbb98..7e4b9533 100644 --- a/api/test/api.test.ts +++ b/api/test/api.test.ts @@ -2445,4 +2445,129 @@ ORDER BY name assert.equal(quotedIdentifier('table name'), '"table name"'); }); }); + + test("iterate over DuckDBResult stream in chunks", async () => { + await withConnection(async (connection) => { + const result = await connection.stream( + "select i::int, i::int + 10, (i + 100)::varchar from range(3) t(i)", + ); + + for await (const chunk of result) { + assert.strictEqual(chunk.rowCount, 3); + let i = 0; + assertValues( + chunk, + i++, + DuckDBIntegerVector, + [0, 1, 2], + ); + assertValues( + chunk, + i++, + DuckDBIntegerVector, + [10, 11, 12], + ); + assertValues( + chunk, + i++, + DuckDBVarCharVector, + ["100", "101", "102"], + ); + } + }); + }); + + test("iterate over many DuckDBResult chunks", async () => { + await withConnection(async (connection) => { + const chunkSize = 2048; + const totalExpectedCount = chunkSize * 3; + const result = await connection.stream( + `select i::int from range(${totalExpectedCount}) t(i)` + ); + + let total = 0; + for await (const chunk of result) { + assert.equal(chunk.rowCount, chunkSize); + total += chunk.rowCount; + } + + assert.equal(total, totalExpectedCount); + }); + }); + + test("iterate stream of rows", async () => { + await withConnection(async (connection) => { + const result = await connection.stream( + "select i::int, i::int + 10, (i + 100)::varchar from range(3) t(i)", + ); + + const expectedRows: DuckDBValue[][] = [ + [0, 10, '100'], + [1, 11, '101'], + [2, 12, '102'] + ]; + + for await (const rows of result.yieldRows()) { + for (let i = 0; i < rows.length; i++) { + assert.deepEqual(rows[i], expectedRows[i]); + } + } + }); + }); + + test("iterate stream of row objects", async () => { + await withConnection(async (connection) => { + const result = await connection.stream( + "select i::int as a, i::int + 10 as b, (i + 100)::varchar as c from range(3) t(i)", + ); + + const expectedRows: Record[] = [ + { a: 0, b: 10, c: '100'}, + { a: 1, b: 11, c: '101'}, + { a: 2, b: 12, c: '102'} + ]; + + for await (const rows of result.yieldRowObjects()) { + for (let i = 0; i < rows.length; i++) { + assert.deepEqual(rows[i], expectedRows[i]); + } + } + }); + }); + + test("iterate result stream rows js", async () => { + await withConnection(async (connection) => { + const result = await connection.stream(createTestJSQuery()); + for await (const row of result.yieldRowsJs()) { + assert.deepEqual(row, createTestJSRowsJS()); + } + }); + }); + + test("iterate result stream object js", async () => { + await withConnection(async (connection) => { + const result = await connection.stream(createTestJSQuery()); + for await (const row of result.yieldRowObjectJs()) { + assert.deepEqual(row, createTestJSRowObjectsJS()); + } + }); + }); + + test("iterate result stream rows json", async () => { + await withConnection(async (connection) => { + const result = await connection.stream(`from test_all_types()`); + for await (const row of result.yieldRowsJson()) { + assert.deepEqual(row, createTestAllTypesRowsJson()); + } + }); + }); + + test("iterate result stream object json", async () => { + await withConnection(async (connection) => { + const result = await connection.stream(`from test_all_types()`); + for await (const row of result.yieldRowObjectJson()) { + assert.deepEqual(row, createTestAllTypesRowObjectsJson()); + } + }); + }); });