Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add possibility to consume EOF messages #803

Merged

Conversation

fabianschmitthenner
Copy link
Contributor

@fabianschmitthenner fabianschmitthenner commented Jun 11, 2020

librdkafka supports to send EOF messages when reaching the end of a partition. This has to be enabled by setting the enable.partition.eof setting which is documented with the following information:

Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition.

In our application, we need to receive these messages to distinguish between having reached the end of a partition and the case where we just haven't received all data yet.

This PR adds the possibility to receive these eof messages to the non-flowing mode of the standard API. It would also be possible to add this possibility to the other APIs. This PR is backward-compatible: It doesn't change any behaviour when not enabling this feature by calling a function on the consumer. This function is modelled in the same way as setDefaultTimeout is modelled, i.e. it sets an instance variable on the consumer.

When designing this feature, the question came up if these EOF messages should be handled as errors (i.e. consume should return a promise error) or as just another kind of message. I decided to use the approach to return this as another message type. That is, with this change, the resulting array in consume can not only return content messages but also EOF error messages when this feature is enabled. It can be enabled by calling consumer.setErrorsAsMessages(true). The reason for this decision are as follows:

  • currently consume discards errors when it already has some messages to be returned. This is the case because it can't return both an error and messages. I think it's useful in general to have an option to return these errors as messages instead, as that way both can be returned and they're not discarded. As this would be a breaking change, a flag is added to enable this. In this PR, only the EOFs are returned in this way though. But we already saw another important use case for us: We want to know when we get an OffsetOutOfRange error. Currently, this is sometimes not forwarded from node-rdkafka: If we get some messages from one partition and then get an OffsetOutOfRange error from another partition we miss this message. For the EOF messages, this is even more problematic, as they are usually returned after some other messages are returned.
  • there will be multiple EOF messages returned when listening to multiple partitions. To reduce the number of c++ / nodejs translations (which are somewhat expensive), it is useful to be able to return all of them in one call. Here, all of them can be returned including the content messages.

Internally, this behaviour is implemented by changing the behaviour of KafkaConsumer::Consume to return some errors not as Batons but still keep them as messages. This is done because we still need more information than just the error code: We also want to return the offset and partition for EOF messages as they may be useful for consumers. I first tried to add these additional fields in the Baton but found it too complicated to figure out how to delete the data at the right time then. This approach also makes serializing the data to JSON easier, as the ToV8Object just had to be extended for the eof message type. I checked all use sites of this function and changed them accordingly.

…nable this flag

  When the flag is not set, the behaviour doesn't change
- refactor consume to return some of the errors as messages instead
- eof messages include partition and offset
@fabianschmitthenner
Copy link
Contributor Author

Is there anything I can do to get some feedback on this? @webmakersteve @iradul ?

@iradul
Copy link
Collaborator

iradul commented Jul 20, 2020

Thank you for PR! Library is certainly missing this feature.
I think it would be more natural to have EOF events callback similar to rebalance_cb or offset_commit_cb instead of mixing it with the regular messages. Also that way we'd be backwards compatible.

@fabianschmitthenner
Copy link
Contributor Author

Hey, thanks for you reply :)

Good point. I see some pros and cons. As for cons:

  • More (expensive) switches between js and c++ land. Note that there can also be multiple EOF messages (e.g. for multiple partitions)
  • If they're part of the messages it's clear that the EOF comes after the last message. I wonder how that would be with such a callbacks. Would we get the callback then before the call to consume returns? I would believe so.

Pros is that it doesn't need this new switch to enable a different mode. I could also imagine offering both (always having the callbacks, but then also a switch to enable them as messages).

Okay, now for questions how this would work. I guess we would introduce a new callback, something like eof_cb? Or should we have something more generic like error_cb (that way we could also reuse the same callback for OffsetOutOfRange errors for example).

@iradul
Copy link
Collaborator

iradul commented Jul 23, 2020

More (expensive) switches between js and c++ land. Note that there can also be multiple EOF messages (e.g. for multiple partitions)

I don't see this as a significant performance cost but it's certainly worth testing and comparing.

Would we get the callback then before the call to consume returns?

Good point. Guess we'll have to look at the source and figure how to implement this to get the answer.

I guess we would introduce a new callback, something like eof_cb? Or should we have something more generic like error_cb

Yes, I think partition_eof_cb would be a good name. I wouldn't mix other errors with this.

@fabianschmitthenner
Copy link
Contributor Author

Okay. I'll try this callback approach for now, let's see where that leads us.

@fabianschmitthenner
Copy link
Contributor Author

Ah, if we use NaN::Callback, looks like it's blocking until the execution of the callback in JS side is finished. We will only continue returning the other messages after that callback returned then. But this also means that we wait for these callbacks to be executed before continuing processing. When we have multiple EOFs in the queue, we wait multiple times.

@fabianschmitthenner
Copy link
Contributor Author

