
deadlock when using batch with flatMap and mongo stream as source (node >= 10) #693

Open
adrian-gierakowski opened this issue Jan 16, 2020 · 13 comments

Comments

@adrian-gierakowski

The following test deadlocks (toCallback never gets called):

var _,
    bluebird = require('bluebird');

if (global.highland != null) {
    _ = global.highland;
}
else {
    _ = require('../lib/index');
}

// Use bluebird cancellation. We want to test against it.
bluebird.config({
    cancellation: true,
    longStackTraces : true
});


var MongoClient = require('mongodb').MongoClient

exports['batch - mongodb'] = function (test) {
    test.expect(1);

    MongoClient.connect('mongodb://localhost:27017')
    .then(async (client) => {

        var db = client.db("test")
        var col = db.collection("test")
        const docs = Array.from(Array(22).keys()).map( _id => { return { _id } })
        await col.deleteMany({})
        await col.insertMany(docs)

        var cursor = col.find()
        cursor.batchSize(2)
        // Changing batch size to 3 causes the test to pass.
        // cursor.batchSize(3)

        _(cursor.stream())
        .batch(11)
        // Changing batch size to 12 causes the test to pass.
        // .batch(12)
        .flatMap( x => {
            console.log("x:", x)
            return _(x)
        })
        // If flatMap above is replaced with .map, the test passes.
        // .map( x => {
        //     console.log("x:", x)
        //     return x
        // })
        .reduce(
            (acc, x) => {
                console.log("conc", acc, x)
                return acc.concat(x)
            },
            []
        )
        .toCallback((e, xs) => {
            console.log("xs",xs)
            test.same(xs, docs);
            test.done();
        })
    })

};

You can save the above as tests/batch_deadlock.js, install the mongo driver with npm i mongodb, and finally run it with npx nodeunit tests/batch_deadlock.js. It should deadlock when run with node version >= 10, but passes when run with node 8.x.

I've also left a few comments in the code which show some tweaks that make the deadlock go away.

@szabolcs-szilagyi

Created a small github repo with Travis on it with a similar test case:
https://github.com/szabolcs-szilagyi/highland-node-12
https://travis-ci.org/szabolcs-szilagyi/highland-node-12

Set up Travis to build it daily, so the badge above should go green once it's fixed.

@szabolcs-szilagyi

Interestingly, if you use the ratelimit function on the stream before calling batch, then with some settings it can finish. I think it might be a timing issue around setting some internal state. E.g. if in the test project I put:

        hl(mongoStream)
          .ratelimit(110, 0)
          .batch(300)

it will actually finish, while if I put 120 for the amount it will hang.

@szabolcs-szilagyi

What I've found so far:
it is able to pull data from the mongo stream 4 times, but then it forgets to resume/pull from it again. Tried to run it with the --inspect flag to see (after lots of console.logs), but of course, like all timing bugs, it runs perfectly when you start it in debug mode. :/
Tried to reproduce the bug with file streams, but those ran okay. Another case that might be related is when I stream http requests using the got library; it also hangs. (node@12, got@10).

I'm tempted to give up at this point and just start on a big project of replacing all highland usages with some other solutions. :(

@jaidetree
Contributor

I took a quick look at your test repo and saw you are using highland 3, which, if I recall correctly, is missing some work around resource destruction, which may be affecting this. What happens if you use the stable Highland 2.13.5? https://www.npmjs.com/package/highland

Also what happens if you use .toPromise(Promise) instead of .toCallback()?

Lastly, looking at the source:

https://github.com/caolan/highland/blob/3.0.0/lib/index.js#L2785-L2792

It has a clause to close the stream when the source stream closes. That suggests it may be the mongo node stream that is not triggering the completion because it's also batching and will not close until it's satisfied.

Could you try adding an event listener to the mongo stream to see if it dispatches a finish or close event?

@szabolcs-szilagyi

szabolcs-szilagyi commented Mar 1, 2020

Hmm, with version 2.13.5 it finishes nicely without the issue. That is cool; now I just need to see why we started to use 3. :D

In regards to the finish and close events, I've added them, but they don't get triggered. (updated the repo with the event listeners too)

edit: I use toPromise already in my test repo.

@jaidetree
Contributor

Ok, that's at least progress. I see your updated test repo and the use of toPromise. You're right that close and finish may not fire, but is end firing?

Also, based on how you're using batch and flatMap here, would .take(11) not suffice?

@szabolcs-szilagyi

Indeed progress, thank you for the support :)

When the test runs on the non-batched stream it triggers all three event handlers, but on the second run, with the batched stream, none of them gets triggered.

How would the .take(n) method help? It gives a new stream with the first n items, but I don't see how we would be able to iterate through all the items in the original stream.

@jaidetree
Contributor

Thanks for that info! I'm going to dig in today and see if I can pinpoint if it's a highland issue specifically or something with the batched mongo stream.

As for take, I wasn't sure what the intention was looking at the example. Based on the use of .toCallback and the fact that this example batches it into an array then flattens it out again, I was wondering if the goal was to get a finite stream out of an infinite stream.

@jaidetree
Contributor

jaidetree commented Mar 3, 2020

Made some progress.

async function main () {
  const client = await MongoClient.connect(url, opts);
  const db = client.db("test");
  const col = db.collection("test");
  await col.deleteMany({});
  await col.insertMany(docs);

  // This watchdog will blow up the run after six seconds
  timeout(6000)
    .done(() => {
      console.error(new Error("APPLICATION TIMED OUT"));
      client.close();
      process.exit(1);
    });

  return _.of(col)
    .flatMap(fetchDocs)
    .batch(11)
    .consume(logStream)
    .flatMap(_)
    .reduce(append, [])
    .toCallback((err, xs) => {
      client.close();

      if (err) {
        console.error(err);
        process.exit(1);
      }

      assert.deepEqual(xs, docs, `DOCS WERE NOT RETURNED. INSTEAD FOUND "${xs}"`);

      console.log("Completed!", xs);
      process.exit(0);
    });
}

That works, the main difference being _(cursor.stream()) -> _.of(x).flatMap(() => _(cursor.stream())).

My next step is to create a new Node Readable stream that kinda simulates the cursor.stream. That should confirm whether this is a Highland or Mongo DB API issue.

My progress is at https://github.com/eccentric-j/highland-batch-repro which includes a docker-compose file to make it easier to run.

@szabolcs-szilagyi

Wow, interesting. I tried to use

  const mongoStream = hl.of(collection.find().stream()).flatMap(hl);

instead of the

  const mongoStream = collection.find().stream();

in my tests and indeed the streaming works if wrapped like this.

@jaidetree
Contributor

Tried _(mockMongoStream(docs)) with a custom subclass of Stream.Readable and it worked without issue https://travis-ci.com/eccentric-j/highland-batch-repro/builds/151668807. I also tried _(_(docs).toNodeReadable({ objectMode: true })) and it also worked without issue https://travis-ci.com/eccentric-j/highland-batch-repro/builds/151663566.

This has me leaning towards the issue residing in mongodb's cursor.stream() method. The next step is to use the mongo db stream and try consuming it in a batch Node Transform stream to see if I can reproduce the error without using Highland.

@adrian-gierakowski
Author

has anyone managed to make any further progress on this?

@szabolcs-szilagyi

@adrian-gierakowski unfortunately no; personally, I started to move away from highland, using async-await, for await...of and Node.js' built-in pipeline method.
There is a helper lib that a colleague of mine made, called aih; have a look into the lib folder and you can see some good examples of how you can use await with streams.
