RdKafka::Message blocking the RdKafka::KafkaConsumer.close #775

Closed
Pchelolo opened this issue Aug 31, 2016 · 9 comments

@Pchelolo

Description

The bug manifests itself in Blizzard/node-rdkafka#5

In my (pretty limited) understanding, here's what's happening. The node driver wraps the RdKafka::Message in a V8 object and passes it to JavaScript. From that point the memory is managed by the JS GC, so we have no control over when the JS object is destroyed, and thus no control over when the native RdKafka::Message is destroyed.

As I understand it, RdKafka::Message internally holds a reference to the consumer it was consumed from, so until all the messages are deleted the consumer disconnect never finishes: the background thread is stuck in the poll call until the refcount reaches 0, and that only happens after a very significant delay because the node.js GC might not run for a while.

So the question is: why does a still-existing message need to hold up the consumer disconnect, and why does the message need a reference to the original consumer?

It's possible to fix it in the driver, but that would require one unnecessary memcpy of the message payload, which it would be wonderful to avoid.

How to reproduce

Consume a message, don't delete the message, try to disconnect the consumer - it hangs.
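
In librdkafka C++ API terms the sequence looks roughly like this (a minimal sketch; broker address, group id and topic name are placeholders):

#include <string>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

int main() {
  std::string errstr;
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  conf->set("metadata.broker.list", "127.0.0.1:9092", errstr);  // placeholder broker
  conf->set("group.id", "repro-group", errstr);                 // placeholder group

  RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
  std::vector<std::string> topics = {"test"};                   // placeholder topic
  consumer->subscribe(topics);

  RdKafka::Message *msg = consumer->consume(5000);  // consume one message ...
  (void)msg;                                        // ... but never `delete msg`

  consumer->close();  // hangs here while the message is still alive
  return 0;
}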

I can post backtraces or create some small reproducible test case if you think that's an issue in librdkafka.

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): current master
  • Apache Kafka version: 0.9.0.1
  • librdkafka client configuration:
  • Operating system: any
  • Using the legacy Consumer
  • Using the high-level KafkaConsumer
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

It's possible to fix it in the driver, but that would require one unnecessary memcpy of the message payload which would be wonderful to avoid.

This is exactly what librdkafka also tries to avoid.
The response payload from the broker is read into one big heap buffer from which librdkafka then parses the individual messages; instead of copying each message's payload it simply points somewhere within that buffer, with a refcount to know when it is safe to free the buffer.
I've had plans to make a detach() API for messages, but that would result in a new allocation + memcpy, so it doesn't really solve anything, it just puts the burden elsewhere.
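
This is not librdkafka's actual internals, but the refcounted zero-copy scheme described above amounts to something like the following, with std::shared_ptr playing the role of the refcount:

#include <cstddef>
#include <memory>
#include <vector>

// One big heap buffer holding a whole fetch response from the broker.
struct FetchBuffer {
  std::vector<char> bytes;
};

// Each parsed message just points into the shared buffer; the buffer is
// freed only when the last message referencing it is destroyed.
struct ZeroCopyMessage {
  std::shared_ptr<FetchBuffer> buf;
  const char *payload;  // points somewhere inside buf->bytes
  size_t len;
};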

There doesn't seem to be a quick fix for this on either side (librdkafka or V8).
I think your best bet is wrapping librdkafka's Message in your own message object, keeping track of all outstanding ones, and then destroying the underlying librdkafka Message for all of them prior to calling close(). The GC will eventually destroy your wrapping (but now empty) Message object.

@webmakersteve

Just to give a little insight into how the C++ bindings for node-rdkafka operate (see the sketch after this list):

  1. Get a message from KafkaConsumer::consume
  2. Wrap the payload in a node::Buffer without copying the memory. Specify a FreeCallback to be executed when the memory is freed by the v8 GC.
  3. Free callback gets executed when the buffer goes out of scope. Callback deletes underlying Message.
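
A rough sketch of steps 2 and 3, assuming the node::Buffer::New overload that takes a FreeCallback (illustration only, not the actual node-rdkafka code):

#include <node.h>
#include <node_buffer.h>
#include <librdkafka/rdkafkacpp.h>

// Invoked by the v8 GC when the Buffer is collected; `hint` carries the
// RdKafka::Message* so the underlying message can be deleted here.
static void FreeMessage(char *data, void *hint) {
  (void)data;
  delete static_cast<RdKafka::Message *>(hint);
}

v8::MaybeLocal<v8::Object> WrapPayload(v8::Isolate *isolate,
                                       RdKafka::Message *msg) {
  // No memcpy: the Buffer points straight at the message payload, which in
  // turn points into librdkafka's fetch buffer.
  return node::Buffer::New(isolate,
                           static_cast<char *>(msg->payload()),
                           msg->len(),
                           FreeMessage,
                           msg);
}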

I verified that the free callback is getting called, and it still appears to hang when it's in the process of disconnecting. Part of the issue is v8's scoping: the buffer could be attached to a scope that is still active when someone calls consumer.disconnect, so the underlying message never gets deallocated. But even in a test case where I ensure they execute in different scopes and the buffer does get deallocated, it looks like if I try to Handle::close when things are not clean, it hangs even after the message gets deleted.

Any advice on how we can avoid this problem? I can't just deallocate memory that is in node::Buffers myself while they are in scope, because the application might still use them later.

@edenhill
Contributor

Instead of having node::Buffer -> Message, what about node::Buffer -> IntermediaryMessageWrapper -> Message?

In your close() wrapper you make a copy of the Message payload bytes of all instantiated IntermediaryMessageWrapper objects and then destroy the Messages. This way the node::Buffer will reference the underlying Message memory most of the time (during normal operation), but following a close() it will reference a copy of the Message.
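
Something along these lines (a sketch only; the class and helper names are made up, error handling omitted):

#include <cstddef>
#include <set>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

// Hypothetical wrapper: during normal operation it hands out a pointer into
// the librdkafka Message; detach() copies the payload so the Message can be
// destroyed before close() while the node::Buffer contents stay valid.
class IntermediaryMessageWrapper {
 public:
  explicit IntermediaryMessageWrapper(RdKafka::Message *msg) : msg_(msg) {}

  const char *payload() const {
    return msg_ ? static_cast<const char *>(msg_->payload()) : copy_.data();
  }
  size_t len() const { return msg_ ? msg_->len() : copy_.size(); }

  // Copy the payload bytes and release the underlying librdkafka Message.
  void detach() {
    if (!msg_) return;
    const char *p = static_cast<const char *>(msg_->payload());
    copy_.assign(p, p + msg_->len());
    delete msg_;
    msg_ = nullptr;
  }

 private:
  RdKafka::Message *msg_;
  std::vector<char> copy_;
};

// Before calling consumer->close(): detach every outstanding wrapper so no
// Message (and hence no consumer reference) is still held from the JS heap.
void DetachAll(std::set<IntermediaryMessageWrapper *> &outstanding) {
  for (IntermediaryMessageWrapper *w : outstanding) w->detach();
}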

@webmakersteve

I could implement that if I was sure it would fix the issue. I'll take a look and see.

@ottomata
Contributor

ottomata commented Sep 7, 2016

@webmakersteve, any luck with this?

@webmakersteve

webmakersteve commented Sep 7, 2016

@ottomata Haven't had time to write up some tests to ensure this would fix the issue. Am hoping to get to it soon.

@webmakersteve

Just tried to make a test case for my own sanity to make sure this was the issue. I made it so the underlying RdKafka::Message was deleted directly after it was read, and it still looks like the consumer is hung up.

Is there any other way to debug why this is happening? I can verify that this only happens after a successful read from Kafka.

After it happens this is what debug is telling me, if it helps at all:

{ message: <Buffer 61>,
  size: 4096,
  topic: 'test',
  offset: 711480,
  partition: 0 }
{ severity: 7,
  fac: 'CONSUME',
  message: '127.0.0.1:9092/1001: Enqueue 18449 messages on test [0] fetch queue (qlen 0, v3)' }
{ severity: 7,
  fac: 'FETCH',
  message: '127.0.0.1:9092/1001: Fetch reply: Success' }
{ severity: 7,
  fac: 'FETCH',
  message: '127.0.0.1:9092/1001: Fetch topic test [0] at offset 729929 (v3)' }
{ severity: 7,
  fac: 'FETCH',
  message: '127.0.0.1:9092/1001: Fetch 1/1/1 toppar(s)' }
{ severity: 7,
  fac: 'FETCH',
  message: '127.0.0.1:9092/1001: Fetch for 1 toppars, fetching=1, backoff=0ms' }
{ severity: 7,
  fac: 'SEND',
  message: '127.0.0.1:9092/1001: Sent FetchRequest (v1, 63 bytes @ 0, CorrId 11)' }
{ severity: 7,
  fac: 'RECV',
  message: '127.0.0.1:9092/1001: Received FetchResponse (v1, 1048612 bytes, CorrId 11, rtt 2.55ms)' }
{ severity: 7,
  fac: 'FETCH',
  message: '127.0.0.1:9092/1001: Topic test [0] MessageSet size 1048576, error "Success", MaxOffset 89877841, Ver 3/3' }
{ severity: 7,
  fac: 'CGRPOP',
  message: 'Group "node-rdkafka-bench" received op TERMINATE in state up (join state assigned)' }
{ severity: 7,
  fac: 'CGRPTERM',
  message: 'Terminating group "node-rdkafka-bench" in state up with 1 partition(s)' }
{ severity: 7,
  fac: 'UNSUBSCRIBE',
  message: 'Group "node-rdkafka-bench": unsubscribe from current subscription of 1 topics (leave group=yes, join state assigned)' }
{ severity: 7,
  fac: 'PAUSE',
  message: 'Library pausing 1 partition(s)' }
{ severity: 7,
  fac: 'PAUSE',
  message: 'Pause test [0]: at offset 711481' }
{ severity: 7,
  fac: 'ASSIGN',
  message: 'Group "node-rdkafka-bench": delegating revoke of 1 partition(s) to application rebalance callback: unsubscribe' }
{ severity: 7,
  fac: 'CGRPJOINSTATE',
  message: 'Group "node-rdkafka-bench" changed join state assigned -> wait-rebalance_cb' }
{ severity: 7,
  fac: 'CONSUME',
  message: '127.0.0.1:9092/1001: Enqueue 18577 messages on test [0] fetch queue (qlen 0, v3)' }
{ severity: 7,
  fac: 'FETCH',
  message: '127.0.0.1:9092/1001: Fetch reply: Success' }
{ severity: 7,
  fac: 'FETCHDEC',
  message: 'Topic test [0]: fetch decide: updating to version 4 (was 3) at offset 711481 (was 748506)' }
{ severity: 7,
  fac: 'FETCH',
  message: '127.0.0.1:9092/1001: Topic test [0] at offset 711481 (18577/100000 msgs, 74308/1000000 kb queued) is not fetchable: paused' }
{ severity: 7,
  fac: 'FETCHADD',
  message: '127.0.0.1:9092/1001: Removed test [0] from fetch list (0 entries, opv 4)' }

It never gets past that last logged entry.

@edenhill
Contributor

'Group "node-rdkafka-bench" changed join state assigned -> wait-rebalance_cb'

Do you know if you get that final rebalance callback with err==PARTITIONS_REVOKED?
It looks like it is waiting for your app to handle that and call assign(NULL) to move things forward.

Are you closing the client with rd_kafka_consumer_close()?
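
For reference, this is roughly the shape of the rebalance callback librdkafka expects the application to provide (C++ API; a minimal sketch, error handling omitted):

#include <vector>
#include <librdkafka/rdkafkacpp.h>

// If the callback never calls assign()/unassign(), the group stays in
// wait-rebalance_cb and the consumer close cannot proceed.
class MyRebalanceCb : public RdKafka::RebalanceCb {
 public:
  void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                    RdKafka::ErrorCode err,
                    std::vector<RdKafka::TopicPartition *> &partitions) override {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      consumer->assign(partitions);  // accept the new assignment
    } else {
      consumer->unassign();          // revoke: the equivalent of assign(NULL) in the C API
    }
  }
};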

@txbm

txbm commented Jan 25, 2018

Just for reference, I believe I ran into another manifestation of this problem while using node-rdkafka; it's being tracked here: Blizzard/node-rdkafka#196

I just wanted to include this test case as a specific reproduction, in case it helps track down the apparent awkwardness around the rebalance_cb between node-rdkafka and librdkafka.

https://gist.github.com/petermelias/17f276f4bdc73eafa6c3741697f62d71

This test will show the process hanging indefinitely despite the unsubscribe and disconnect calls when a rebalance_cb is attached from JavaScript-land.
