-
Notifications
You must be signed in to change notification settings - Fork 123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-32019][Connector/Kafka] EARLIEST offset strategy for partitions discoveried later based on FLIP-288 #28
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes @loserwang1024
I have left some initial comments.
KafkaSourceEnumState(Set<TopicPartition> assignedPartitions) { | ||
this.assignedPartitions = assignedPartitions; | ||
KafkaSourceEnumState( | ||
Set<TopicPartition> assignPartitions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please change the parameter names to assignedPartitions
and unassignedInitialPartitions
@Internal | ||
public class TopicPartitionWithAssignStatus { | ||
private final TopicPartition topicPartition; | ||
private final long assignStatus; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assignmentStatus
would convey the meaning better than assignStatus
Also, I would prefer TopicPartitionAndAssignmentStatus
over TopicPartitionWithAssignStatus
} | ||
|
||
public Set<TopicPartition> assignedPartitions() { | ||
return assignedPartitions; | ||
return partitions.stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lines 68-74 and 78-84 duplicate the code a bit.
Maybe you can define a private method to abstract the common code and call it from assignedPartitions()
and unassignedPartitions()
So, something like this
private Set<TopicPartition> filterPartitions(long assignmentStatus);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@RamanVerma Thanks for your advice. Would you like to code review again?
@@ -113,10 +124,13 @@ public KafkaSourceEnumerator( | |||
Properties properties, | |||
SplitEnumeratorContext<KafkaPartitionSplit> context, | |||
Boundedness boundedness, | |||
Set<TopicPartition> assignedPartitions) { | |||
Set<TopicPartition> assignedPartitions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be better to pass the KafkaSourceEnumState
object in this constructor to limit the number of arguments.
private static byte[] serializeTopicPartitions(Collection<TopicPartition> topicPartitions) | ||
private static byte[] serializeTopicPartitions( | ||
Collection<TopicPartition> assignedPartitions, | ||
Collection<TopicPartition> unassignedInitialPartitons, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo unassignedInitialPartitons
-> unassignedInitialPartitions
Set<TopicPartition> assignedPartitions = enumState.assignedPartitions(); | ||
Set<TopicPartition> unassignedInitialPartitons = enumState.unassignedInitialPartitons(); | ||
boolean initialDiscoveryFinished = enumState.initialDiscoveryFinished(); | ||
return serializeTopicPartitions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can just get rid of the private method now.
We are serializing more than just topic partitions (initialDiscoveryFinished is a boolean) so the method name needs to change. Also, there is no other caller. So, let's just do everything in serialize method itself.
final int partition = in.readInt(); | ||
assignedPartitions.add(new TopicPartition(topic, partition)); | ||
} | ||
final int numUnassignedInitialPartitons = in.readInt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typos: numUnassignedInitialPartitons
. Also in line 162
eec6121
to
56c87e0
Compare
@RamanVerma Thanks for your advice. Would you like to see my new modification? |
Now that @RamanVerma is busy, could anyone else help me? CC, @PatrickRen |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@loserwang1024 Thanks for the PR! The overall logic looks good to me. I left some comments about naming and code style issues.
...a/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
Show resolved
Hide resolved
...va/org/apache/flink/connector/kafka/source/enumerator/TopicPartitionAndAssignmentStatus.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
Outdated
Show resolved
Hide resolved
...a/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
Outdated
Show resolved
Hide resolved
...ka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
Outdated
Show resolved
Hide resolved
56c87e0
to
6d56e91
Compare
@PatrickRen CC |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@loserwang1024 Thanks for the update! I left some comments.
.../src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
Outdated
Show resolved
Hide resolved
...kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/AssignmentStatus.java
Outdated
Show resolved
Hide resolved
...kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/AssignmentStatus.java
Show resolved
Hide resolved
...kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/AssignmentStatus.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
Outdated
Show resolved
Hide resolved
...a/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@loserwang1024 Thanks for the update. LGTM.
Could you squash all commits into one and rebase the latest master? I triggered a CI just now and let's wait for the result.
…s discoveried later based on FLIP-288
deae500
to
07c3165
Compare
Awesome work, congrats on your first merged pull request! |
What is the purpose of the change
As described in [FLIP-288](https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source), the strategy used for new partitions is the same as the initial offset strategy, which is not reasonable.
According to the semantics, if the startup strategy is latest, the consumed data should include all data from the moment of startup, which also includes all messages from new created partitions. However, the latest strategy currently maybe used for new partitions, leading to the loss of some data (thinking a new partition is created and might be discovered by Kafka source several minutes later, and the message produced into the partition within the gap might be dropped if we use for example "latest" as the initial offset strategy).if the data from all new partitions is not read, it does not meet the user's expectations.
Other ploblems see final Section:
User specifies OffsetsInitializer for new partition
.Therefore, it’s better to provide an EARLIEST strategy for later discovered partitions.
Brief change log
KafkaSourceEnumState
withTopicPartitionWithAssignStatus
to distinguish between initial partitions and newly discovered partitions.TopicPartitionWithAssignStatus
is also better for future expansion, as new statuses can be added without changing the state results.newDiscoveryOffsetsInitializer
(EARLIEST) to get offsets for newly discovered partitions.kafkaSourceEnumStateSerializer
to handle the expandedKafkaSourceEnumState
.Verifying this change
KafkaSourceEnumStateSerializerTest
.KafkaEnumeratorTest#testSnapshotState
method to test snapshot state in more scenarios:KafkaEnumeratorTest#testDiscoverPartitionsPeriodically
method to test whether new partitions use EARLIEST offset while initial partitions use specified offset strategy.