KAFKA-4208: Add Record Headers #2772
Refer to this link for build results (access rights to CI server needed):
Force-pushed from f70c094 to 6245945.
Force-pushed from 6245945 to 02e982a.
Force-pushed from 02e982a to 04f81fa.
Force-pushed from 04f81fa to 49cfa31.
Force-pushed from 49cfa31 to a843f7c.
Review notes to address small formatting changes missed in the cherry-pick merge.
@@ -35,7 +35,7 @@
* @param isKey whether is for key or value
*/
public void configure(Map<String, ?> configs, boolean isKey);
Accidental formatting change; not needed, so it should be reverted.
@@ -42,6 +43,8 @@ public void testOldConstructor() {
assertEquals(ConsumerRecord.NULL_CHECKSUM, record.checksum());
assertEquals(ConsumerRecord.NULL_SIZE, record.serializedKeySize());
assertEquals(ConsumerRecord.NULL_SIZE, record.serializedValueSize());
assertEquals(new RecordHeaders(), record.headers());
remove extra space
assertEquals("headerValue2", new String(record.headers().lastHeader("headerKey").value(), Charset.forName("UTF-8")));
}
Remove extra unneeded whitespace.
@@ -180,7 +180,7 @@ public void testOnAcknowledgementWithErrorChain() {
assertEquals(2, onErrorAckWithTopicPartitionSetCount);

// if producer record does not contain partition, interceptor should get partition == -1
- ProducerRecord<Integer, String> record2 = new ProducerRecord<>("test2", null, 1, "value");
+ ProducerRecord<Integer, String> record2 = new ProducerRecord<>("test2", (Integer) null, 1, "value");
is this needed?
@@ -477,18 +483,23 @@ private static int parseAcks(String acksString) {
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
Remove accidental whitespace formatting change.
Need to rebase: dd71e4a changed test classes, which breaks FetcherTest on the CI build.
Force-pushed from cdf4cf1 to b4a0365.
As per KIP-82
Adding record headers API to ProducerRecord, ConsumerRecord
Support for converting from protocol to API added to Kafka Producer, Kafka Fetcher (Consumer)
Updated MirrorMaker, ConsoleConsumer and Scala BaseConsumer
Add RecordHeaders and RecordHeader implementations of the interfaces Headers and Header
Converting the filter iterable into a private static class as per review comments.
Updating Javadoc to be more detailed as per review feedback.
Check key variables passed to methods to ensure they are not null, and throw if they are, as per Header behaviour when key is null.
Refactor duplicated code into private methods.
Update ProducerRecord so Iterable<Header> comes after V value, consistent with other areas, in response to review comment.
Support headers being available for serialization/deserialization for use case: https://cwiki.apache.org/confluence/display/KAFKA/A+Case+for+Kafka+Headers
Some parts of the solution are in lieu of Java 8 default methods.
Addressing review comments by @becketqin
revert back in Java 7 parts.
Addressing review comments from @hachikuji
Fix constructor to take Iterable<Header>
Fix conflict post rebase.
Force-pushed from cfd7796 to b75f939.
Enhance Tests, change close() to setReadOnly(), minor other review changes.
Force-pushed from b75f939 to 0d3a241.
Merge to trunk is imminent. The record header saga is finally drawing to a close. What battles await tomorrow?! (In all seriousness, thanks for the patch and the persistence with this KIP)
Sorry I'm late to the party and I'm going to be a pain, but if the headers are making it to the ProducerRecord, they should also make it to the SourceRecord, so that the Connect Framework directly benefits from it. CF https://cwiki.apache.org/confluence/display/KAFKA/KIP+141+-+ProducerRecord+Interface+Improvements
Hi Steph, in normal usage you actually add a header by calling record.headers().add(key, value); this constructor was added to the record really only for mirror-making solutions. Headers are also mostly intended to be added by interceptors (where the record is given), so they are mutable at that stage. This is why the headers are made read-only once the record is sent on produce. Yes, the Connect framework isn't done, as it wasn't covered in this KIP. Once this KIP was done I was actually going to raise another KIP immediately to add them (a bit like how the timestamp was first added to core and then a second KIP was raised to add it to Connect), but as a first step it was important to get headers added at the core level. We actually have the code for that ready, so hold on :)
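To illustrate the mutable-then-read-only lifecycle described in this comment, here is a minimal, self-contained sketch. It does not use the Kafka classes: `FreezableHeaders` and its demo class are hypothetical stand-ins, and the choice of `IllegalStateException` for a rejected write is an assumption made for the example. Only the `add(key, value)` and `setReadOnly()` names mirror what the thread mentions.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in (not the Kafka class): headers that accept writes
// until they are frozen, then reject further writes.
final class FreezableHeaders {
    private final List<String> keys = new ArrayList<>();
    private final List<byte[]> values = new ArrayList<>();
    private boolean readOnly = false;

    // Appends a header while the collection is still mutable.
    FreezableHeaders add(String key, byte[] value) {
        if (readOnly) {
            // Assumption: a rejected write surfaces as IllegalStateException.
            throw new IllegalStateException("headers are read-only");
        }
        keys.add(key);
        values.add(value);
        return this;
    }

    // Freezes the collection; in this sketch it models "the record was sent".
    void setReadOnly() {
        readOnly = true;
    }

    int size() {
        return keys.size();
    }
}

class FreezableHeadersDemo {
    public static void main(String[] args) {
        FreezableHeaders headers = new FreezableHeaders();
        // e.g. an interceptor adds a header before the record is sent
        headers.add("trace-id", "abc".getBytes(StandardCharsets.UTF_8));
        headers.setReadOnly();
        try {
            headers.add("late", new byte[0]);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Freezing at a single well-defined point keeps interceptors free to mutate headers beforehand while preventing changes to a record that is already in flight.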
That helps, thanks!
@simplesteph Here's the KIP. Here's the PR (which we had the code for). Obviously it first needs KIP discussion and a vote, so please feel free to get active in that space supporting it :)
@simplesteph See https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers for details about this work. You can just update your own KIP accordingly.
Update upgrade.html
Raising this now: as KIP-118 is pulled from the release, submitting this without Java 8 changes.
As per remaining review comment from #2772, updating the upgrade notes.
Author: Michael André Pearce <michael.andre.pearce@me.com>
Author: Michael Andre Pearce <Michael.Andre.Pearce@me.com>
Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes #2991 from michaelandrepearce/KIP-82
As per KIP-82
Adding record headers api to ProducerRecord, ConsumerRecord
Support to convert from protocol to api added Kafka Producer, Kafka Fetcher (Consumer)
Updated MirrorMaker, ConsoleConsumer and scala BaseConsumer
Add RecordHeaders and RecordHeader implementation of the interfaces Headers and Header
Some parts are reverted to be Java 7 compatible for the moment, until KIP-118 is implemented.
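The header semantics touched on in this PR (multiple headers per key, `lastHeader` returning the most recently added one, and a null key being rejected) can be sketched in plain Java. This is a simplified illustration, not the code from the patch: `SketchHeader`, `SketchHeaders` and the demo class are hypothetical names, and using `IllegalArgumentException` for a null key is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a single header: a non-null key plus raw bytes.
final class SketchHeader {
    final String key;
    final byte[] value;

    SketchHeader(String key, byte[] value) {
        if (key == null) {
            // Assumption: a null key is rejected with IllegalArgumentException.
            throw new IllegalArgumentException("key cannot be null");
        }
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for the headers collection. Insertion order is kept
// and the same key may appear more than once.
final class SketchHeaders {
    private final List<SketchHeader> entries = new ArrayList<>();

    SketchHeaders add(String key, byte[] value) {
        entries.add(new SketchHeader(key, value));
        return this;
    }

    // Returns the most recently added header for the key, or null if absent.
    SketchHeader lastHeader(String key) {
        requireKey(key);
        for (int i = entries.size() - 1; i >= 0; i--) {
            SketchHeader header = entries.get(i);
            if (header.key.equals(key)) {
                return header;
            }
        }
        return null;
    }

    // Returns every header stored under the key, in insertion order.
    List<SketchHeader> headers(String key) {
        requireKey(key);
        List<SketchHeader> matches = new ArrayList<>();
        for (SketchHeader header : entries) {
            if (header.key.equals(key)) {
                matches.add(header);
            }
        }
        return matches;
    }

    private static void requireKey(String key) {
        if (key == null) {
            throw new IllegalArgumentException("key cannot be null");
        }
    }
}

class SketchHeadersDemo {
    public static void main(String[] args) {
        SketchHeaders headers = new SketchHeaders()
            .add("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8))
            .add("headerKey", "headerValue2".getBytes(StandardCharsets.UTF_8));
        SketchHeader last = headers.lastHeader("headerKey");
        System.out.println(new String(last.value, StandardCharsets.UTF_8)); // prints "headerValue2"
        System.out.println(headers.headers("headerKey").size()); // prints 2
    }
}
```

The demo mirrors the assertion in the reviewed test, where `lastHeader("headerKey")` is expected to yield `headerValue2` after two headers are added under the same key.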