KAFKA-4935: Deprecate client checksum API and compute lazy partial checksum for magic v2 #3123
Conversation
retest this please
Refer to this link for build results (access rights to CI server needed)
@@ -112,7 +114,7 @@ public ConsumerRecord(String topic,
                          long offset,
                          long timestamp,
                          TimestampType timestampType,
-                         long checksum,
+                         Long checksum,
Adding a note here in case others are wondering: this is OK because this constructor was added during this release cycle.
Thanks for the PR. Looks good overall, just a few minor comments.
 * @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the
 *             checksum returned by the broker may not match what was computed by the producer.
 *             It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally,
 *             message format v2 does not support a record-level checksum. To maintain compatibility, a partial
Maybe mention that record-level checksums were removed for performance reasons.
private volatile Long checksum;

public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
                      Long checksum, int serializedKeySize, int serializedValueSize) {
long -> Long is a binary incompatible change.
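The incompatibility can be seen in miniature with reflection (the nested classes below are hypothetical stand-ins for the old and new constructor shapes, not the real RecordMetadata): the JVM links callers by exact binary descriptor, so changing the parameter from long ((J)V) to Long ((Ljava/lang/Long;)V) removes the descriptor already-compiled callers reference, producing NoSuchMethodError at runtime.

```java
public class BinaryCompat {
    // Old shape: primitive parameter, binary descriptor (J)V
    static class OldMeta { OldMeta(long checksum) {} }
    // New shape: boxed parameter, binary descriptor (Ljava/lang/Long;)V
    static class NewMeta { NewMeta(Long checksum) {} }

    // True if the class still exposes a (long) constructor for old callers to link against
    static boolean hasPrimitiveCtor(Class<?> c) {
        try {
            c.getDeclaredConstructor(long.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPrimitiveCtor(OldMeta.class)); // true
        System.out.println(hasPrimitiveCtor(NewMeta.class)); // false: the (J)V descriptor is gone
    }
}
```

This is why the change was acceptable only because the constructor had not yet shipped in a release.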
@@ -54,15 +57,7 @@ private RecordMetadata(TopicPartition topicPartition, long offset, long timestam

@Deprecated
public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
This is deprecated. Shall we remove it or undeprecate it?
Seems we deprecated it in 0.10.0. Maybe it is safe to remove now?
Yeah, I was thinking it was fine to remove it since users should not create an instance of this apart from tests, and it's been deprecated for a few release cycles. Or if we think the method is useful, then it would make sense to undeprecate it.
Makes sense. Let's remove it.
public long checksum() {
    if (checksum == null)
        this.checksum = DefaultRecordBatch.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize);
We should probably add a comment here and in ConsumerRecord explaining that this will be null for message format >= 2 and hence why we call DefaultRecordBatch.
@@ -96,14 +96,10 @@ RecordMetadata valueOrError() throws ExecutionException {
    return value();
}

-long checksum() {
+Long checksum() {
Should this be checksumOrNull?
}

/**
 * Write the record to `out` and return its crc.
 */
-public static long writeTo(ByteBuffer out,
+public static int writeTo(ByteBuffer out,
Comment above needs to be updated.
@@ -493,4 +496,13 @@ public void remove() {
    }
}

public static long computePartialChecksum(long timestamp, int serializedKeySize, int serializedValueSize) {
Is there a reason why this is not in DefaultRecord?
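The PR does not show the body of computePartialChecksum, but the idea is clear from its signature: since format v2 carries no per-record CRC, a stand-in checksum is derived from the few record attributes that survive format conversion. A minimal sketch under that assumption, using java.util.zip.CRC32 (the field layout and class name here are illustrative, not Kafka's actual implementation):

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class PartialChecksum {
    // Illustrative sketch: checksum over the timestamp and serialized sizes only,
    // the record attributes still available when no per-record CRC is stored.
    public static long computePartialChecksum(long timestamp, int serializedKeySize, int serializedValueSize) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + 4);
        buf.putLong(timestamp);
        buf.putInt(serializedKeySize);
        buf.putInt(serializedValueSize);
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 0, buf.capacity()); // buffer is fully written
        return crc.getValue();
    }

    public static void main(String[] args) {
        // Deterministic for identical inputs, which is all the deprecated API can promise
        long c1 = computePartialChecksum(1L, 2, 3);
        long c2 = computePartialChecksum(1L, 2, 3);
        System.out.println(c1 == c2); // true
    }
}
```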
@@ -344,7 +344,7 @@ private int writeLegacyCompressedWrapperHeader() {
    return writtenCompressed;
}

-private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
+private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
Maybe add a comment stating the return value.
-recordWritten(offset, timestamp, DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, key, value, headers));
-return crc;
+int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
+recordWritten(offset, timestamp, sizeInBytes);
Nice improvement.
The basic changes look good to me. Not much you can do differently IMO. It would be good to have some tests to make sure that the checksum methods on RecordMetadata (on the producer side) and ConsumerRecord (on the consumer side) return what's expected.
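A minimal shape such a test could take, using a simplified stand-in for the nullable-checksum pattern in the diffs above (plain asserts rather than the project's JUnit harness; the class and helper names are illustrative):

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class LazyChecksumSketch {
    // Illustrative stand-in for DefaultRecordBatch.computePartialChecksum
    static long partialChecksum(long timestamp, int keySize, int valueSize) {
        CRC32 crc = new CRC32();
        crc.update(ByteBuffer.allocate(16)
                .putLong(timestamp).putInt(keySize).putInt(valueSize).array());
        return crc.getValue();
    }

    private final long timestamp;
    private final int keySize, valueSize;
    private volatile Long checksum; // null until first access for format v2 records

    LazyChecksumSketch(Long checksum, long timestamp, int keySize, int valueSize) {
        this.checksum = checksum;
        this.timestamp = timestamp;
        this.keySize = keySize;
        this.valueSize = valueSize;
    }

    long checksum() {
        if (checksum == null) // benign race: concurrent callers compute the same value
            checksum = partialChecksum(timestamp, keySize, valueSize);
        return checksum;
    }

    public static void main(String[] args) {
        // v2-style record: no stored checksum, so the partial one is computed lazily
        LazyChecksumSketch v2 = new LazyChecksumSketch(null, 42L, 3, 5);
        if (v2.checksum() != partialChecksum(42L, 3, 5))
            throw new AssertionError("lazy checksum should match the partial checksum");
        // pre-v2-style record: the stored checksum is returned untouched
        LazyChecksumSketch v1 = new LazyChecksumSketch(123L, 42L, 3, 5);
        if (v1.checksum() != 123L)
            throw new AssertionError("stored checksum should be returned as-is");
        System.out.println("ok");
    }
}
```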
Refer to this link for build results (access rights to CI server needed)
LGTM, thanks. Before merging I did a few trivial changes:
- Mentioned the removed constructor in the upgrade notes (we already had a line for that, so just mentioned the class name).
- Avoided some deprecation warnings by using Long.valueOf so that the right constructor is chosen.
- Added @SuppressWarnings("deprecation") in a few places.
        timestamp, checksum, serializedKeySize, serializedValueSize);

public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
                      long checksum, int serializedKeySize, int serializedValueSize) {
    this(topicPartition, baseOffset, relativeOffset, timestamp, (Long) checksum, serializedKeySize,
I changed the cast to Long.valueOf.
    Struct keyStruct = type.recordKey();
    ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
    keyStruct.writeTo(key);
    key.flip();
    return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
}

-public long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
+public Long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
I documented the return type above.
 */
-public static long writeTo(DataOutputStream out,
+public static int writeTo(DataOutputStream out,
I fixed the alignment here and the other method in this class.
…ecksum for magic v2

Author: Jason Gustafson <jason@confluent.io>
Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Closes #3123 from hachikuji/KAFKA-4935
No description provided.