Implement KCL Deaggregation #6917

Closed
Changes from 16 commits
Commits
43 commits
2d59c8b
Created a protobuf schema for user records
justinborromeo Jan 22, 2019
a4e5134
Renamed UserRecord to KinesisAggregatedRecord
justinborromeo Jan 22, 2019
edf8644
Untested implementation of deaggregation
justinborromeo Jan 22, 2019
e6cd3f8
Happy path deaggregation test
justinborromeo Jan 23, 2019
8472c33
Added tests for preconditions
justinborromeo Jan 23, 2019
4ee535a
Non-aggregated records work
justinborromeo Jan 25, 2019
f5608ae
Live tested on Kinesis stream, tests finished
justinborromeo Jan 25, 2019
629e9ac
Fixed small message bug and added test
justinborromeo Jan 25, 2019
46be748
Made path relative for test files
justinborromeo Jan 25, 2019
87059c1
Protobuf java class is now generated (codestyle is ignored)
justinborromeo Jan 25, 2019
070b90c
Oops there were some extra commas
justinborromeo Jan 26, 2019
3294efd
Suppress override errors on protoc-generated files
justinborromeo Jan 26, 2019
2788990
.
justinborromeo Jan 27, 2019
957231b
Added log message for invalid protocol buffer exception
justinborromeo Jan 27, 2019
9a94ea6
Teamcity why u do dis to me
justinborromeo Jan 28, 2019
5359a26
Merge branch 'master' into 6714-Implement-KCL-Deaggregation
justinborromeo Jan 31, 2019
93cfa38
Added protoc generated source as source
justinborromeo Feb 2, 2019
a77ef0d
Added suppression for AggregatedRecordProtos
justinborromeo Feb 2, 2019
e153624
Trying a new suppression
justinborromeo Feb 2, 2019
d70538d
Trying some stuff
justinborromeo Feb 2, 2019
f343b4c
Trying to suppress the checkstyle issues with protoc-compiled file
justinborromeo Feb 5, 2019
debcf21
Merge branch '6714-Implement-KCL-Deaggregation' of github.com:justinb…
justinborromeo Feb 5, 2019
b9bca93
Removed protoc compiler dependency
justinborromeo Feb 5, 2019
a39fdc4
Fixed import
justinborromeo Feb 5, 2019
41d1d4f
Added Apache License
justinborromeo Feb 5, 2019
7658a30
Changed pom to remove stuff that didn't work for TeamCity inspection
justinborromeo Feb 5, 2019
21d2c25
Changed some classes to conform to checkstyle and added suppressions to
justinborromeo Feb 5, 2019
1546695
Merge branch 'master' into 6714-Implement-KCL-Deaggregation
justinborromeo Feb 5, 2019
9696bfd
Kick TeamCity
justinborromeo Feb 12, 2019
4268f8b
Merge branch 'master' into 6714-Implement-KCL-Deaggregation
justinborromeo Feb 12, 2019
5731689
kick teamcity (again)
justinborromeo Feb 12, 2019
882b6d9
Merge branch 'master' into 6714-Implement-KCL-Deaggregation
justinborromeo Feb 15, 2019
eef6b73
Improved naming and added a bit of javadoc
justinborromeo Feb 15, 2019
d3b39bf
Logic is much cleaner and use Druid util classes
justinborromeo Feb 15, 2019
4345190
Merge branch 'master' into 6714-Implement-KCL-Deaggregation
justinborromeo Feb 20, 2019
542f912
Merge branch 'master' into 6714-Implement-KCL-Deaggregation
justinborromeo Mar 12, 2019
79ce422
Fixed weird import
justinborromeo Mar 15, 2019
1b2db0e
Merge branch 'master' into 6714-Implement-KCL-Deaggregation
justinborromeo Mar 22, 2019
1d164ac
Merge branch 'master' into 6714-Implement-KCL-Deaggregation
justinborromeo Apr 3, 2019
55787df
Merge branch 'master' of https://github.com/apache/incubator-druid in…
justinborromeo Apr 3, 2019
fd332bf
Fix tests
justinborromeo Apr 4, 2019
8a67ff5
Change visibility of Magic Numbers
justinborromeo Jun 15, 2019
63a0656
Merge branch '6714-Implement-KCL-Deaggregation' of github.com:justinb…
justinborromeo Jun 15, 2019
11 changes: 2 additions & 9 deletions docs/content/development/extensions-core/kinesis-ingestion.md
@@ -205,7 +205,6 @@ For Roaring bitmaps:
|`fetchDelayMillis`|Integer|Time in milliseconds to wait between subsequent GetRecords calls to Kinesis. See 'Determining Fetch Settings' below.|no (default == 1000)|
|`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|no|
|`awsExternalId`|String|The AWS external id to use for additional permissions.|no|
|`deaggregate`|Boolean|Whether to use the de-aggregate function of the KCL. See below for details.|no|

## Operations

@@ -380,13 +379,7 @@ Internally, each indexing task maintains a buffer that stores the fetched but no
control this behavior. The number of records that the indexing task fetch from the buffer is controlled by `maxRecordsPerPoll`, which
determines the number of records to be processed per each ingestion loop in the task.

## Deaggregation
See [issue](https://github.com/apache/incubator-druid/issues/6714)
# Deaggregation

The Kinesis indexing service supports de-aggregation of multiple rows packed into a single record by the Kinesis
Producer Library's aggregate method for more efficient data transfer. Currently, enabling the de-aggregate functionality
requires the user to manually provide the Kinesis Client Library on the classpath, since this library has a license not
compatible with Apache projects.

To enable this feature, add the `amazon-kinesis-client` (tested on version `1.9.2`) jar file ([link](https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-client/1.9.2)) under `dist/druid/extensions/druid-kinesis-indexing-service/`.
Then when submitting a supervisor-spec, set `deaggregate` to true.
Producer Library's aggregate method for more efficient data transfer.
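
For readers unfamiliar with what "de-aggregation" has to undo, the following is a minimal Java sketch of the first step only: recognising an aggregated record and pulling out its protobuf payload. It is not the code from this PR. It assumes the aggregated-record layout published with the Kinesis Producer Library: a 4-byte magic prefix (`0xF3 0x89 0x9A 0xC2`), a protobuf-encoded message, and a trailing 16-byte MD5 digest of that message. The names `AggregatedEnvelope`, `extractPayload`, and `wrap` are hypothetical, and treating a record with a bad checksum as a plain record is an assumption made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical helper, for illustration only; not a class from this PR.
class AggregatedEnvelope
{
  // Magic prefix assumed from the KPL aggregated-record format.
  static final byte[] MAGIC = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2};
  // Length in bytes of the trailing MD5 digest.
  static final int DIGEST_LENGTH = 16;

  static byte[] md5(byte[] bytes)
  {
    try {
      return MessageDigest.getInstance("MD5").digest(bytes);
    }
    catch (NoSuchAlgorithmException e) {
      // MD5 is one of the algorithms every Java platform must provide.
      throw new IllegalStateException(e);
    }
  }

  // Returns the protobuf payload of an aggregated record, or null when the
  // data should be treated as a plain (non-aggregated) record.
  static byte[] extractPayload(byte[] data)
  {
    if (data.length <= MAGIC.length + DIGEST_LENGTH) {
      return null; // too short to hold magic + payload + digest
    }
    if (!Arrays.equals(Arrays.copyOfRange(data, 0, MAGIC.length), MAGIC)) {
      return null; // no magic prefix: a plain record
    }
    int payloadEnd = data.length - DIGEST_LENGTH;
    byte[] payload = Arrays.copyOfRange(data, MAGIC.length, payloadEnd);
    byte[] digest = Arrays.copyOfRange(data, payloadEnd, data.length);
    // Assumption: a checksum mismatch means "not really aggregated".
    return Arrays.equals(md5(payload), digest) ? payload : null;
  }

  // Builds an envelope around a payload; only used to exercise extractPayload.
  static byte[] wrap(byte[] payload)
  {
    byte[] out = new byte[MAGIC.length + payload.length + DIGEST_LENGTH];
    System.arraycopy(MAGIC, 0, out, 0, MAGIC.length);
    System.arraycopy(payload, 0, out, MAGIC.length, payload.length);
    System.arraycopy(md5(payload), 0, out, MAGIC.length + payload.length, DIGEST_LENGTH);
    return out;
  }

  public static void main(String[] args)
  {
    byte[] payload = "stand-in for protobuf bytes".getBytes(StandardCharsets.UTF_8);
    System.out.println(Arrays.equals(payload, extractPayload(wrap(payload)))); // true
    System.out.println(extractPayload(payload) == null); // true
  }
}
```

In a real consumer the extracted payload would then be parsed with the protobuf-generated class, and each contained row emitted as its own record; a `null` result would mean the original bytes are passed through unchanged.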
218 changes: 132 additions & 86 deletions extensions-core/kinesis-indexing-service/pom.xml
@@ -20,97 +20,143 @@

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modelVersion>4.0.0</modelVersion>
@justinborromeo (Contributor Author), Jan 26, 2019

Oops changing the indent from 4 spaces to 2 spaces (like all of the other poms) borked the diff.

The actual changes in this file are:

  • Getting rid of the amazon-kinesis-client dependency
  • Adding the protobuf-java dependency
  • Adding the protoc-jar-maven-plugin, which runs in the generate-sources phase and generates a Java source file from the aggregated record .proto file
  • Adding a profile that downgrades the MissingOverride check to a warning, since protoc doesn't add @Override annotations

Contributor:

Fwiw, adding ?w=1 to the PR URL (https://github.com/apache/incubator-druid/pull/6917/files?w=1) gets GitHub to show the diff in whitespace-doesn't-matter mode. In this case it makes it clearer what the real changes are.


<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-kinesis-indexing-service</artifactId>
<name>druid-kinesis-indexing-service</name>
<description>druid-kinesis-indexing-service</description>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-kinesis-indexing-service</artifactId>
<name>druid-kinesis-indexing-service</name>
<description>druid-kinesis-indexing-service</description>

<parent>
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
<version>0.13.0-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<properties>
<protobuf.version>3.6.1</protobuf.version>
<checkstyle.skip>true</checkstyle.skip>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>${aws.sdk.version}</version>
<exclusions>
<!-- aws-java-sdk-core is provided by Druid core. -->
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<parent>
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
<version>0.13.0-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.9.2</version>
<scope>provided</scope>
</dependency>
<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>${aws.sdk.version}</version>
<exclusions>
<!-- aws-java-sdk-core is provided by Druid core. -->
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Tests -->
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>

<!-- Tests -->
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.6.0.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<addProtoSources>all</addProtoSources>
<inputDirectories>
<include>src/main/protobuf</include>
</inputDirectories>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
@justinborromeo (Contributor Author), Jan 26, 2019

If there's a way to target this towards a specific directory, lmk.

<profile>
<id>strict</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgs>
<!-- Protoc-generated classes miss @Override, that is not easy to fix -->
<arg>-Xep:MissingOverride:WARN</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
@@ -102,7 +102,6 @@ protected KinesisRecordSupplier newTaskRecordSupplier()
ioConfig.getRecordsPerFetch(),
ioConfig.getFetchDelayMillis(),
fetchThreads,
ioConfig.isDeaggregate(),
tuningConfig.getRecordBufferSize(),
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),

@@ -40,7 +40,6 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St

private final String awsAssumedRoleArn;
private final String awsExternalId;
private final boolean deaggregate;

@JsonCreator
public KinesisIndexTaskIOConfig(
@@ -56,8 +55,7 @@ public KinesisIndexTaskIOConfig(
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("exclusiveStartSequenceNumberPartitions") Set<String> exclusiveStartSequenceNumberPartitions,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId,
@JsonProperty("deaggregate") boolean deaggregate
@JsonProperty("awsExternalId") String awsExternalId
)
{
super(
@@ -81,7 +79,6 @@ public KinesisIndexTaskIOConfig(
this.fetchDelayMillis = fetchDelayMillis != null ? fetchDelayMillis : DEFAULT_FETCH_DELAY_MILLIS;
this.awsAssumedRoleArn = awsAssumedRoleArn;
this.awsExternalId = awsExternalId;
this.deaggregate = deaggregate;
}

@JsonProperty
@@ -114,12 +111,6 @@ public String getAwsExternalId()
return awsExternalId;
}

@JsonProperty
public boolean isDeaggregate()
{
return deaggregate;
}

@Override
public String toString()
{
@@ -136,7 +127,6 @@ public String toString()
", exclusiveStartSequenceNumberPartitions=" + getExclusiveStartSequenceNumberPartitions() +
", awsAssumedRoleArn='" + awsAssumedRoleArn + '\'' +
", awsExternalId='" + awsExternalId + '\'' +
", deaggregate=" + deaggregate +
'}';
}
}