Hm, I wonder how callback registration should work. The other callbacks are configured as part of the configuration for the consumer. But that's because they're part of librdkafka CONFIGURATION. partition_eof_cb is not part of that. There's enable.partition.eof, but that's just a boolean and not a callback.

@iradul
Copy link
Collaborator

iradul commented Jul 25, 2020

Hm, I wonder how callback registration should work. The other callbacks are configured as part of the configuration for the consumer. But that's because they're part of librdkafka CONFIGURATION. partition_eof_cb is not part of that. There's enable.partition.eof, but that's just a boolean and not a callback.

You are right. Setting enable.partition.eof to true should be enough for consumer to start emitting partition EOF events. We don't need partition_eof_cb call back.

@fabianschmitthenner
Copy link
Contributor Author

You are right. Setting enable.partition.eof to true should be enough for consumer to start emitting partition EOF events. We don't need partition_eof_cb call back.

So you mean like is done in this PR? Maybe we don't even need this consumer.setErrorsAsMessages(true) function as there will be no change as long as enable.partition.eof is false (which also is the default)? 🤔

@iradul
Copy link
Collaborator

iradul commented Jul 26, 2020

Yes, if we emit EOF events we don't need any extra setting. When enable.partition.eof is set to false underlying librdkafka shouldn't even produce PARTITION_EOF events so we shouldn't either.

@fabianschmitthenner
Copy link
Contributor Author

Okay. Then I'll remove the setErrorsAsMessages function from this PR.

@iradul
Copy link
Collaborator

iradul commented Jul 26, 2020

Good name for EOF event could be partition.eof

@fabianschmitthenner
Copy link
Contributor Author

Good name for EOF event could be partition.eof

Where would that name appear?

@fabianschmitthenner
Copy link
Contributor Author

I didn't entirely get your vision for how we communicate the EOF events back yet. Are you okay with mixing them with normal messages now? Or through what other means should they be communicated back (as we discarded the callback idea AFAICT).

Then the question is how the json should look like and I would expect maybe something like

{
   event: "partition.eof",
   code: -191, // as this is the error code of ERR__PARTITION_EOF
   partition: number,
   offset: number,
   topic: string
}

Maybe also adding a field for the error description (which would be Broker: No more messages as per result from RdKafka::err2str).

@fabianschmitthenner
Copy link
Contributor Author

I removed the setErrorsAsMessages function. That simplified the PR quite a bit :)

@fabianschmitthenner
Copy link
Contributor Author

fabianschmitthenner commented Jul 26, 2020

Let me know when the changes go in the right direction code-wise. Then I can also add some documentation to the README and add the feature to the other APIs.

@iradul
Copy link
Collaborator

iradul commented Jul 27, 2020

I didn't entirely get your vision for how we communicate the EOF events back yet. Are you okay with mixing them with normal messages now? Or through what other means should they be communicated back (as we discarded the callback idea AFAICT).

I don't think we should mix normal messages with EOF events. Here is how I see this:
When enable.partition.eof is set to false, from user perspective, everything should look the same as it's now.
When enable.partition.eof is set to true library should emit EOF events with partition.eof event, for example:

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'testgroup',
  'metadata.broker.list': 'localhost:9092',
  'enable.partition.eof': true,
});

consumer
  .on('ready', function () {
    console.log('consumer connected');
    consumer.subscribe([topic]);
    consumer.consume();
    // or
    // consumer.consume(100, (err, messages) => { /* here we get normal messages */ });
  })
  .on('data', function (message) {
    // here we should get normal message
  })
  .on('partition.eof', function (eof_event) {
    // here we should get EOF event
    // {
    //   partition: number,
    //   offset: number,
    //   topic: string,
    // }
  })

consumer.connect();

I don't think error message and error code are useful since they're constant so we can omit those.

@fabianschmitthenner
Copy link
Contributor Author

Ah that makes sense yes 👍

@fabianschmitthenner
Copy link
Contributor Author

fabianschmitthenner commented Jul 27, 2020

Oh, that way we also won't have the additional switches between js and c++ land in the consumeNum case 😃

Co-authored-by: Gabriel Assis Bezerra <gabriel.bezerra@gmail.com>
@fabianschmitthenner
Copy link
Contributor Author

@iradul May you review again, please. I implemented it now as you suggested and also added a few e2e tests :).

e2e/both.spec.js Outdated Show resolved Hide resolved
@iradul
Copy link
Collaborator

iradul commented Dec 4, 2020

Thanks you!

@iradul iradul merged commit 6a36b45 into Blizzard:master Dec 4, 2020
@fabianschmitthenner
Copy link
Contributor Author

fabianschmitthenner commented Dec 7, 2020

Hey @iradul, cool that the PR finally got merged :).

I wonder, did you get the emails I send some time ago to you and @webmakersteve where I asked about getting contribution rights and being more involved in the overall project?

Best

Fabian

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants