Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Application doesn't finish gracefully after client.disconnect #5

Closed
Pchelolo opened this issue Aug 16, 2016 · 22 comments
Closed

Application doesn't finish gracefully after client.disconnect #5

Pchelolo opened this issue Aug 16, 2016 · 22 comments

Comments

@Pchelolo
Copy link
Contributor

I expect that after a producer/consumer is disconnected it should destroy all RdKafka objects and let the application exit, however it doesn't happen.

Code example:

const Kafka = require('node-rdkafka');
const producer = new Kafka.Producer({
    'metadata.broker.list': 'localhost:9092'
});
producer.connect(undefined, () => {
    producer.produce({
        topic: 'test_dc.resource_change',
        message: 'test'
    }, () => {
        producer.disconnect(() => {
            console.log('Disconnected');
        });
    });
});

Versions: kafkaOS X El Capitan
Expected behaviour: application sends one message to the topic and shuts down.
Actual behaviour: application sends one message to the topic but continues running.

A bit of investigation: gdb shows that we have 8 threads running after disconnect was called:

  Id   Target Id         Frame 
* 1    Thread 0x1203 of process 55925 0x00007fff8f611eca in kevent () from /usr/lib/system/libsystem_kernel.dylib
  2    Thread 0x1303 of process 55925 0x00007fff8f60afae in semaphore_wait_trap () from /usr/lib/system/libsystem_kernel.dylib
  3    Thread 0x1403 of process 55925 0x00007fff8f60afae in semaphore_wait_trap () from /usr/lib/system/libsystem_kernel.dylib
  4    Thread 0x1503 of process 55925 0x00007fff8f60afae in semaphore_wait_trap () from /usr/lib/system/libsystem_kernel.dylib
  5    Thread 0x1603 of process 55925 0x00007fff8f60afae in semaphore_wait_trap () from /usr/lib/system/libsystem_kernel.dylib
  6    Thread 0x1703 of process 55925 0x00007fff8f610db6 in __psynch_cvwait () from /usr/lib/system/libsystem_kernel.dylib
  7    Thread 0x1803 of process 55925 0x00007fff8f61107a in select$DARWIN_EXTSN () from /usr/lib/system/libsystem_kernel.dylib
  8    Thread 0x1903 of process 55925 0x00007fff8f61107a in select$DARWIN_EXTSN () from /usr/lib/system/libsystem_kernel.dylib

Threads 1-5 are normal, so whatever prevents shutdown is in threads 6-8, I suspect it's on thread 6, here's the backtrace:

#0  0x00007fff8f610db6 in __psynch_cvwait () from /usr/lib/system/libsystem_kernel.dylib
#1  0x00007fff9cd3f728 in _pthread_cond_wait () from /usr/lib/system/libsystem_pthread.dylib
#2  0x000000010079630b in uv_cond_wait ()
#3  0x000000010078a2ab in worker ()
#4  0x0000000100796000 in uv.thread_start ()
#5  0x00007fff9cd3e99d in _pthread_body () from /usr/lib/system/libsystem_pthread.dylib
#6  0x00007fff9cd3e91a in _pthread_start () from /usr/lib/system/libsystem_pthread.dylib
#7  0x00007fff9cd3c351 in thread_start () from /usr/lib/system/libsystem_pthread.dylib
#8  0x0000000000000000 in ?? ()

Any ideas where could the source of a problem be?

@webmakersteve
Copy link
Contributor

The only thing I can think that is doing this is that client->close() actually has an absurdly long (for general node use-cases) default timeout. Has console.log reported the word 'Disconnected' or is even that blocked?

The reason I ask is that we have a test-case that is running a pretty similar piece of code: https://github.com/Blizzard/node-rdkafka/blob/master/e2e/producer.spec.js#L113 which is passing CI.

@Pchelolo
Copy link
Contributor Author

Pchelolo commented Aug 16, 2016

The only thing I can think that is doing this is that client->close() actually has an absurdly long (for general node use-cases) default timeout.

The 'disconnect' was reported on the console, so the callback is called, and it's called pretty fast.

The reason I ask is that we have a test-case that is running a pretty similar piece of code: https://github.com/Blizzard/node-rdkafka/blob/master/e2e/producer.spec.js#L113 which is passing CI.

I think the reason why it's passing is because you are explicitly calling process.exit(0) here, however that shouldn't be a requirement for the node app to shutdown, right?

For us it's pretty important since we run tests in mocha and different tests require different configurations, so we heavily use disconnect and then recreate the clients with different configs.

