Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Doc: node.js async integration patterns #305

Closed
estaub opened this issue Jul 10, 2020 · 7 comments
Closed

Doc: node.js async integration patterns #305

estaub opened this issue Jul 10, 2020 · 7 comments

Comments

@estaub
Copy link

estaub commented Jul 10, 2020

Some guidance and/or examples on use (or non-use!) with node.js would be helpful. Things like:

  • use with node callback idioms, streams, etc. I'd guess that async iterators and async generators should be used as shims, but I'm just guessing
  • cautions where important node.js functionality becomes hard to access

I'm just kicking the tires now - I haven't written a line of code, so I can't be more specific. But that's kind of the point!

@trxcllnt
Copy link
Member

I agree we should have better docs for these. At the moment, the tests are the best examples to reference.

  • fromNodeStream converts a node stream to an Ix.AsyncIterable
  • toNodeStream converts any AsyncIterable to a node stream, and accepts an optional ReadableOptions argument to control objectMode, highWaterMark, etc.
  • pipe can also accept a WritableStream directly, similar to a node ReadableStream, to support interop with existing node DuplexStream or TransformStream libraries

We don't yet have a method to create an AsyncIterable from a node stream via pipe(), for example:

import { PassThrough } from 'stream';
import { fromNodeStream } from 'ix/asynciterable/fromnodestream';

const stream = new PassThrough();
// this works
const asyncIter = fromNodeStream(stream);
// this doesn't work
// const asyncIter = stream.pipe(fromNodeStream());

For more real world examples, check out csv-to-arrow-js, arrow-to-parquet-js, and my fastify-arrow plugin 🎉.

@KilianKilmister
Copy link

@trxcllnt NodeJS has native ways for interactions between streams and unrelated async iterables.

  • the static Readable.from
  • any stream is an async iterable by default (implementing Symbol.asyncIterable)
  • the pipeline function (also correclty propagates errors and deals with backpressure)

These have worked great for me in the past (i find that nodejs streams can be a bit cumbersome to write, so i've just been writing async iterables specific for the usecase). What do the conversions of this module supply that the native ones don't?

@trxcllnt
Copy link
Member

@KilianKilmister I believe the Ix methods predate most of the node core ones, but they're complementary. Use whatever works for you!

@richardscarrott
Copy link

richardscarrott commented Jul 16, 2021

@trxcllnt I'm struggling to get ix to play nicely with node streams, e.g.

import { from } from 'ix/asynciterable';
import { fromNodeStream } from 'ix/asynciterable/fromnodestream';

let i = 100;
const counter = new Readable({
  objectMode: true,
  read() {
    setTimeout(() => {
      this.push({ i });
      if (i-- === 0) {
        this.push(null);
      }
    }, 10);
  },
});

const printer = new Transform({
  objectMode: true,
  write(chunk, encoding, callback) {
    setTimeout(() => {
      console.log(chunk);
      callback();
    }, 300);
  },
});

fromNodeStream(counter).pipe(printer);
// TypeError: as_1.as(...).toNodeStream is not a function
//     at AsyncIterableX.nodePipe (/usr/src/app/node_modules/ix/asynciterable/asynciterablex.js:70:39)
//     at main (/usr/src/app/dist/index.js:53:47)
//     at Object.<anonymous> (/usr/src/app/dist/index.js:76:1)
//     at Module._compile (internal/modules/cjs/loader.js:959:30)
//     at Object.Module._extensions..js (internal/modules/cjs/loader.js:995:10)
//     at Module.load (internal/modules/cjs/loader.js:815:32)
//     at Function.Module._load (internal/modules/cjs/loader.js:727:14)
//     at Function.Module.runMain (internal/modules/cjs/loader.js:1047:10)
//     at internal/main/run_main_module.js:17:11

// And the same happens when utilising the fact node's `Readable` is an async iterator:
from(counter).pipe(printer);

Am I missing something obvious here?

@richardscarrott
Copy link

richardscarrott commented Jul 16, 2021

Ok figured out my issue, I had to explicitly import (and log to avoid the TS compiler just removing the import) ix/asynciterable/tonodestream:

import { from } from 'ix/asynciterable';
import { toNodeStream } from 'ix/asynciterable/tonodestream';

console.log(toNodeStream); // NOTE: I'm not actually using this in my code

// ...

from(counter).pipe(printer);

I guess it mutates the prototype... is there a better practice for importing ix, perhaps in nodejs I need to import * as ix?

@trxcllnt
Copy link
Member

@richardscarrott does import 'ix/asynciterable/tonodestream' not work?

@richardscarrott
Copy link

@trxcllnt yes I think it does actually 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants