
fix: kafka consumer data loss problem #1629

Merged: 12 commits into apache:dev on Jun 26, 2023

Conversation

luoluoyuyu
Contributor

@luoluoyuyu commented May 29, 2023

See related discussion #1626

@github-actions bot added the backend and java labels May 29, 2023
@dominikriemer
Member

Hi @luoluoyuyu thanks for the PR!
I have a question related to the earliest offset (I'm not so familiar with the Kafka internals):
When stopping a pipeline and restarting the pipeline again, the same consumer group id is used. Would this change imply that all data which has been produced to the topic would be replayed to the processors of a pipeline?

@luoluoyuyu
Contributor Author

luoluoyuyu commented May 29, 2023

Hi @dominikriemer
If the Kafka consumer group already has a committed offset (consumer-committed offsets are stored on the broker), consumption will not start from the beginning. If the consumer group's offset has expired, or no offset was ever committed, Kafka will consume the data from the beginning under the earliest configuration. So whether consumption starts from scratch depends on when the broker-side offset expires.
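
For illustration, a minimal, self-contained consumer sketch showing where auto.offset.reset comes into play (broker address, group id, and topic are placeholders, not StreamPipes code):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetResetDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "streampipes-demo-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    // Only used when the group has no committed offset, or when the committed
    // offset has expired on the broker: "earliest" restarts from the beginning
    // of the topic, "latest" (the Kafka default) skips to the end.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("demo-topic"));
      System.out.println(consumer.poll(Duration.ofSeconds(1)).count() + " records");
    }
  }
}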

You can find more information in the following resources:
https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset
https://issues.apache.org/jira/browse/KAFKA-3806

I'm happy to contribute to the StreamPipes community.

@bossenti added this to the 0.93.0 milestone May 29, 2023
@bossenti assigned bossenti and luoluoyuyu and unassigned bossenti May 29, 2023
@bossenti
Contributor

So to be on the safe side, we should check how we commit our offsets, right?
If we, e.g., set a short expiration period, this might cause issues, if my understanding is correct.

@RobertIndie
Member

So to be on the safe side, we should check how we commit our offsets, right?

For the current default behavior, the Kafka consumer auto-commits offsets every 5 seconds: https://kafka.apache.org/documentation/#consumerconfigs_auto.commit.interval.ms
I think it's better to add an option to commit the offset after the records have been polled and processed. Looks like:

while (isRunning) {
  ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
  records.forEach(record -> eventProcessor.onEvent(record.value()));
  if (autoCommitDisabled) { // placeholder flag: true when auto-commit is turned off
    consumer.commitSync();
  }
}

Otherwise, we still have a high probability of re-consuming duplicate records from the last five seconds after a restart.

For this PR, I propose exposing this config to users. Let users choose what they want. We could keep the same behavior as before (use the latest offset).
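
To make the proposal concrete, a self-contained sketch of such a manual-commit loop (broker address, topic, group id, and the process method are placeholders, not the actual StreamPipes consumer):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ManualCommitLoop {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "streampipes-demo-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    // Disable the periodic auto-commit; offsets are only advanced below,
    // after the polled records have actually been processed.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("demo-topic"));
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        records.forEach(record -> process(record.value())); // process first...
        consumer.commitSync(); // ...then commit: a crash replays records instead of losing them
      }
    }
  }

  private static void process(byte[] value) {
    System.out.println("received " + value.length + " bytes");
  }
}

With this pattern the worst case after a crash is duplicate processing, never loss, matching the at-least-once semantics discussed above.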

@luoluoyuyu
Contributor Author

luoluoyuyu commented May 29, 2023

while (isRunning) {
  ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
  records.forEach(record -> eventProcessor.onEvent(record.value()));
  if (autoCommitDisabled) { // placeholder flag: true when auto-commit is turned off
    consumer.commitSync();
  }
}

Yes, I think this approach works. Kafka's default offset retention time is 7 days (https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes), which I think is long enough; if the user needs a different window, they can modify the offset expiration time. We can expose this configuration to users and let them choose what they want.

@luoluoyuyu
Contributor Author

while (isRunning) {
  ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
  records.forEach(record -> eventProcessor.onEvent(record.value()));
  if (autoCommitDisabled) { // placeholder flag: true when auto-commit is turned off
    consumer.commitSync();
  }
}

Should we complete this change in another PR?

@dominikriemer
Member

Hi @luoluoyuyu @RobertIndie thanks for the explanation!
So do we then want to expose AUTO_OFFSET_RESET_CONFIG_DEFAULT and OFFSET_COMMIT_SYNC (or so) as environment variables? These can be added to the Environment class in streampipes-commons. We could then use the earliest setting for tests and other use cases where this is desired.
My understanding is that if a pipeline is manually stopped (the user doesn't want to receive any data) and started again, the earliest setting would cause Kafka to either replay the data from the beginning, if the time between pipeline stop and start is more than 7 days, or replay from the last committed offset. The current behaviour is that data is replayed from the latest offset (all events arriving after pipeline start).

Is this correct? Should we use latest as default then or did I misunderstand something?

@RobertIndie
Member

So do we then want to expose AUTO_OFFSET_RESET_CONFIG_DEFAULT and OFFSET_COMMIT_SYNC (or so) as an environment variable?

I think we could just expose these configs in the Kafka adapter first to solve this adapter test issue. There are too many Kafka configurations; exposing them all as environment variables would make things too complicated. If we subsequently need to allow users to configure more parameters for the Kafka messaging protocol, we can write these configurations to a file and expose a single environment variable, KAFKA_CONSUMER_PROPERTIES, that points to that file. It's better to keep the environment variables simple.
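
As a sketch of that idea, assuming the proposed KAFKA_CONSUMER_PROPERTIES variable (the class and method names here are hypothetical):

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public final class KafkaConsumerProps {

  // Loads extra consumer settings from the file named by the proposed
  // KAFKA_CONSUMER_PROPERTIES environment variable; returns an empty
  // Properties object when the variable is not set.
  public static Properties load() throws IOException {
    Properties props = new Properties();
    String path = System.getenv("KAFKA_CONSUMER_PROPERTIES");
    if (path != null && !path.isEmpty()) {
      try (InputStream in = new FileInputStream(path)) {
        props.load(in); // standard java.util.Properties key=value format
      }
    }
    return props;
  }
}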

My understanding is that if a pipeline is manually stopped (the user doesn't want to receive any data) and started again, the earliest setting would cause Kafka to either replay the data from the beginning, if the time between pipeline stop and start is more than 7 days, or replay from the last committed offset. The current behaviour is that data is replayed from the latest offset (all events arriving after pipeline start).

Yes, that's correct. If we don't want to replay any messages produced before the consumer is created, and can tolerate message loss, then we can use this approach, set the retention accordingly, or use a unique consumer group each time the consumer is created. I think the non-persistent topic feature from Pulsar would be a better choice for this case.

@RobertIndie
Member

while (isRunning) {
  ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
  records.forEach(record -> eventProcessor.onEvent(record.value()));
  if (autoCommitDisabled) { // placeholder flag: true when auto-commit is turned off
    consumer.commitSync();
  }
}

Should we complete this change in another PR?

Yes. That's a different topic. Let's put it in another PR.

@dominikriemer
Member

Thanks for the explanation @RobertIndie! :-)

@tenthe
Contributor

tenthe commented Jun 12, 2023

Hi @luoluoyuyu,

Thanks a lot for providing the PR. It looks good, and I see the problem you are solving.

However, I agree with @dominikriemer that merging this PR as it is will break the behavior of running pipelines. Please correct me if I am wrong, but I would say that when a user does the following:

  1. Start a pipeline at timestamp_0.
  2. Then the pipeline is stopped at timestamp_1.
  3. And started again at timestamp_2.

Expected behavior: The events between timestamp_1 and timestamp_2 are not processed.
Behavior of PR: The events between timestamp_1 and timestamp_2 are replayed when the pipeline is started again.

My suggestion would be to leave the consumer config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG as it was and provide a way for developers to set it to earliest.
This would ensure that the pipeline behaviour is as expected and a developer could change the properties for the integration tests.

Is there anything I'm not thinking about?

Cheers,
Philipp

@github-actions bot added the connect and pipeline elements labels Jun 13, 2023
@luoluoyuyu
Contributor Author

Hi @tenthe,
I have added a ConsumerConfig.AUTO_OFFSET_RESET_CONFIG configuration item for the Kafka adapter. The option is not mandatory: even if the user does not configure ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, the Kafka adapter is still created, using Kafka's default behavior.

What do you think of this approach?

@luoluoyuyu
Contributor Author

The effect is as follows (screenshot attached).

@bossenti
Contributor

Can we pre-select the default option in the UI?
In addition I have two minor remarks:

  • A more detailed description below the option heading would be great
  • IMHO the new config option should be placed above the Topic section

@bossenti linked an issue Jun 13, 2023 that may be closed by this pull request
@tenthe
Contributor

tenthe commented Jun 13, 2023

Hi @tenthe, I have added a ConsumerConfig.AUTO_OFFSET_RESET_CONFIG configuration item for the Kafka adapter. The option is not mandatory: even if the user does not configure ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, the Kafka adapter is still created, using Kafka's default behavior.

What do you think of this approach?

Hello @luoluoyuyu,
thank you very much for updating the PR, I like this solution very much 👍.
We should create a follow-up issue to create an update script for existing adapters that don't have these settings yet.
Once we have a solution for this (see discussion #1663), we can provide an implementation in another PR.

@luoluoyuyu
Contributor Author

luoluoyuyu commented Jun 14, 2023

Hi, @bossenti

Can we pre-select the default option in the UI?
In addition I have two minor remarks:

A more detailed description below the option heading would be great
IMHO the new config option should be placed above the Topic section

Thank you for your suggestions. While making the changes, I found that we should add a default configuration: StreamPipes iterates through the configuration items when creating an adapter, and an unconfigured option may throw an exception, causing adapter creation to fail.
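
A sketch of the fallback idea (the option key, class name, and map-based lookup are hypothetical, not the actual StreamPipes extractor API):

import java.util.Map;

public final class OffsetResetDefaults {

  private static final String DEFAULT_AUTO_OFFSET_RESET = "latest";

  // Returns the configured auto.offset.reset value, falling back to a
  // default instead of throwing when an (older) adapter description
  // does not contain the new option at all.
  public static String resolve(Map<String, String> configuredValues) {
    String value = configuredValues.get("auto-offset-reset");
    return (value == null || value.isEmpty()) ? DEFAULT_AUTO_OFFSET_RESET : value;
  }
}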

@bossenti
Contributor

Hi, @bossenti

Can we pre-select the default option in the UI?
In addition I have two minor remarks:
A more detailed description below the option heading would be great
IMHO the new config option should be placed above the Topic section

Thank you for your suggestions. While making the changes, I found that we should add a default configuration: StreamPipes iterates through the configuration items when creating an adapter, and an unconfigured option may throw an exception, causing adapter creation to fail.

Okay then I'd propose to remove the option to set None as a configuration value as it always results in an exception, right?

Another aspect I'd like to discuss although I'm a bit late to the party here:
We all agree that this is a configuration only required for development purposes, right?
Do we then really want to expose it via the UI and potentially confuse users with it?
Wouldn't it be better to expose it as an environment variable, as proposed earlier by @dominikriemer?

I'm very sorry to be so late in bringing up this discussion, @loststar
The effort you are already putting in is really appreciated.
I would love to get this PR in as soon as possible, but I think we should always have the best user experience as a goal.

@tenthe
Contributor

tenthe commented Jun 14, 2023

Hey @bossenti,
you are right, the problem first came up in the tests. But I personally actually like the idea that a user also has the possibility to configure this behavior for each adapter.
Do you see a downside to having this option in the adapter? If not, I would like to merge this new feature.
Regarding the default value, I agree with you. It would be good to have one there.

@dominikriemer
Member

Hi,
thanks @luoluoyuyu for this solution, looks great!
Concerning the default value of the radio buttons, I don't think there is currently an option to define a default selection in the SDK, so we can create a new issue for this feature and later on also add it to other adapters and pipeline elements.
We could maybe call None something like Default and assign a default behaviour in the implementation (I'd favor latest).
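
A sketch of that mapping (the UI labels and class name are hypothetical):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class OffsetResetMapper {

  // Maps the radio-button selection to a concrete Kafka setting; "Default"
  // (or any unknown label) falls back to "latest", keeping the old behavior.
  public static void apply(String uiSelection, Properties consumerProps) {
    final String reset;
    switch (uiSelection == null ? "Default" : uiSelection) {
      case "Earliest":
        reset = "earliest";
        break;
      case "Latest":
        reset = "latest";
        break;
      default:
        reset = "latest";
        break;
    }
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, reset);
  }
}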

@loststar
Contributor

Okay then I'd propose to remove the option to set None as a configuration value as it always results in an exception, right?

Another aspect I'd like to discuss although I'm a bit late to the party here: We all agree that this is a configuration only required for development purposes, right? Do we then really want to expose it via the UI and potentially confuse users with it? Wouldn't it be better to expose it as an environment variable, as proposed earlier by @dominikriemer?

I'm very sorry to be so late in bringing up this discussion, @loststar The effort you are already putting in is really appreciated. I would love to get this PR in as soon as possible, but I think we should always have the best user experience as a goal.

Hi @bossenti
May I interpret your proposal as follows: Do we need to add an option to the UI that is unlikely to be used in the production environment?
If so, I think it's a good idea to include it. However, it would be better if it was placed in something like a "Debug Panel" or a collapsed "Advanced Configuration" section.

@luoluoyuyu
Contributor Author

Hi @loststar

Hi @bossenti
May I interpret your proposal as follows: Do we need to add an option to the UI that is unlikely to be used in the production environment?
If so, I think it's a good idea to include it. However, it would be better if it was placed in something like a "Debug Panel" or a collapsed "Advanced Configuration" section.

This is a good idea, but I did not find any configuration or description for advanced settings in the code. If we want an "Advanced Configuration" section, I think we need to create an issue to implement that feature first.

@bossenti
Contributor

But I personally actually like the idea that a user also has the possibility to configure this behavior for each adapter.

Okay, then I'm fine with it :)

I'm very sorry to be so late in bringing up this discussion, @loststar
The effort you are already putting in is really appreciated.
I would love to get this PR in as soon as possible, but I think we should always have the best user experience as a goal.

I'm sorry @luoluoyuyu, this was addressed to you 🙂

@luoluoyuyu
Contributor Author

Hi @bossenti

I'm sorry @luoluoyuyu, this was addressed to you 🙂

Sorry, I'll try to do better

@dominikriemer
Member

Hi @bossenti

I'm sorry @luoluoyuyu, this was addressed to you 🙂

Sorry, I'll try to do better

@luoluoyuyu don't worry, Tim only apologized for addressing the wrong username in his comment ;-)
You are doing great and we really appreciate your contribution!!

@bossenti
Contributor

@luoluoyuyu wait wait...
This was only meant as an appreciation for you, please don't get me wrong!
I was feeling bad to bring again another topic to this discussion and just wanted to express my gratitude for you 🙂
You are really doing great and the insights you bring into our project are really valuable.
I'm really sorry in case I snubbed you

@luoluoyuyu
Contributor Author

Ohhh 😁

@dominikriemer merged commit 58d2194 into apache:dev Jun 26, 2023
30 checks passed
@dominikriemer
Member

Thank you very much @luoluoyuyu and sorry that this took so long to merge!

Successfully merging this pull request may close these issues.

About Kafka consumer data loss problem