@webmakersteve
Copy link
Contributor

Has console.log('Disconnected'); executed in your example before it hangs?

@Pchelolo
Copy link
Contributor Author

Oh, sorry, wrong indentation on my previous comment..

Has console.log('Disconnected'); executed in your example before it hangs?

The 'disconnect' was reported on the console, so the callback is called, and it's called pretty fast.

@webmakersteve
Copy link
Contributor

Looks like this happens regardless of whether a message is produced or not. In fact, just instantiating the class makes it do this.

The only RdKafka objects created at that phase are the configs and their requisite callbacks. I can try to play around and see if storing Persistent v8 handles of objects rather than rdkafka configs until connect time fixes it.

@webmakersteve
Copy link
Contributor

Was able to identify it as the Dispatcher class.

@webmakersteve
Copy link
Contributor

New branch with a fix in place that should make it shutdown gracefully: https://github.com/Blizzard/node-rdkafka/tree/graceful-shutdown. Currently running some tests on it but feel free to try it and let me know if that fixed the problem for you.

@Pchelolo
Copy link
Contributor Author

@webmakersteve It definitely makes things different, but:

This test made it coredump:

"use strict";
var kafka = require('./lib');
var consumer = new kafka.KafkaConsumer({
    'metadata.broker.list': 'localhost:9092',
    'group.id': 'test'
});
consumer.connect(undefined, function() {
    console.log('connected');
    consumer.subscribe([ 'test_dc.resource_change' ]);
    consumer.disconnect();
});

But after I've updated the branch and rebuilt it once again I can't reproduce any more, but here's a dump (a relevant part of it). Just FYI, if it rings some bells

Crashed Thread:        0  Dispatch queue: com.apple.main-thread

Exception Type:        EXC_CRASH (SIGABRT)
Exception Codes:       0x0000000000000000, 0x0000000000000000

Application Specific Information:
Assertion failed: (0), function uv__finish_close, file ../deps/uv/src/unix/core.c, line 264.


Thread 0 Crashed:: Dispatch queue: com.apple.main-thread
0   libsystem_kernel.dylib          0x00007fff8f610f06 __pthread_kill + 10
1   libsystem_pthread.dylib         0x00007fff9cd414ec pthread_kill + 90
2   libsystem_c.dylib               0x00007fff9e9cb6e7 abort + 129
3   libsystem_c.dylib               0x00007fff9e992df8 __assert_rtn + 321
4   node                            0x0000000105d7d342 uv_run + 680
5   node                            0x0000000105c4f9eb node::Start(int, char**) + 594
6   node                            0x0000000105485d34 start + 52

@webmakersteve
Copy link
Contributor

webmakersteve commented Aug 16, 2016

Can you try one more time. I noticed that RIGHT after i said it and updated the branch. If you re-pulled afterwards it would make sense that it didn't happen again.

uv_close does not expect the memory to be freed so I had to make an adjustment.

@Pchelolo
Copy link
Contributor Author

Pchelolo commented Aug 16, 2016

@webmakersteve checked again, no EXC_CRASH any more, but there's a test that still doesn't work:

"use strict";
var kafka = require('./lib');
var consumer = new kafka.KafkaConsumer({
    'metadata.broker.list': 'localhost:9092',
    'group.id': 'test'
}, {
    'auto.offset.reset': 'smallest'
});
consumer.connect(undefined, function() {
    console.log('connected');
    consumer.subscribe([ 'test_dc.resource_change' ]);
    consumer.consume(function (err, msg) {
        console.log(err, msg);
        console.log('Calling disconnect');
        consumer.disconnect(function(err, info) {
            console.log('Diconnected', err, info);
        });
    });
});

Important thing is that I have some messages in the topic. So for some reason after I consume something, disconnect never happens and the callback is never called. I've tried waiting for like 5 minutes on this one, I know that RdKafka::Handle::close might be super slow, but not as slow.

GDB shows some interesting stuff - we have 8 threads, as always thread 1 is event loop, 2-4 are libuv thread pull, threads 6 and 7 are in tread_join() and thread 8 is in poll(). In my experience that happened when some reference to an RdKafka::Message leaked or wasn't deleted before disconnecting. That object contains a reference to RdKafka::KafkaConsumer. It has some custom reference counting mechanism embedded, so having more some undeleted RdKafka::Message objects before calling RdKafka::KafkaConsumer::Close make the latter call stuck. But that's just an unproved theory. In support for it, if I run the same code against an empty topic, so I get timeout instead of a message, disconnect happens properly

@webmakersteve
Copy link
Contributor

In this case, does it log 'Disconnected'? If it does, it means client->close() has completed execution and it would be the event loop still having open threads.

https://github.com/Blizzard/node-rdkafka/blob/master/src/message.cc#L103

The message is freed as the node::Buffer free callback, so it should only happen when node garbage collects. This was done to prevent needing to copy the data from Kafka to save time.

@Pchelolo
Copy link
Contributor Author

@webmakersteve Nope, in this case it doesn't log Disconnected, so client->close() never finishes.

The message is freed as the node::Buffer free callback, so it should only happen when node garbage collects. This was done to prevent needing to copy the data from Kafka to save time.

So, I guess that's the root cause of this problem... And there's not much we can do I guess - performance is way more critical then graceful shutdown. Lemme play with it a bit

@webmakersteve
Copy link
Contributor

You can try playing with session.timeout.ms to see if that makes any difference. close is limited to around that number times 3 according to the librdkafka header files.

@webmakersteve
Copy link
Contributor

webmakersteve commented Aug 23, 2016

Interestingly enough, even deallocating the buffer forceably does not cause handle->close() to finish.

var Kafka = require('../librdkafka');

var consumer = new Kafka.KafkaConsumer({
  'client.id': 'kafka-test',
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'test'
}, {});


consumer.connect(function(err) {
  console.log(err);

  consumer.assign([
    {
      topic: 'test',
      partition: 0,
      offset: 3
    }
  ]);

  function disconnect() {
    consumer.disconnect(function() {
      console.log('disconnected');
    });
  }

  var msg = consumer.consume(function(err, msg) {
    console.log(msg);
  });

  setInterval(() => global.gc(), 1000);
  setTimeout(() => consumer.disconnect(function() {}), 2200);

});

Added some logging in the Buffer::Free callback to make sure it was getting called, and even when I could verify that the RdKafka::Message was getting deleted, the disconnect was still hanging.

Perhaps this is a library problem?

@Pchelolo
Copy link
Contributor Author

I've created confluentinc/librdkafka#775 to understand what does Magnus think about this problem.

@ottomata
Copy link

Just poking! I'm really hoping to deploy something into production in the next few months that will be blocked by this bug. Any way I can help?

@webmakersteve
Copy link
Contributor

I looked into implementing Magnus' solution to the problem but I can't change where the pointer of data points after the fact by detecting disconnections as far as I can tell because the instantiated node::Buffer does not expose its data pointer. After it is created it can't be changed.

I looked into potentially having a solution where you could opt into memory copying instead if disconnections were more important than performance, but it looks like the rebalance callback actually may be one of the things stopping the graceful shutdown of the wrapper.

I think using the default librdkafka rebalance callback may fix the issue. I'm hoping to get to that in the next few days and hopefully that helps fix the issue as well.

@ottomata
Copy link

Hm, for my purposes I don't need any rebalance callback, as I'm using assign directly, and never subscribe. According to Magnus, librdkafka's rebalance_cb is never called if I only use assign. I'm not totally sure how that relates to the idea about using librdkafka's rebalance callback affecting this. This bug affects me even though I don't ever call subscribe.

@webmakersteve
Copy link
Contributor

webmakersteve commented Sep 21, 2016

There are two issues that stop client->close from finishing: 1) the rebalance callback not unassigning the partitions and 2) the slab memory allocated to a RdKafka::Message. You would still experience the second issue even if you don't use subscribe.

I haven't found a solution I really like that works with how node::Buffer works. We may need to copy the memory to resolve this problem, and I'm not sure how much slower that will make things. I wanted to do some benchmarks on it, but don't have too much time right now.

Would be happy to take contributions for this if you have available time.

@webmakersteve
Copy link
Contributor

This issue should be fixed in #42 which has to rely on the memcpy to avoid the problem.

Unfortunately I decided philosophically that performance is not more important than the bindings being just that: bindings. A performance degradation is inherently necessary when making bindings for another language in order to subscribe to that language's way of doing things.

There is no way for me to adjust memory in use by node after it has been returned to the v8 thread, and buffer data is slab allocated which makes it read only (essentially) once I have instantiated the buffer.

Would love additional thoughts on this issue though. I am trying to bump up the performance in other ways to compensate for the new memcpy

@ottomata
Copy link

Been doing some testing. I can reproduce this issue in version 0.3.3. Using master, with #42 merged, my process finishes properly after calling disconnect().

So! Looking good to me!

@webmakersteve
Copy link
Contributor

Fixed in release 0.6.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants