-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add getPartitionIndex() to the Record<> #9947
Conversation
* | ||
* @return The partition number | ||
*/ | ||
default Optional<Integer> getPartitionNumber() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default Optional<Integer> getPartitionNumber() { | |
default Optional<Integer> getPartitionId() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think parittion id it better than partition number
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sijie I agree but default Optional<String> getPartitionId() {
already defined on line 76.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about getPartitionIndex
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sijie renamed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall looks good to me
but I left some comments
|
||
KafkaSchemaWrappedSchema keySchema; | ||
|
||
KafkaSchemaWrappedSchema valueSchema; | ||
|
||
AbstractKafkaSourceRecord(SourceRecord srcRecord) { | ||
this.destinationTopic = Optional.of("persistent://"+topicNamespace + "/" + srcRecord.topic()); | ||
this.partitionNumber = Optional.of(srcRecord.kafkaPartition()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional.ofNullable ?
@@ -178,6 +178,11 @@ public KafkaRecord(ConsumerRecord<String,?> record, V value, Schema<V> schema) { | |||
return Optional.of(Integer.toString(record.partition())); | |||
} | |||
|
|||
@Override | |||
public Optional<Integer> getPartitionNumber() { | |||
return Optional.of(record.partition()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional.ofNullable ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
record.partition()
returns int, cannot be null
@@ -61,6 +61,11 @@ | |||
return Optional.of(topicName); | |||
} | |||
|
|||
@Override | |||
public Optional<Integer> getPartitionNumber() { | |||
return Optional.of(partition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional.ofNullable ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int partition
, cannot be null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
@sijie you left comments PTAL again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add some tests for the new method introduced in the PR?
0067e4f
to
5206f01
Compare
Motivation
Looking at #9927 (comment) there is no way to reliably get partition number.
There is Optional getPartitionId() which returns:
Modifications
Added
default Optional<Integer> getPartitionIndex()
to theRecord
interface.Return partition number where appropriate.
Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesinterface Record<T>
Documentation