Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 9480][pulsar-io] add option for auto.offset.reset to kafka source #9482

Merged
merged 6 commits into from
Feb 15, 2021

Conversation

sbourkeostk
Copy link
Contributor

Fixes #9480

Motivation

The kafka source sets auto.offset.reset to "earliest". This means all old messages from kafka are produced to pulsar. Often is it desirable to start form the present location "latest".
The option is set after the user config has been loaded so it cannot be changed:
source code link

Modifications

Added an autoOffsetReset option to KafkaSourceConfig

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change added tests and can be verified as follows:
Added to unit test and verified behaviour.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): no
  • The public API: no
  • The schema: no
  • The default values of configurations: no
  • The wire protocol: no
  • The rest endpoints: no
  • The admin cli options: no
  • Anything that affects deployment: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs / JavaDocs
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@Renkai
Copy link
Contributor

Renkai commented Feb 4, 2021

@BewareMyPower Could you have a look?

Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add the sample config to kafkaSourceConfig.yaml and verify it in loadFromYamlFileTest?

@codelipenghui codelipenghui added this to the 2.8.0 milestone Feb 4, 2021
@codelipenghui codelipenghui added release/2.7.1 type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages labels Feb 4, 2021
@sbourkeostk
Copy link
Contributor Author

Sample config added to kafkaSourceConfig.yaml and verified in loadFromYamlFileTest

Co-authored-by: Yu Liu <50226895+Anonymitaet@users.noreply.github.com>
Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sbourkeostk
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@sbourkeostk
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@sbourkeostk
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

1 similar comment
@sbourkeostk
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@sijie sijie merged commit 9c2b081 into apache:master Feb 15, 2021
@sbourkeostk sbourkeostk deleted the kafka-auto-offset-reset branch February 15, 2021 09:31
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Feb 18, 2021
codelipenghui pushed a commit that referenced this pull request Feb 18, 2021
…rce (#9482)

Fixes #9480 

### Motivation

The kafka source sets auto.offset.reset to "earliest". This means all old messages from kafka are produced to pulsar. Often is it desirable to start form the present location "latest".
The option is set after the user config has been loaded so it cannot be changed:
[source code link](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java#L87)

### Modifications

Added an autoOffsetReset option to KafkaSourceConfig

(cherry picked from commit 9c2b081)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cherry-picked/branch-2.7 Archived: 2.7 is end of life release/2.7.1 type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

support auto.offset.reset in kafka connector
9 participants