From bb7b45cd5d454d64c3454b01f8f3f1e13ed26ff3 Mon Sep 17 00:00:00 2001 From: Sam Meder Date: Tue, 24 Sep 2013 08:31:18 -0700 Subject: [PATCH] kafka-946; Kafka Hadoop Consumer fails when verifying message checksum; patched by Sam Meder; reviewed by Jun Rao --- .../src/main/java/kafka/etl/KafkaETLContext.java | 2 +- .../src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java index 8e98efc1cdff..1d0e0a917985 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java @@ -196,7 +196,7 @@ protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException { if (_messageIt != null && _messageIt.hasNext()) { MessageAndOffset messageAndOffset = _messageIt.next(); - ByteBuffer buf = messageAndOffset.message().payload(); + ByteBuffer buf = messageAndOffset.message().buffer(); int origSize = buf.remaining(); byte[] bytes = new byte[origSize]; buf.get(bytes, buf.position(), origSize); diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java index b0aadff332d6..45cc92181014 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java @@ -16,8 +16,6 @@ */ package kafka.etl.impl; -import java.io.IOException; -import java.nio.ByteBuffer; import kafka.etl.KafkaETLKey; import kafka.etl.KafkaETLUtils; import kafka.message.Message; @@ -29,6 +27,9 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import java.io.IOException; +import java.nio.ByteBuffer; + /** * Simple implementation of KafkaETLMapper. It assumes that * input data are text timestamp (long). @@ -59,7 +60,7 @@ public void map(KafkaETLKey key, BytesWritable val, byte[] bytes = KafkaETLUtils.getBytes(val); //check the checksum of message - Message message = new Message(bytes); + Message message = new Message(ByteBuffer.wrap(bytes)); long checksum = key.getChecksum(); if (checksum != message.checksum()) throw new IOException ("Invalid message checksum "