Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partition key support #198

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10.3</version>
<version>2.8.11</version>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
Expand Down Expand Up @@ -196,6 +196,14 @@
<finalName>${project.artifactId}-${project.version}</finalName>
<sourceDirectory>src</sourceDirectory>
<scriptSourceDirectory>src</scriptSourceDirectory>
<resources>
<resource>
<directory>src</directory>
<includes>
<include>**/*.properties</include>
Copy link
Author

@yang-wei yang-wei Mar 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

</includes>
</resource>
</resources>
<pluginManagement>
<plugins>
<plugin>
Expand Down Expand Up @@ -229,4 +237,4 @@
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
* Copyright 2014-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.amazon.kinesis.streaming.agent.tailing;
Expand All @@ -18,6 +18,7 @@
public class KinesisConstants extends Constants {
public static final String DESTINATION_KEY = "kinesisStream";
public static final String PARTITION_KEY = "partitionKeyOption";
public static final String PARTITION_PATTERN = "partitionKeyPattern";

public static final int PER_RECORD_OVERHEAD_BYTES = 0;
public static final int MAX_RECORD_SIZE_BYTES = 1024 * 1024;
Expand All @@ -28,8 +29,10 @@ public class KinesisConstants extends Constants {
public static final int MAX_BUFFER_SIZE_BYTES = MAX_PUT_RECORDS_SIZE_BYTES;
public static final int DEFAULT_PARSER_BUFFER_SIZE_BYTES = MAX_BUFFER_SIZE_BYTES;

public static final PartitionKeyOption DefaultPartitionKeyOption = PartitionKeyOption.RANDOM;
public static enum PartitionKeyOption {
RANDOM,
DETERMINISTIC
DETERMINISTIC,
PATTERN
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
/*
* Copyright 2014-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.amazon.kinesis.streaming.agent.tailing;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import lombok.Getter;
import lombok.ToString;
Expand All @@ -42,12 +43,18 @@ public class KinesisFileFlow extends FileFlow<KinesisRecord> {
@Getter protected final String id;
@Getter protected final String destination;
@Getter protected final PartitionKeyOption partitionKeyOption;
@Getter protected Pattern partitionKeyPattern = null;

public KinesisFileFlow(AgentContext context, Configuration config) {
super(context, config);
destination = readString(KinesisConstants.DESTINATION_KEY);
id = "kinesis:" + destination + ":" + sourceFile.toString();
partitionKeyOption = readEnum(PartitionKeyOption.class, KinesisConstants.PARTITION_KEY, PartitionKeyOption.RANDOM);
partitionKeyOption = readEnum(PartitionKeyOption.class, KinesisConstants.PARTITION_KEY, KinesisConstants.DefaultPartitionKeyOption);

if (partitionKeyOption == PartitionKeyOption.PATTERN) {
String patternValue = readString(KinesisConstants.PARTITION_PATTERN);
partitionKeyPattern = Pattern.compile(patternValue);
}
}

@Override
Expand Down
68 changes: 52 additions & 16 deletions src/com/amazon/kinesis/streaming/agent/tailing/KinesisRecord.java
Original file line number Diff line number Diff line change
@@ -1,42 +1,55 @@
/*
* Copyright 2014-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.amazon.kinesis.streaming.agent.tailing;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.amazon.kinesis.streaming.agent.AgentContext;
import com.amazon.kinesis.streaming.agent.tailing.KinesisConstants.PartitionKeyOption;
import com.amazonaws.partitions.model.Partition;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.sun.xml.internal.ws.wsdl.writer.document.Part;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KinesisRecord extends AbstractRecord {

private static final Logger LOGGER = LoggerFactory.getLogger(KinesisRecord.class);

protected final String partitionKey;

public KinesisRecord(TrackedFile file, long offset, ByteBuffer data, long originalLength) {
super(file, offset, data, originalLength);
Preconditions.checkNotNull(file);
partitionKey = generatePartitionKey(((KinesisFileFlow)file.getFlow()).getPartitionKeyOption());
KinesisFileFlow flow = ((KinesisFileFlow)file.getFlow());
partitionKey = generatePartitionKey(flow.getPartitionKeyOption(), flow.getPartitionKeyPattern());
}

public KinesisRecord(TrackedFile file, long offset, byte[] data, long originalLength) {
super(file, offset, data, originalLength);
Preconditions.checkNotNull(file);
partitionKey = generatePartitionKey(((KinesisFileFlow)file.getFlow()).getPartitionKeyOption());
KinesisFileFlow flow = ((KinesisFileFlow)file.getFlow());
partitionKey = generatePartitionKey(flow.getPartitionKeyOption(), flow.getPartitionKeyPattern());
}

public String partitionKey() {
return partitionKey;
}
Expand All @@ -45,29 +58,52 @@ public String partitionKey() {
public long lengthWithOverhead() {
return length() + KinesisConstants.PER_RECORD_OVERHEAD_BYTES;
}

@Override
public long length() {
return dataLength() + partitionKey.length();
}

@Override
protected int getMaxDataSize() {
return KinesisConstants.MAX_RECORD_SIZE_BYTES - partitionKey.length();
}

@VisibleForTesting
String generatePartitionKey(PartitionKeyOption option) {
String generatePartitionKey(PartitionKeyOption option, Pattern pattern) {
Preconditions.checkNotNull(option);

if (option == PartitionKeyOption.DETERMINISTIC) {
Hasher hasher = Hashing.md5().newHasher();
hasher.putBytes(data.array());
return hasher.hash().toString();
}
if (option == PartitionKeyOption.RANDOM)
return "" + ThreadLocalRandom.current().nextDouble(1000000);

if (option == PartitionKeyOption.PATTERN) {
return generatePartitionKeyByPattern(pattern);
}

return null;
}

String generatePartitionKeyByPattern(Pattern pattern) {
if (pattern == null) {
LOGGER.error("PartitionKeyPattern is required when PATTERN is set as PartitionKeyOption.");
return generatePartitionKey(KinesisConstants.DefaultPartitionKeyOption, null);
}
Matcher matcher = pattern.matcher(getRemainingString());
if (matcher.matches() && matcher.groupCount() == 1) {
return matcher.group(1);
} else {
LOGGER.error("Failed to generate partition key based on pattern (" + pattern + ") on line [" + getRemainingString() + "]. Fallback to default.");
return generatePartitionKey(KinesisConstants.DefaultPartitionKeyOption, null);
}
}

private String getRemainingString() {
byte[] bytes = Arrays.copyOfRange(data.array(), data.arrayOffset(), data.arrayOffset() + data.remaining());
return new String(bytes, 0, bytes.length).trim();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void testKinesisConfigurationDefaults() {
assertEquals(ff.getRetryMaxBackoffMillis(), KinesisConstants.DEFAULT_RETRY_MAX_BACKOFF_MILLIS);
assertEquals(ff.getPartitionKeyOption(), KinesisConstants.PartitionKeyOption.RANDOM);
}


@SuppressWarnings("serial")
@Test
Expand All @@ -47,15 +47,22 @@ public void testPartitionKeyOption() {
put(getDestinationKey(), "des2");
put(KinesisConstants.PARTITION_KEY, "RANDOM");
}}));
KinesisFileFlow ff3 = buildFileFlow(context, new Configuration(new HashMap<String, Object>() {{
put("filePattern", file);
put(getDestinationKey(), "des3");
put(KinesisConstants.PARTITION_KEY, "PATTERN");
put(KinesisConstants.PARTITION_PATTERN, ".*([a-z]).*");
}}));
assertEquals(ff1.getPartitionKeyOption(), KinesisConstants.PartitionKeyOption.DETERMINISTIC);
assertEquals(ff2.getPartitionKeyOption(), KinesisConstants.PartitionKeyOption.RANDOM);
assertEquals(ff3.getPartitionKeyOption(), KinesisConstants.PartitionKeyOption.PATTERN);
}

@DataProvider(name="badPartitionKeyOptionInConfig")
public Object[][] testPartitionKeyOptionInConfigData(){
return new Object[][] { { "UNSUPPORTED" }, { "random" }, { "" } };
}

@SuppressWarnings("serial")
@Test(dataProvider="badPartitionKeyOptionInConfig",
expectedExceptions=ConfigurationException.class)
Expand All @@ -69,7 +76,18 @@ public void testWrongPartitionKeyOption(final String partitionKeyOption) {
}}));
ff.getPartitionKeyOption();
}


@Test(expectedExceptions = ConfigurationException.class)
public void testEmptyPatternWhenPartitionKeyOptionIsPATTERN() {
AgentContext context = TestUtils.getTestAgentContext();
final String file = "/var/log/message*";
buildFileFlow(context, new Configuration(new HashMap<String, Object>() {{
put("filePattern", file);
put(getDestinationKey(), "des1");
put(KinesisConstants.PARTITION_KEY, "PATTERN");
}}));
}

@DataProvider(name="badMaxBufferAgeMillisInConfig")
@Override
public Object[][] testMaxBufferAgeMillisInConfigData(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

import org.apache.commons.lang3.RandomUtils;
import org.mockito.Mockito;
Expand All @@ -27,7 +28,7 @@ public class KinesisRecordTest extends TestBase {
@SuppressWarnings("rawtypes")
private FileFlow flow;
private TrackedFile file;

@BeforeMethod
public void setup() throws IOException {
flow = Mockito.mock(KinesisFileFlow.class);
Expand Down Expand Up @@ -62,16 +63,58 @@ public void testTruncate() throws IOException {
Assert.assertEquals(record.length(), KinesisConstants.MAX_RECORD_SIZE_BYTES);
Assert.assertTrue(ByteBuffers.toString(record.data, StandardCharsets.UTF_8).endsWith(KinesisFileFlow.DEFAULT_TRUNCATED_RECORD_TERMINATOR));
}

@SuppressWarnings("rawtypes")
@Test
public void testGeneratePartitionKey() {
final PartitionKeyOption partitionKeyOption = KinesisConstants.PartitionKeyOption.DETERMINISTIC;
when(((KinesisFileFlow)flow).getPartitionKeyOption()).thenReturn(partitionKeyOption);

byte[] data = RandomUtils.nextBytes(200);
KinesisRecord record = new KinesisRecord(file, 1023, data, data.length);
Assert.assertNotNull(record.partitionKey());
Assert.assertEquals(record.partitionKey(), record.generatePartitionKey(partitionKeyOption));
Assert.assertEquals(record.partitionKey(), record.generatePartitionKey(partitionKeyOption, null));
}


@SuppressWarnings("rawtypes")
@Test
public void testGeneratePatternPartitionKey() {
final PartitionKeyOption partitionKeyOption = PartitionKeyOption.PATTERN;
when(((KinesisFileFlow)flow).getPartitionKeyOption()).thenReturn(partitionKeyOption);
final Pattern pattern = Pattern.compile(".*\"deviceId\"\\s*:\\s*\"([a-zA-Z0-9-]+)\".*");
when(((KinesisFileFlow)flow).getPartitionKeyPattern()).thenReturn(pattern);

byte[] data = "{ \"deviceId\": \"1234-5678\", \"weight\": 52 }".getBytes();
KinesisRecord record = new KinesisRecord(file, 1023, data, data.length);
Assert.assertNotNull(record.partitionKey());
Assert.assertEquals(record.partitionKey(), "1234-5678");
Assert.assertEquals(record.generatePartitionKey(partitionKeyOption, pattern), "1234-5678");
}

@SuppressWarnings("rawtypes")
@Test
public void testGenerateNonPatternPartitionKey() {
final PartitionKeyOption partitionKeyOption = PartitionKeyOption.PATTERN;
when(((KinesisFileFlow)flow).getPartitionKeyOption()).thenReturn(partitionKeyOption);
final Pattern pattern = Pattern.compile(".*\"nonExistKey\"\\s*:\\s*\"([a-zA-Z0-9-]+)\".*");
when(((KinesisFileFlow)flow).getPartitionKeyPattern()).thenReturn(pattern);

byte[] data = "{ \"deviceId\": \"1234-5678\", \"weight\": 52 }".getBytes();
KinesisRecord record = new KinesisRecord(file, 1023, data, data.length);
Assert.assertNotNull(record.partitionKey());
Assert.assertNotEquals(record.partitionKey(), "1234-5678");
}

@SuppressWarnings("rawtypes")
@Test
public void testGeneratePatternPartitionKeyWhenPatternIsNull() {
final PartitionKeyOption partitionKeyOption = PartitionKeyOption.PATTERN;
when(((KinesisFileFlow)flow).getPartitionKeyOption()).thenReturn(partitionKeyOption);

byte[] data = "{ \"wrongId\": \"1234-5678\", \"weight\": 52 }".getBytes();
KinesisRecord record = new KinesisRecord(file, 1023, data, data.length);
Assert.assertNotNull(record.partitionKey());
Assert.assertNotEquals(record.partitionKey(), "1234-5678");
}
}