From f48aacceff8691e785f6ff8eb767f954a78b29de Mon Sep 17 00:00:00 2001 From: Jim Lim Date: Tue, 27 Jan 2015 13:31:41 -0800 Subject: [PATCH 1/2] reset pKey when message#key is null The pKey object is reused across messages, but is not being reset between each read. #getNext should empty pKey when it receives a message without a partition key. --- .../java/com/linkedin/camus/etl/kafka/common/KafkaReader.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java index 672a4f685..a8857abe2 100644 --- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java +++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java @@ -116,6 +116,8 @@ public boolean getNext(EtlKey key, BytesWritable payload, BytesWritable pKey) th bytes = new byte[origSize]; buf.get(bytes, buf.position(), origSize); pKey.set(bytes, 0, origSize); + } else { + pKey.setSize(0); } key.clear(); From 3695ea5e928b76b03a36cb2bc437d52132a77950 Mon Sep 17 00:00:00 2001 From: Jim Lim Date: Fri, 20 Feb 2015 16:14:30 -0800 Subject: [PATCH 2/2] add logging when message#key is null --- .../java/com/linkedin/camus/etl/kafka/common/KafkaReader.java | 1 + 1 file changed, 1 insertion(+) diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java index a8857abe2..5eb22f4a2 100644 --- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java +++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java @@ -117,6 +117,7 @@ public boolean getNext(EtlKey key, BytesWritable payload, BytesWritable pKey) th buf.get(bytes, buf.position(), origSize); pKey.set(bytes, 0, origSize); } else { + log.warn("Received message with null message.key(): " + msgAndOffset); pKey.setSize(0); }