Adding config auto.offset.reset to PulsarKafkaConsumer #3273
...pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -159,6 +163,16 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ
}
}

private OffsetResetStrategy getStrategy(final String strategy) {
if (strategy.equals("earliest")) {
Couldn't `strategy` be null?
Yeah, will change the logic to take that into account.
return OffsetResetStrategy.EARLIEST;
} else if (strategy.equals("latest")) {
return OffsetResetStrategy.LATEST;
} else {
I would prefer failing fast here if `strategy` is set (neither null nor empty) but not a recognized value, and logging a message to tell users that the setting is wrong.
Sure, will do.
@@ -311,6 +325,9 @@ public void unsubscribe() {
long offset = MessageIdUtils.getOffset(msgId);

TopicPartition tp = new TopicPartition(topic, partition);
if (lastReceivedOffset.get(tp) == null) {
Can you explain why you are adding this logic here?
I noticed that `lastReceivedOffset` is cleared whenever a seek-to-end or seek-to-beginning is called. Looking into it, I found that when `lastReceivedOffset.get(tp)` returns null, the current offset position is invalid (or could not be found, since we are using a map). Taking advantage of this, I call `seekToEnd` or `seekToBeginning` in that case to match the behavior of KafkaConsumer, whose `position()`, `seekToEnd()`, and `seekToBeginning()` methods use the same approach internally.
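For readers following the thread, the fallback described above can be sketched roughly as follows. This is a simplified, self-contained illustration and not the code in this PR: `PositionResetSketch`, its `Reset` enum, the string partition keys, and the `firstOffset`/`endOffset` fields are all hypothetical stand-ins for the real consumer state and seek calls.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the behaviour discussed in this thread.
final class PositionResetSketch {
    // Hypothetical stand-in for the configured auto.offset.reset strategy.
    enum Reset { EARLIEST, LATEST }

    // Last offset seen per partition; a missing entry means "no valid position".
    private final Map<String, Long> lastReceivedOffset = new HashMap<>();
    private final Reset strategy;
    private final long firstOffset; // assumed result of seeking to the beginning
    private final long endOffset;   // assumed result of seeking to the end

    PositionResetSketch(Reset strategy, long firstOffset, long endOffset) {
        this.strategy = strategy;
        this.firstOffset = firstOffset;
        this.endOffset = endOffset;
    }

    // Called when a message is received for the partition.
    void record(String partition, long offset) {
        lastReceivedOffset.put(partition, offset);
    }

    // Models a seek, which clears the tracked offset for the partition.
    void clear(String partition) {
        lastReceivedOffset.remove(partition);
    }

    // Returns the tracked offset, or falls back to the reset strategy
    // when nothing has been recorded for the partition.
    long position(String partition) {
        Long offset = lastReceivedOffset.get(partition);
        if (offset != null) {
            return offset;
        }
        return strategy == Reset.EARLIEST ? firstOffset : endOffset;
    }
}
```

The point of the sketch is only that a missing map entry triggers the reset path instead of returning a sentinel value.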
…rg/apache/kafka/clients/consumer/PulsarKafkaConsumer.java Co-Authored-By: ConcurrencyPractitioner <yohan.richard.yu@gmail.com>
@@ -65,6 +66,7 @@

@Slf4j
public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
private static enum OffsetResetStrategy {EARLIEST, LATEST, NONE}
Why not use `SubscriptionInitialPosition` directly?
Oh, never thought about that. I will take a look. :)
Well, I used SubscriptionInitialPosition later. We would still need to seek to the start or the end depending on the config setting. @merlimat Do you want me to get rid of the enum?
Documentation on the website should also be updated to reflect the newly supported feature.
One more thing. @merlimat Do you know which file to change for the documentation?
@ConcurrencyPractitioner In |
Ok @merlimat, removed the enum. Are there any other changes you would like to see?
@@ -143,6 +146,7 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ

groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
isAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
strategy = getStrategy(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
Wouldn't this give an error if `config` doesn't include the specified key? Shouldn't we have a default there?
In Kafka the default is `latest`.
Oh. I will take that into account.
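Pulling the review points above together (null handling, a `latest` default, and failing fast on bad values), the mapping could look something like the sketch below. It is an illustration under stated assumptions rather than the code that was merged: `InitialPosition` is a hypothetical stand-in for Pulsar's `SubscriptionInitialPosition` so the example compiles on its own, and `OffsetResetSketch` is a made-up class name.

```java
// Hypothetical stand-in for SubscriptionInitialPosition (Earliest / Latest).
enum InitialPosition { Earliest, Latest }

final class OffsetResetSketch {
    // Maps an auto.offset.reset value to an initial position.
    static InitialPosition getStrategy(String strategy) {
        // Missing or empty value: fall back to "latest", Kafka's default.
        if (strategy == null || strategy.isEmpty()) {
            return InitialPosition.Latest;
        }
        switch (strategy) {
            case "earliest":
                return InitialPosition.Earliest;
            case "latest":
                return InitialPosition.Latest;
            default:
                // Fail fast on anything else. This sketch makes no attempt
                // to support other values such as "none".
                throw new IllegalArgumentException(
                        "Unsupported auto.offset.reset value: " + strategy);
        }
    }
}
```

Whether an unrecognized value should throw or merely log and fall back is a design choice for the PR; the sketch picks the fail-fast option the reviewer asked for.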
@@ -159,6 +163,17 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ
}
}

private SubscriptionInitialPosition getStrategy(final String strategy) {
switch(strategy) {
case "earliest":
Use 4 spaces for indentation
Got it.
@@ -492,9 +510,21 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
@Override
public long position(TopicPartition partition) {
Long offset = lastReceivedOffset.get(partition);
return offset != null ? offset : -1l;
if (offset == null) {
return resetOffsets(partition).getValue();
4 spaces indent
Ok, done. @merlimat
@sijie Do you think there is anything else I need to do?
site2/docs/adaptors-kafka.md
@@ -129,6 +129,7 @@ Properties:
| Config property | Supported | Notes |
|:----------------------------------------|:----------|:------------------------------------------------------------------------------|
| `acks` | Ignored | Durability and quorum writes are configured at the namespace level |
| `auto.offset.reset` | Yes | Will throw an exception if this property is not defined (i.e. is none). |
The note is no longer accurate.
}

private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {
if (strategy == SubscriptionInitialPosition.Earliest) {
seekToBeginning(Collections.singleton(partition));
There are still several cases with tabs instead of spaces.
Hi @merlimat Updated the PR. Hope this wraps things up.
case "earliest":
return SubscriptionInitialPosition.Earliest;
default:
return SubscriptionInitialPosition.Latest;
wrong indent?
Yeah, these things happen. Oh, some tests were broken by my change. We should fix that before merging.
Actually, I discovered a few things. When I looked at the failing integration tests, `KafkaApiTest` failed. But looking in detail, I found that we were using `KafkaConsumer` instead of `PulsarKafkaConsumer`. I don't think the test failures are related to my change, but I'm not sure. Am I right?
`PulsarKafkaConsumer` was shaded to `KafkaConsumer`, so if there are test failures, they should be related.
Also, I did some digging. I couldn't seem to find any tests for PulsarKafkaConsumer. Should we add some tests to confirm the method's behavior? I find this to be good practice. WDYT?
In general, it is good practice to add tests (unit tests at least; integration tests would be great) to make sure newly introduced code changes or features are covered, so that any subsequent changes can be verified against those tests in the future.
Ok, fixed the test. I will probably add a new one and this should be good to go.
👍
Motivation
KafkaConsumer supports the `auto.offset.reset` config, but PulsarKafkaConsumer currently ignores it. This PR implements it.
Modifications
Added code that defines the consumer's behavior when `auto.offset.reset` is set to `earliest` or `latest`.
Result
When no offset has been recorded for a partition, the consumer automatically seeks to the first offset (`earliest`) or the last offset (`latest`), depending on the configured value.
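As a rough usage illustration, the property would be set like any other Kafka consumer setting. The sketch below only builds the `Properties` object and does not construct a consumer; the class name `ConsumerConfigSketch`, the service URL, and the group id are placeholder assumptions, not values taken from this PR.

```java
import java.util.Properties;

// Hypothetical helper that only assembles consumer properties.
final class ConsumerConfigSketch {
    static Properties consumerProperties(String offsetReset) {
        Properties props = new Properties();
        // Assumed local service URL; replace with the real cluster address.
        props.put("bootstrap.servers", "pulsar://localhost:6650");
        // Placeholder group id.
        props.put("group.id", "example-group");
        props.put("enable.auto.commit", "false");
        // "earliest" or "latest", as discussed in this PR.
        props.put("auto.offset.reset", offsetReset);
        return props;
    }
}
```

Passing `"earliest"` here would ask the consumer to start from the first available offset when it has no recorded position.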