
Node process does not exit after subscribing to a Kafka topic #222

Closed
gyamana opened this issue Jul 17, 2017 · 8 comments

gyamana commented Jul 17, 2017

Hello,

I am trying to troubleshoot an issue with our Node.js applications not exiting gracefully.

I have an app which executes some code and then terminates, but I find that the node process stays alive after subscribing to a Kafka topic with node-rdkafka.

I also notice that the node process stays alive after an uncaught error, where normally it would terminate.

Is there a way to subscribe to a topic without this behaviour?

Thanks in advance.

@webmakersteve
Contributor

The subscription itself shouldn't be the thing keeping the process alive, as it doesn't actually open any worker threads to do its work. It is likely the act of consuming that does.

What method of consuming are you using? Could you show me a sample and potentially your shutdown handler so I could see the sequence of events?

@FuzzOli87

@webmakersteve

I ran into this issue today. For example:

Consumer

const kafka = require('node-rdkafka');

const topic = 'SIMPLE_TEST_TOPIC01';
const connection = 'localhost:9092';
const groupId = 'integration-consumer-test0111';
const handler = (data) => {
  console.log('got data', data);
  throw new Error('HELLO');
};

const consumer = new kafka.KafkaConsumer({
  'group.id': groupId,
  'metadata.broker.list': connection,
  'api.version.request': true, // Request the api version of Kafka node
  event_cb: true
});

consumer.on('error', error => console.log('errorz', error));
consumer.on('event.log', log => console.log(log));
consumer.connect();

consumer
  .on('ready', () => {
    console.log('consumer is ready');
    consumer.subscribe([topic]);

    consumer.consume();
  })
  .on('data', handler);

Producer

const kafka = require('node-rdkafka');

const topic = 'SIMPLE_TEST_TOPIC01';
const connection = 'localhost:9092';
const producer = new kafka.Producer({
  'metadata.broker.list': connection,
  'broker.version.fallback': '0.10.0', // If kafka node doesn't have API, use this instead
  'api.version.request': true // Request the api version of Kafka node
});

producer.connect();
producer.on('ready', () => {
  console.log('producer is ready');
  try {
    producer.produce(topic, null, Buffer.from(JSON.stringify({ test: 'test' })));
    // producer.disconnect();
  } catch (e) {
    console.error(e);
  }
});

The consumer in this scenario throws the error but does not kill the process.

However, in the stream form of the consumer:

const kafka = require('node-rdkafka');

const topic = 'SIMPLE_TEST_TOPIC01';
const connection = 'localhost:9092';
const groupId = 'integration-consumer-test0111';
const handler = (data) => {
  console.log('got data', data);
  throw new Error('HELLO');
};
const stream = kafka.KafkaConsumer.createReadStream(
  { 'metadata.broker.list': connection, 'group.id': groupId, event_cb: true },
  {},
  {
    topics: [topic]
  }
);

stream.on('data', handler);

stream.on('error', err => console.log({ ERROR: err }));
stream.on('event.err', err => console.log({ errl: err }));

It does!

I've been digging into it and I can't see why this is happening at the library level. The only difference is that the stream calls the binding method "consume" while the standard API calls "consumeLoop". I need to go pick up my kid, but I'll try to dig a little more. My hunch is that this is coming from the bindings somewhere.

@FuzzOli87

I've been doing some deep diving. Addons are fairly new to me, but from running this in the debugger, it looks like the while loop in the C++ layer for the non-stream consumer keeps executing even though the main JS thread has failed.

@FuzzOli87

FuzzOli87 commented Aug 25, 2017

@webmakersteve

So, I've been digging around all day out of curiosity. The issue is definitely that the KafkaConsumerConsumeLoop keeps executing even after the main thread throws an error, because it is not listening for any errors from JS.

I toyed around with this:

void KafkaConsumerConsumeLoop::HandleMessageCallback(RdKafka::Message *msg) {
  Nan::HandleScope scope;

  const unsigned int argc = 2;
  v8::Local<v8::Value> argv[argc];

  argv[0] = Nan::Null();
  argv[1] = Conversion::Message::ToV8Object(msg);

  // We can delete msg now
  delete msg;

  Nan::TryCatch tc;
  callback->Call(argc, argv);
  if (tc.HasCaught()) {
    // Do something here
  }
}

And it does indeed catch exceptions from the main JS thread. However, I have no idea what to do after that. Ideally I would somehow let the Execute method know that there was an error and stop the while loop, but I'm not sure how to do that. Another way would be to force another tick of the event loop, but again, that is a bit outside my comfort zone.

These guys, for example, created a helper function that forces the event loop to tick:
https://github.com/tjanczuk/edge/blob/118500581244cd31ff7acb88945b5d02007618d6/src/common/callbackhelper.cpp

Here is the thread where I found this: nodejs/nan#668 It might not be too useful, because the new version of nan fixes the issue referred to there, but it's similar in that the worker thread does not deal with JS exceptions.

The stream consumer exits because, on the JS side, the event loop keeps ticking.
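As a side note, the exit rule itself is easy to see without Kafka. Node exits once no ref'd libuv handles remain; in the sketch below (my illustration, not node-rdkafka code) a plain timer stands in for whatever handle keeps the process alive:

```javascript
// A plain interval is a ref'd libuv handle: while it exists, the
// process stays alive, much like the native consume loop does.
const handle = setInterval(() => console.log('still alive'), 1000);

// unref() removes the handle from the loop's ref count, so the script
// can exit as soon as nothing else is pending.
handle.unref();

console.log('refed after unref():', handle.hasRef());
```

Disconnecting a client is node-rdkafka's equivalent of closing that handle.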

For the moment, I'm doing this:

var kafka = require('./lib');

const globalClients = [];
process.on('uncaughtException', (err) => {
  globalClients.forEach((client) => client.disconnect());

  console.error(err);
});
var topic = 'SIMPLE_TEST_TOPIC01';
var connection = 'localhost:9092';
var groupId = 'integration-consumer-test0111';
var handler = function (data) {
  console.log('got data', data);
  throw new Error('HELLO');
};

var consumer = new kafka.KafkaConsumer({
  'group.id': groupId,
  'metadata.broker.list': connection,
  // 'broker.version.fallback': '0.10.0', // If kafka node doesn't have API, use this instead
  'api.version.request': 'true',
  event_cb: 'true'
});

consumer.connect();


consumer
  .on('ready', function() {
    console.log('consumer is ready');
    consumer.subscribe([topic]);

    consumer.consume(function (err, thing, here) {
      console.log('in consume', { err, thing, here });
    });

    globalClients.push(consumer);

    consumer.on('error', function(error) {console.log('errorz', error);});
    consumer.on('event.log', function(log) { console.log(log); });
  })
  .on('data', handler);

This is not really optimal, as it requires extra management if an app uses a lot of clients.
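If I had to keep this workaround, I'd at least centralize it. Here is a small sketch (the `track`/`teardown` helpers are my own names, not node-rdkafka APIs) that routes every client through a registry so one handler can tear them all down:

```javascript
// Registry of live clients so a single handler can disconnect them all.
const clients = new Set();

// Wrap client creation with this so no client is forgotten.
function track(client) {
  clients.add(client);
  return client;
}

// Disconnect every tracked client and forget them.
function teardown() {
  for (const client of clients) client.disconnect();
  clients.clear();
}

process.on('uncaughtException', (err) => {
  console.error(err);
  teardown(); // release the native handles so the process can exit
  process.exitCode = 1;
});
```

Then `const consumer = track(new kafka.KafkaConsumer({ ... }));` replaces pushing into a global array by hand.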

Anyway, I hope you find this useful, that it leads you the right way toward the best fix, and I look forward to learning from it!

@webmakersteve
Contributor

Stuff like this makes me want to remove the consume loop entirely :( But thanks for the thorough investigation! I will play around with this and try to figure out an optimal solution :)

@FuzzOli87

FuzzOli87 commented Aug 29, 2017

@webmakersteve Thanks for your response. Yes, it is a bit of a headache. My C++ experience is limited and I am as green as they come to the Node Addon API but I learned quite a bit by debugging this.

For the short term, I'm thinking of just using the streams API. From the playing around I did, it looks like the streams API does roughly the same thing the flowing-mode API does (a never-ending loop), but runs that loop inside v8 rather than outside of it.

I Googled for hours but couldn't find a solution. I just need SOMETHING to tell me, inside the Execute method, whether the v8 thread is still alive and well. That's all I want, but it doesn't seem to exist, since Execute only deals with the current thread (the loop) and can't peek into v8 threads from there.

@darky

darky commented Apr 10, 2019

This issue is so painful :(
I can't use Jest with node-rdkafka; it always complains about handles that stay open forever.

@iradul
Collaborator

iradul commented Jun 14, 2020

This should be fixed with 2.9.0.

@iradul iradul closed this as completed Jun 14, 2020