Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Question on Internals #8

Closed
ben-laird opened this issue May 5, 2024 · 2 comments
Closed

Question on Internals #8

ben-laird opened this issue May 5, 2024 · 2 comments

Comments

@ben-laird
Copy link

Hello!

I found your repo on JSR and I'm very impressed with it, especially with your use of Transform Streams. I started investigating using Transform Streams in my own work and I can't seem to get it working. Lmk if this issue should be moved somewhere else, I figured this was the best way to contact you.

Consider this test case
import { assertEquals } from "jsr:@std/assert";

class DuplexStream<T> extends TransformStream<T, T> {}

Deno.test("Streams", async (t) => {
  const stream = new DuplexStream<string>();

  const expected = [
    "Hello there",
    "Here's some string data",
    "All done",
  ];

  await t.step("Writing", async () => {
    const w = stream.writable.getWriter();

    for await (const element of expected) {
      await w.ready;
      // Error: Promise resolution is still pending but the event loop has already resolved.
      await w.write(element);
    }

    await w.ready;
    w.releaseLock();
  });

  await t.step("Reading", async () => {
    const r = stream.readable.values();

    const actual = await Array.fromAsync(r);

    assertEquals(actual, expected);
  });
});

When running deno test on the test case above, it responds with an error as soon as it hits await w.write(element);:

error: Promise resolution is still pending but the event loop has already resolved.

Refactoring to use a WritableStream only (test 2 below) or to pipe a ReadableStream through (test 3 below) solves the error, but these approaches are undesirable. What am I doing wrong here? I've tried looking for guides on Transform Streams and consulting MDN's docs on them, and nothing has been helpful. I'm curious to see how you avoided the same errors in your own repo, because it seems inevitable given you use only a TransformStream and don't use it as a pipe for any Readable Stream.

Test 2
import { assertEquals } from "jsr:@std/assert";

Deno.test("Streams 2", async (t) => {
  const collected: string[] = [];

  const stream = new WritableStream<string>({
    write(ch) {
      collected.push(ch);
    },
  });

  const expected = [
    "Hello there",
    "Here's some string data",
    "All done",
  ];

  await t.step("Writing", async () => {
    const w = stream.getWriter();

    for await (const element of expected) {
      await w.ready;
      await w.write(element);
    }

    await w.ready;
    w.releaseLock();
  });

  await t.step("Reading", () => {
    assertEquals(collected, expected);
  });
});
Test 3
import { assertEquals } from "jsr:@std/assert";

class DuplexStream<T> extends TransformStream<T, T> {}

Deno.test("Streams 3", async (t) => {
  const stream = new DuplexStream<string>();

  const expected = [
    "Hello there",
    "Here's some string data",
    "All done",
  ];

  await t.step("Writing & Reading", async () => {
    const r = ReadableStream.from(expected).pipeThrough(stream).values();

    const actual = await Array.fromAsync(r);

    assertEquals(actual, expected);
  });
});

Thanks so much for your time and for making such a high-quality library. I've starred this repo and hope to contribute to it when I can!

@crowlKats
Copy link
Member

Hey, so the issue is that you are writing contents to the stream, but not reading anything. basically, TransformStream doesnt buffer elements up, so writing and reading need to happen "simultaneously". the easiest way to achieve this in your first test is to not await the "Writing" step, but assigning it to a variable, and awaiting it after the "Reading" step. I know this might be a bit awkward and not ideal, but thats the only solution.

however, if you do want to buffer n-amount of elements, i believe you can try around with https://developer.mozilla.org/en-US/docs/Web/API/TransformStream/TransformStream#highwatermark (though not entirely sure, i rarely use this option and do not remember its inner workings from the top of my head)

@ben-laird
Copy link
Author

Hey! Sorry for the late reply. Your solution worked perfectly; using the third argument to TransformStream to provide a queuing strategy to the underlying ReadableStream was the right call. The promises I was awaiting were queued up for the TransformStream to process, but they were never settled, hence the error I got. It's also why your use of TransformStream works: there's a reader attached to the ReadableStream side immediately, negating the need for a queue.

Thanks so much again for your help, and I hope to offer my own help on this repo if you'd like and when I can. I'll close this as completed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants