Skip to content

Commit

Permalink
Kinesis-sink consider topic-name as partition-key if record key empty (
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Aug 14, 2018
1 parent 7680173 commit ad5fc83
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void write(Record<byte[]> record) throws Exception {
record.getRecordSequence());
throw new IllegalStateException("kinesis queue has publish failure");
}
String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
String partitionedKey = record.getKey().orElse(record.getTopicName().orElse(defaultPartitionedKey));
partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least one, and at most 256
Expand Down

0 comments on commit ad5fc83

Please sign in to comment.