Pausing a stream #135

Closed
nicholaswmin opened this issue Jan 21, 2017 · 12 comments

Comments


nicholaswmin commented Jan 21, 2017

Pausing a stream doesn't seem to have any effect - the parser keeps reading the CSV as usual.

Take for example the following code:

const fs = require("fs");
const csv = require("csvtojson");

const rs = fs.createReadStream(this.csvPath);
rs.pause();

let count = 0;
csv()
.fromStream(rs)
.on("json", (json) => {
  count++;
  console.log(count);
})
.on("done", () => {
  cb(null, count);
})
.on("error", (err) => {
  cb(err);
})

count is logged 200 times (equal to the number of rows in the CSV). I was expecting it not to log anything, since the stream is paused before being passed to fromStream().

Keyang (Owner) commented Jan 22, 2017

Hi,
I don't understand why you would want to pause the read stream while passing it into the converter. Generally, you should pass it into the converter only when you need to convert the source.

In Node.js, when a readable stream is piped into a downstream, pipe() calls readStream.resume() by default, so it overrides readStream.pause() if you pass the stream into csv().fromStream(rs) -
see here
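
A minimal sketch (not from the thread) illustrating that behaviour - the file name is just a placeholder:

const fs = require("fs");

const rs = fs.createReadStream("some.csv"); // placeholder path
rs.pause();
console.log(rs.isPaused()); // true - explicitly paused

rs.pipe(process.stdout);    // pipe() calls resume() on its source

console.log(rs.isPaused()); // false - the earlier pause() has been undone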

You will need to change your code to the following:

const rs = fs.createReadStream(this.csvPath);

let count = 0;
csv()
.fromStream(rs)
.on("json", (json) => {
  count++;
  console.log(count);
})
.on("done", () => {
  cb(null, count);
})
.on("error", (err) => {
  cb(err);
})
// Defer the pause until after fromStream() has set up its internal pipe;
// pausing synchronously would be undone by pipe()'s automatic resume().
process.nextTick(() => {
  rs.pause();
})

Again, if you don't want to parse the read stream at that time, you just don't need to call the parser.

~Keyang


nicholaswmin commented Jan 22, 2017

I've got a use case where I need to process a huge (>2 million rows) CSV and insert it into a DB.

To do this without running into memory issues, I intend to process the CSV as a stream, pausing the stream every 10,000 rows, inserting those rows into my DB, and then resuming the stream.


nicholaswmin commented Jan 22, 2017

Apologies, the example I gave above was a bit too cut down.

So this,

process.nextTick(()=>{
  rs.pause();
})

would pause the stream right before it starts.

What I want to do is something like this, where I pause and resume within the json event listener:

.on("json", (json) => {
  rows.push(json);
  // for every 10,000 rows - pause the stream,
  // asynchronously save to DB, and then resume the stream
  if (rows.length % 10000 === 0) {
    rs.pause();
    this.saveToDb(rows, () => {
      rs.resume();
      rows = [];
    })
  }
})


Keyang commented Jan 22, 2017 via email


nicholaswmin commented Jan 22, 2017

> Anyway, if you want to do it the way you mentioned, it is possible as well.
> You can pause the readable stream every 10,000 rows and resume it once they
> are processed.

Thanks a million - but calling rs.pause() from within .on("json") doesn't seem to have an effect; the stream just keeps flowing.


Keyang commented Jan 22, 2017 via email


nicholaswmin commented Jan 22, 2017

Thanks,

unpipe seems to keep the json events coming for a few more rows (4 in the log below) after it's called. Does it have an immediate effect?

It looks like when unpipe is called, some json events are still being emitted - presumably for data that had already been handed to the parser.

.on("json", (json) => {
  rows.push(json);
  console.log(rows.length);
  if (rows.length % 1000 === 0) {
    console.log("unpiping");
    rs.unpipe();
    this._insertEntries(db, rows, ()=> {
      rs.pipe(csvParser);
      rows = [];
    });
  }
})

And here's the console.log() output:

996
997
998
999
1000
unpiping
1001
1002
1003
1004
1
2
3
4


nicholaswmin commented Jan 22, 2017

However, this works:

.on("json", (json) => {
  rows.push(json);
  if (rows.length % 1000 === 0) {
    rs.unpipe();
    // clear `rows` right after `unpipe`
    const entries = rows;
    rows = [];
    this._insertEntries(db, entries, ()=> {
      rs.pipe(csvParser);
    });
  }
})
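
For reference, a self-contained sketch of the same batching pattern, assuming the csvtojson "json"/"done" events used throughout this thread; db, insertEntries and the file name are hypothetical stand-ins:

const fs = require("fs");
const csv = require("csvtojson");

// Hypothetical stand-in for an asynchronous DB insert - replace with real logic.
const db = null;
function insertEntries(db, entries, done) {
  setTimeout(done, 10); // simulate async I/O
}

const rs = fs.createReadStream("huge.csv"); // placeholder path
const csvParser = csv();
let rows = [];

csvParser
  .on("json", (json) => {
    rows.push(json);
    if (rows.length % 1000 === 0) {
      rs.unpipe();            // stop feeding the parser while we insert
      const entries = rows;   // snapshot the batch before the async insert
      rows = [];              // late json events start a fresh batch
      insertEntries(db, entries, () => {
        rs.pipe(csvParser);   // resume feeding once the batch is saved
      });
    }
  })
  .on("done", () => {
    // flush whatever is left over in the final, smaller batch
    insertEntries(db, rows, () => console.log("all rows inserted"));
  })
  .on("error", (err) => console.error(err));

rs.pipe(csvParser);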


Keyang commented Jan 22, 2017 via email


nicholaswmin commented Jan 23, 2017

Got it - thanks.

Are there any plans to make "pausing" a bit easier? I'm guessing that what I'm trying to do here is a fairly common task.

I've got it working with your suggestions, but it feels a tad hackish.


Keyang commented Jan 23, 2017 via email

@nicholaswmin

Nice - that looks much cleaner. Thanks again.
