Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-4208: Add Record Headers #2772

Closed
wants to merge 7 commits into from

Conversation

@michaelandrepearce
Copy link
Contributor

commented Mar 30, 2017

As per KIP-82

Adding record headers api to ProducerRecord, ConsumerRecord
Support to convert from protocol to api added Kafka Producer, Kafka Fetcher (Consumer)
Updated MirrorMaker, ConsoleConsumer and scala BaseConsumer
Add RecordHeaders and RecordHeader implementation of the interfaces Headers and Header

Some bits using are reverted to being Java 7 compatible, for the moment until KIP-118 is implemented.

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2546/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2546/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2550/
Test FAILed (JDK 8 and Scala 2.11).

@michaelandrepearce michaelandrepearce force-pushed the michaelandrepearce:KIP-82 branch Mar 30, 2017
@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2547/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2547/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2551/
Test FAILed (JDK 8 and Scala 2.11).

@michaelandrepearce michaelandrepearce force-pushed the michaelandrepearce:KIP-82 branch Mar 30, 2017
@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2548/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2548/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2552/
Test FAILed (JDK 8 and Scala 2.11).

@michaelandrepearce michaelandrepearce force-pushed the michaelandrepearce:KIP-82 branch Mar 30, 2017
@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2549/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2553/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2549/
Test FAILed (JDK 7 and Scala 2.10).

@michaelandrepearce michaelandrepearce force-pushed the michaelandrepearce:KIP-82 branch Mar 30, 2017
@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2551/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2555/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2551/
Test FAILed (JDK 7 and Scala 2.10).

@michaelandrepearce michaelandrepearce force-pushed the michaelandrepearce:KIP-82 branch Mar 30, 2017
@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2553/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2553/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

This comment has been minimized.

Copy link

commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2557/
Test PASSed (JDK 8 and Scala 2.11).

Copy link
Contributor Author

left a comment

Review notes, to address small formatting changes, missed in cherry pick merge.

clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java Outdated
@@ -35,7 +35,7 @@
* @param isKey whether is for key or value
*/
public void configure(Map<String, ?> configs, boolean isKey);

This comment has been minimized.

Copy link
@michaelandrepearce

michaelandrepearce Mar 31, 2017

Author Contributor

accidental formatting, no need, need to revert.

clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java Outdated
@@ -42,6 +43,8 @@ public void testOldConstructor() {
assertEquals(ConsumerRecord.NULL_CHECKSUM, record.checksum());
assertEquals(ConsumerRecord.NULL_SIZE, record.serializedKeySize());
assertEquals(ConsumerRecord.NULL_SIZE, record.serializedValueSize());
assertEquals(new RecordHeaders(), record.headers());

This comment has been minimized.

Copy link
@michaelandrepearce

michaelandrepearce Mar 31, 2017

Author Contributor

remove extra space

clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java Outdated
assertEquals("headerValue2", new String(record.headers().lastHeader("headerKey").value(), Charset.forName("UTF-8")));
}


This comment has been minimized.

Copy link
@michaelandrepearce

michaelandrepearce Mar 31, 2017

Author Contributor

remove extra un-needed whitespace

clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java Outdated
@@ -180,7 +180,7 @@ public void testOnAcknowledgementWithErrorChain() {
assertEquals(2, onErrorAckWithTopicPartitionSetCount);

// if producer record does not contain partition, interceptor should get partition == -1
ProducerRecord<Integer, String> record2 = new ProducerRecord<>("test2", null, 1, "value");
ProducerRecord<Integer, String> record2 = new ProducerRecord<>("test2", (Integer) null, 1, "value");

This comment has been minimized.

Copy link
@michaelandrepearce

michaelandrepearce Mar 31, 2017

Author Contributor

is this needed?

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java Outdated
@@ -477,18 +483,23 @@ private static int parseAcks(String acksString) {
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}

This comment has been minimized.

Copy link
@michaelandrepearce

michaelandrepearce Mar 31, 2017

Author Contributor

remove accidental, whitespace formatting change.

@asfbot

This comment has been minimized.

Copy link

commented Mar 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2567/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

commented Mar 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2567/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

This comment has been minimized.

Copy link

commented Mar 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2571/
Test FAILed (JDK 8 and Scala 2.11).

@michaelandrepearce

This comment has been minimized.

Copy link
Contributor Author

commented Mar 31, 2017

need to rebase, dd71e4a changed test class's breaking FetcherTest on CI build

@michaelandrepearce michaelandrepearce force-pushed the michaelandrepearce:KIP-82 branch Mar 31, 2017
@asfbot

This comment has been minimized.

Copy link

commented Mar 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2569/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

This comment has been minimized.

Copy link

commented Apr 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3271/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

This comment has been minimized.

Copy link

commented Apr 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3262/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

commented Apr 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3266/
Test FAILed (JDK 7 and Scala 2.10).

As per KIP-82

Adding record headers api to ProducerRecord, ConsumerRecord
Support to convert from protocol to api added Kafka Producer, Kafka Fetcher (Consumer)
Updated MirrorMaker, ConsoleConsumer and scala BaseConsumer
Add RecordHeaders and RecordHeader implementation of the interfaces Headers and Header
Converting filter iterable into a private static class as per review comments.
Updating Java Doc to be more detailed as per review feedback
Check key variables passed to methods, to ensure it is not null, and throw if it is, as per Header behaviour when key is null.
Refactor duplicated code into private methods.
Update ProducerRecord so Iterable<Header> is after V value so is consistent with other areas, in response to review comment.
Support headers being able to be availble for serialization/deserialization for use case: https://cwiki.apache.org/confluence/display/KAFKA/A+Case+for+Kafka+Headers

Some parts of solution are in-lieu of Java 8 default methods.
Addressing review comments by @becketqin
revert back in Java 7 parts.
Addressing review comments from @hachikuji
Fix constructor to take Iterable<Header>
Fix conflict post rebase.
@michaelandrepearce michaelandrepearce force-pushed the michaelandrepearce:KIP-82 branch Apr 28, 2017
@asfbot

This comment has been minimized.

Copy link

commented Apr 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3263/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

commented Apr 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3267/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

This comment has been minimized.

Copy link

commented Apr 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3272/
Test FAILed (JDK 8 and Scala 2.11).

Enhance Tests, change close() to setReadOnly(), minor other review changes.
@michaelandrepearce michaelandrepearce force-pushed the michaelandrepearce:KIP-82 branch to 0d3a241 Apr 28, 2017
@michaelandrepearce

This comment has been minimized.

@asfbot

This comment has been minimized.

Copy link

commented Apr 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3268/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

This comment has been minimized.

Copy link

commented Apr 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3264/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

commented Apr 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3273/
Test PASSed (JDK 8 and Scala 2.11).

@hachikuji

This comment has been minimized.

Copy link
Contributor

commented Apr 29, 2017

Merge to trunk is imminent. The record header saga is finally drawing to a close. What battles await tomorrow?!

(In all seriousness, thanks for the patch and the persistence with this KIP)

@asfgit asfgit closed this in 6185bc0 Apr 29, 2017
@simplesteph

This comment has been minimized.

Copy link
Contributor

commented Apr 29, 2017

Sorry I'm late to the party and I'm going to be a pain, but if the headers are making it to the ProducerRecord, they should also make it to the SourceRecord, so that the Connect Framework directly benefits from it.
Also, IMO this would have been a perfection opportunity of adding such headers as a method or something... Right now, many constructors are missing the headers parameter, and the only way to get to add headers is the very dangerous https://github.com/apache/kafka/pull/2772/files#diff-6deeb698c2574836af58336908d76ae5R66 (because of the partition parameter). Just saw this got merged an hour ago...

CF https://cwiki.apache.org/confluence/display/KAFKA/KIP+141+-+ProducerRecord+Interface+Improvements

cc @ijuma @mjsax

@michaelandrepearce

This comment has been minimized.

Copy link
Contributor Author

commented Apr 29, 2017

Hi Steph,

You actually add a header in normal usage by calling record.headers.add(key,value), this constructor was added in the record really only aimed at mirror making solutions.

Also adding headers is mostly aimed to be added by interceptors (where record is given) thus mutable at that stage. This is why on produce once sent the headers are made read only.

Yes connect framework isn't done as wasn't covered in this KIP, once this KIP was done I was actially going to raise another KIP immediately to add them (a bit like timestamp was first added to core and then a second kip was raised to add them to connect), but first step it was important to get headers added at the core level. We actually have the code for that ready so hold on :)

@simplesteph

This comment has been minimized.

Copy link
Contributor

commented Apr 29, 2017

That helps thanks !

@michaelandrepearce

This comment has been minimized.

Copy link
Contributor Author

commented Apr 29, 2017

@simplesteph
Was going to raise all this in a week or so time, but as we had the code ready and obviously you have a need thus raised the point re connect, i have raised it all now.

Here's KIP
https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect

Here's PR (which we had the code for)
#2942

Obviously first needs KIP discussion and vote. So please feel free to get active in that space supporting that :)

@mjsax

This comment has been minimized.

Copy link
Member

commented Apr 29, 2017

@simplesteph See https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers for details about this work. You can just update your own KIP accordingly.

@michaelandrepearce michaelandrepearce deleted the michaelandrepearce:KIP-82 branch May 7, 2017
asfgit pushed a commit that referenced this pull request May 21, 2017
Update upgrade.html

Raising this now, as KIP-118 is pulled from release as such submitting this without java 8 changes.

As per remaining review comment from #2772, updating the upgrade notes.

Author: Michael André Pearce <michael.andre.pearce@me.com>
Author: Michael Andre Pearce <Michael.Andre.Pearce@me.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2991 from michaelandrepearce/KIP-82
asfgit pushed a commit that referenced this pull request May 25, 2017
Update upgrade.html

Raising this now, as KIP-118 is pulled from release as such submitting this without java 8 changes.

As per remaining review comment from #2772, updating the upgrade notes.

Author: Michael André Pearce <michael.andre.pearce@me.com>
Author: Michael Andre Pearce <Michael.Andre.Pearce@me.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2991 from michaelandrepearce/KIP-82
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
9 participants
You can’t perform that action at this time.