Expose streaming query as (typed) AsyncIterable #426

Closed
jellelicht opened this issue Oct 31, 2022 · 4 comments

Comments

jellelicht commented Oct 31, 2022

It would be pretty nifty to be able to use the for await (const datum of <something>) {} construction with streaming queries, with proper types.

Desired Behavior

const exampleQuery = sql.type(z.object({ bar: z.string() }))`SELECT 'foo' as bar`;
await pool.streamIterable(exampleQuery, async (s) => {
  for await (const { row } of s) {
    console.log(`This has typechecking: ${row.bar}`);
  }
});

Motivation

For slower computations where storing all data in memory at once (or fetching it over the network at once) is not feasible, AsyncIterables offer convenient data processing facilities, while still being typed.
The stream method offers similar facilities, but appears to be untyped, because the handler receives a plain Readable and Readable is not generic (there is no Readable<T>).

Implementation

The lowest-effort approach, it seems to me, is to allow StreamHandler to be an async function, in which case building my own asTypedIterable on top of it would be trivial.
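To make the asTypedIterable idea concrete, here is a minimal sketch. The helper and the StreamedRow shape are hypothetical and not part of Slonik; the sketch only assumes that the stream is an object-mode Node.js Readable whose items look like { row }. The cast is a compile-time assertion only, so nothing is validated at runtime.

```typescript
import { Readable } from 'node:stream';

// Hypothetical helper (not a Slonik API): view an object-mode Readable as an
// AsyncIterable of a caller-supplied element type. Readable already implements
// the async iterator protocol, so this only narrows the static type.
function asTypedIterable<T>(stream: Readable): AsyncIterable<T> {
  return stream as AsyncIterable<T>;
}

// Assumed item shape, mirroring the `{ row }` destructuring used in this issue.
type StreamedRow = { row: { bar: string } };

// Example consumer: collects the `bar` column with full type checking.
async function collectBars(stream: Readable): Promise<string[]> {
  const bars: string[] = [];
  for await (const { row } of asTypedIterable<StreamedRow>(stream)) {
    bars.push(row.bar);
  }
  return bars;
}
```

With an async StreamHandler, something like collectBars could be awaited directly inside the handler; whether Slonik awaits the handler is exactly what this proposal asks for, so treat that part as an assumption.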

One issue is that the user somehow has to make sure that the Readable is actually closed/destroyed. This is done automagically when one runs a for await (const _ of <readable>){}, but the Readable obviously does not get closed/destroyed if there is no such loop.
OTOH, this is also currently the case for the existing stream, so we might consider this a case of foot meet gun.

const exampleQuery = sql.type(z.object({ bar: z.string() }))`SELECT 'foo' as bar`;
await pool.stream(exampleQuery, (s) => {
  console.log('This prints');
});
console.log('This does not for at least a while');
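The cleanup behaviour mentioned above can be checked without a database. The sketch below uses only Node's stream module; firstItem is a hypothetical helper that leaves a for await loop early, which Node documents as destroying the Readable.

```typescript
import { Readable } from 'node:stream';

// Hypothetical helper: returns the first item of an object-mode stream.
// Leaving the `for await` loop via `return` makes Node destroy the stream.
async function firstItem<T>(stream: Readable): Promise<T | undefined> {
  for await (const item of stream) {
    return item as T;
  }
  return undefined;
}

// Runs firstItem against an in-memory stream and reports whether the stream
// was destroyed afterwards.
async function demoEarlyExit(): Promise<{ value: number | undefined; destroyed: boolean }> {
  const stream = Readable.from([1, 2, 3]);
  const value = await firstItem<number>(stream);
  return { value, destroyed: stream.destroyed };
}
```

A Readable that is never iterated gets no such cleanup, which is the foot-gun described above: the caller has to call destroy() (or consume the stream) themselves.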

flevi29 commented Apr 25, 2023

I wonder about the safety of the following approach:

async function* streamMARCsByLibraryID(
  connection: DatabaseConnection,
  libraryId: number,
) {
  const stream = await new Promise<Readable>((resolve, reject) =>
    connection
      .stream(
        sql.unsafe`SELECT id, marc
                   FROM unitas_library.harvested_record
                   WHERE library_id = ${libraryId}`,
        (stream) => resolve(stream),
      )
      .catch((reason) => reject(reason)),
  );
  for await (const { row } of stream) {
    yield <{ id: string; marc: string }>row;
  }
  stream.destroy();
}
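On the safety question, one possible tightening is to put the cleanup in a finally block, so the stream is destroyed even when the consumer stops early or the loop throws. The sketch below is database-free and hypothetical: rowsOf is not a Slonik function, and it only assumes an object-mode Readable whose items have a row property.

```typescript
import { Readable } from 'node:stream';

// Hypothetical wrapper: yields the `row` of each streamed item, typed as T,
// and guarantees the underlying stream is destroyed when the generator
// finishes, is returned from early, or throws.
async function* rowsOf<T>(stream: Readable): AsyncGenerator<T> {
  try {
    for await (const { row } of stream) {
      yield row as T; // compile-time assertion only, no runtime validation
    }
  } finally {
    if (!stream.destroyed) {
      stream.destroy();
    }
  }
}

// Example consumer that stops after the first row.
async function firstMarc(stream: Readable): Promise<string | undefined> {
  for await (const record of rowsOf<{ id: string; marc: string }>(stream)) {
    return record.marc;
  }
  return undefined;
}
```

In the original snippet the trailing stream.destroy() only runs when the loop completes normally; Node's own iterator cleanup already covers an early break, so the finally block is mostly a belt-and-braces measure.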

Owner

gajus commented Sep 28, 2023

FYI, the stream results are now typed.

gajus closed this as completed in 821851c on Sep 29, 2023

Owner

gajus commented Sep 29, 2023

A typed AsyncIterable interface is now implemented.

@github-actions
Contributor

🎉 This issue has been resolved in version 37.0.0 🎉

The release is available on:

Your semantic-release bot 📦🚀
