Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement KCL Deaggregation #6917

Conversation

justinborromeo
Copy link
Contributor

@justinborromeo justinborromeo commented Jan 26, 2019

Fixes #6714. Rather than having users download the Kinesis Client Library jar, this change adds the capability to perform deaggregation on Kinesis Producer Library-aggregated messages without the KCL dependency. These changes were all written without any knowledge of how KCL implements deaggregation.

Changes:

  1. Messages are checked to see if they are aggregations based on whether they have the magic numbers and their checksum is valid (see https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md for protobuf schema). If so, they are deaggregated using the Google protobuf library. If not, they are processed in the same manner as before.

  2. The deaggregation config value was removed. The new deaggregation method can handle both aggregated and non-aggregated messages.

  3. A maven step was added to generate a Java file that matches the KPL protobuf schema.

I know my doc changes were minimal so if anyone has any suggestions about what else to write about, feel free to lmk.

@@ -747,6 +702,40 @@ private String getSequenceNumberInternal(StreamPartition<String> partition, Stri

}

@VisibleForTesting
List<byte[]> deaggregateKinesisRecord(Record kinesisRecord) throws InvalidProtocolBufferException {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this function outside of the PartitionResource class so I could test it. Is there a way to test private class methods?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really a good one. That's usually what @VisibleForTesting means: "I would have made this private, except I needed to call it in a unit test."

@@ -20,90 +20,135 @@

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modelVersion>4.0.0</modelVersion>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops changing the indent from 4 spaces to 2 spaces (like all of the other poms) borked the diff.

The actual changes in this file are:

  • Getting rid of the amazon-kinesis-client dependency
  • Adding the protobuf-java dependency
  • Adding the protoc-jar-maven-plugin which runs at compile time and generates a source file for the aggregated record .proto template
  • Adding a profile to suppress MissingOverride errors -> protoc doesn't add Overrides

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fwiw, adding ?w=1 to the PR URL (https://github.com/apache/incubator-druid/pull/6917/files?w=1) gets GitHub to show the diff in whitespace-doesn't-matter mode. In this case it makes it clearer what the real changes are.

</plugin>
</plugins>
</build>
<profiles>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's a way to target this towards a specific directory, lmk.

if (Arrays.equals(magicNumbers, KPL_AGGREGATE_MAGIC_NUMBERS)
&& Arrays.equals(messageHash, checksum)) {
List<byte[]> data = new ArrayList<>();
AggregatedRecordProtos.AggregatedRecord aggregatedRecord = AggregatedRecordProtos.AggregatedRecord.parseFrom(protobufMessage);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity isn't recognizing the AggregatedRecordProtos class because it doesn't generate sources. What's the best way around that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the error look like? (The job looks like it was restarted recently.) How do the protobuf tests make this work - I think they use generated sources too?

If those avenues don't turn up anything interesting, maybe we can suppress something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TeamCity build says that deaggregateKinesisRecords doesn't throw InvalidProtocolBufferException even though AggregatedRecordProtos#parseFrom throws the exception. From what I understand, the protobuf tests don't generate source files from .proto files at compile time...rather, someone compiled it once. That might be a valid option since we wouldn't expect the KPL aggregation schema to change anytime soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoided the TeamCity inspection errors by removing the generation of AggregatedRecordProtos at compile time and just using it as a regular class with warnings suppressed. This matches the implementation of ProtoTestEvent in the protobuf-extensions tests.

@fjy fjy added this to the 0.15.0 milestone Mar 11, 2019
@@ -0,0 +1 @@
84mawgoObXlQYXJ0aXRpb25LZXkacAgAGmx7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjU1LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjI1LjI5MDQxOTI4ODU4OTAxOH0abggAGmp7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjYxLTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjcuMjYwMjY2MzIxMjkzODR9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY2NS0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjozNi4zMzg1NjI0NjQ3ODgyMn0abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjU3LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjk5LjM3MTk4NjE2NjM0ODY5fRpvCAAaa3sidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjEtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6NzguMTY4NDU1MDIxMTI4NTh9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY2Ni0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo5MS4zMDg0NDg4NTEyMjk0M30abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjU4LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjI4LjA5MjYwMTIwMDk4NDM3fRpvCAAaa3sidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjEtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6NjUuMDUwODc3MzIzOTQwMDR9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY2Ni0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo1NC4yMzAwNzI0MzE4NzU0NH0abggAGmp7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjU4LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjc5LjM1MjU5ODMxMDgxNjl9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY2MS0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo2NS43OTQ3MDA0NTAwOTQxNH0abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjU4LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjgwLjExODYzOTM0OTU2Njg2fRpvCAAaa3sidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjItMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6ODEuMTEwNTIwNTY4NjM4MjF9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY2Mi0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo4My4yMTQ0MzcwODg3MzEwNX0abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjU4LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjQuNTg1NDg1MzUwODI4MDQ0fRpwCAAabHsidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjItMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6MTEuOTMyNTUyNTE3NjM0MDY1fRpwCAAabHsidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NTgtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6MjUuMDQxMDQ2NTI2ODU4NzQ3fRpwCAAabHsidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjMtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6MTMuNTE5NjIzMDcwMTk2NDUyfRpvCAAaa3sidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NTktMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6MzUuMjQxNjg2NjYwNjM0NDR9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY2My0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo0LjEyNzI3Njg4NjE2ODQ2M30abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjU5LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjYxLjI1Mjk3ODE2OTI0OTIyfRpvCAAaa3sidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjMtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6OTEuODcxMDEwNzc0MDUwMDZ9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY1OS0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo0MS40ODUzMzUwNTkyNDgwM30acAgAGmx7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjYzLTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjMwLjIxOTA5NzIzNjg1OTYxNH0acAgAGmx7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjU5LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjMyLjg5MDgwNzEwNjA3MjUzNn0abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjY0LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjQuNzQ5Njg3MTI5NzcxODk2fRpvCAAaa3sidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjAtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6My40Mzc3MzA1NDc4NzMxMDh9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY2NC0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo3My41MDQwMDA0Mjc0NjMxMX0acAgAGmx7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjYwLTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjM4LjU3Mjk3MDAwNTg3NTMwNX0abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjY0LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjgxLjc5NTAxNTI2MTkwNjk1fRpvCAAaa3sidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjAtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6NzEuODgxNzUwODE0OTM0MjJ9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY2NS0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo3Ny41MjExMTEyNjM0NTE3Nn0abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjYwLTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjM1LjAzOTExMjI0ODM4MTkyfRptCAAaaXsidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjUtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6ODEuMDY0MzAxODU0MzIyfRpuCAAaansidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjEtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6MzUuMTU1NjE1MjY1OTI4NH0acAgAGmx7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjY3LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjU3LjM2MzkwODA3Mjc3MjQ5Nn0abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjY3LTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjgyLjI4NzgzMjM5NTkyNjM0fRpvCAAaa3sidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjgtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6ODcuNjU5NDQ4MjYyNzkzNDl9GnAIABpseyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY2OC0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjoxMi43NTUyNjUxMTQwNzg4ODV9Gm4IABpqeyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY2OC0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo4Mi4wODI5NTEyODA0MDEzfRpwCAAabHsidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjktMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6NC44NTgzMjQyMzg4NzIyNTg1fRpvCAAaa3sidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NjktMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6NTAuMzE4OTc5NDM0NTA4NTZ9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY3MC0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjozNi45NjY5MDkyODE0NTI0OX0abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjcwLTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjk2LjgzMzM5MzE4Mjc0NDI3fRpuCAAaansidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NzAtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6NDUuMTQyMDcwMjkxNTA2MX0abggAGmp7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjcxLTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjcwLjg0MjE5MjY3MjM5Mzl9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY3MS0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo2Ni40NDU2NDU2ODE2MzkyNX0abwgAGmt7InRpbWVzdGFtcCI6IjIwMTktMDEtMjVUMTA6MjY6MzUuNjcxLTA4OjAwIiwibXlTdHJpbmciOiJzb21lIHRleHQiLCJteUludCI6NCwibXlEb3VibGUiOjUuMDY0MzczODg3MzcxNzMxfRpvCAAaa3sidGltZXN0YW1wIjoiMjAxOS0wMS0yNVQxMDoyNjozNS42NzEtMDg6MDAiLCJteVN0cmluZyI6InNvbWUgdGV4dCIsIm15SW50Ijo0LCJteURvdWJsZSI6NjMuNTQ1MjE5MjM3ODc2MDd9Gm8IABpreyJ0aW1lc3RhbXAiOiIyMDE5LTAxLTI1VDEwOjI2OjM1LjY3Mi0wODowMCIsIm15U3RyaW5nIjoic29tZSB0ZXh0IiwibXlJbnQiOjQsIm15RG91YmxlIjo1My4zNDI4ODM3MjcwNDg5M31foE8ypcllp3vZXupl8YPE
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this file need a license? If so, I'll have to add something to the reader code in tests to ignore the header

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't, we have a rat exclusion for <exclude>**/test/resources/**</exclude>

@justinborromeo justinborromeo changed the title #6714-Implement-KCL-Deaggregation Implement KCL Deaggregation Apr 4, 2019
@jihoonson
Copy link
Contributor

I'm untagging milestone since this issue is not necessarily a release blocker. Feel free to let me know if you think this should be.

@jihoonson jihoonson removed this from the 0.15.0 milestone May 7, 2019
@justinborromeo
Copy link
Contributor Author

@jihoonson I agree that this shouldn't be a blocker

@justinborromeo
Copy link
Contributor Author

Unnecessary since KCLv2 is Apache 2.0-licensed

@justinborromeo justinborromeo deleted the 6714-Implement-KCL-Deaggregation branch September 6, 2019 23:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement the KPL Aggregated Record Format for Kinesis indexing service
6 participants