From 24939bc605ddcbfef642f311903be1a822f33850 Mon Sep 17 00:00:00 2001
From: Terence Yim
Date: Mon, 14 Jan 2019 10:51:31 -0800
Subject: [PATCH] =?UTF-8?q?Update=20to=20use=20a=20RecordFormat=20that=20d?=
 =?UTF-8?q?oesn=E2=80=99t=20involve=20StreamEvent?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../hydrator/plugin/batch/source/KafkaBatchSource.java    | 6 +++---
 .../cask/hydrator/plugin/source/KafkaStreamingSource.java | 7 +++----
 .../hydrator/plugin/batch/source/KafkaBatchSource.java    | 6 +++---
 .../cask/hydrator/plugin/source/KafkaStreamingSource.java | 7 +++----
 4 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
index f9accb9..010cafe 100644
--- a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
+++ b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
@@ -26,7 +26,6 @@
 import co.cask.cdap.api.data.format.StructuredRecord;
 import co.cask.cdap.api.data.schema.Schema;
 import co.cask.cdap.api.dataset.lib.KeyValue;
-import co.cask.cdap.api.flow.flowlet.StreamEvent;
 import co.cask.cdap.common.io.ByteBuffers;
 import co.cask.cdap.etl.api.Emitter;
 import co.cask.cdap.etl.api.PipelineConfigurer;
@@ -50,6 +49,7 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,7 +68,7 @@ public class KafkaBatchSource extends BatchSource
   kafkaRequests;
   private Schema schema;
-  private RecordFormat<StreamEvent, StructuredRecord> recordFormat;
+  private RecordFormat<ByteBuffer, StructuredRecord> recordFormat;
   private String messageField;
   private FileContext fileContext;
   private Path offsetsFile;
@@ -231,7 +231,7 @@ public void transform(KeyValue input, Emitter
-    private transient RecordFormat<StreamEvent, StructuredRecord> recordFormat;
+    private transient RecordFormat<ByteBuffer, StructuredRecord> recordFormat;
     FormatFunction(long ts, KafkaConfig conf) {
       super(ts, conf);
     }
@@ -283,11 +282,11 @@ protected void addMessage(StructuredRecord.Builder builder, String messageField,
     if (recordFormat == null) {
       Schema messageSchema = conf.getMessageSchema();
       FormatSpecification spec =
-        new FormatSpecification(conf.getFormat(), messageSchema, new HashMap());
+        new FormatSpecification(conf.getFormat(), messageSchema, new HashMap<>());
       recordFormat = RecordFormats.createInitializedFormat(spec);
     }
 
-    StructuredRecord messageRecord = recordFormat.read(new StreamEvent(ByteBuffer.wrap(message)));
+    StructuredRecord messageRecord = recordFormat.read(ByteBuffer.wrap(message));
     for (Schema.Field field : messageRecord.getSchema().getFields()) {
       String fieldName = field.getName();
       builder.set(fieldName, messageRecord.get(fieldName));
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
index 7bc223b..74fe08c 100644
--- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
+++ b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
@@ -25,7 +25,6 @@
 import co.cask.cdap.api.data.format.StructuredRecord;
 import co.cask.cdap.api.data.schema.Schema;
 import co.cask.cdap.api.dataset.lib.KeyValue;
-import co.cask.cdap.api.flow.flowlet.StreamEvent;
 import co.cask.cdap.common.io.ByteBuffers;
 import co.cask.cdap.etl.api.Emitter;
 import co.cask.cdap.etl.api.PipelineConfigurer;
@@ -45,6 +44,7 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -64,7 +64,7 @@ public class KafkaBatchSource extends BatchSource
   kafkaRequests;
   private Schema schema;
-  private RecordFormat<StreamEvent, StructuredRecord> recordFormat;
+  private RecordFormat<ByteBuffer, StructuredRecord> recordFormat;
   private String messageField;
 
   public KafkaBatchSource(KafkaBatchConfig config) {
@@ -164,7 +164,7 @@ public void transform(KeyValue input, Emitter
-    private transient RecordFormat<StreamEvent, StructuredRecord> recordFormat;
+    private transient RecordFormat<ByteBuffer, StructuredRecord> recordFormat;
     FormatFunction(long ts, KafkaConfig conf) {
       super(ts, conf);
     }
@@ -283,11 +282,11 @@ protected void addMessage(StructuredRecord.Builder builder, String messageField,
     if (recordFormat == null) {
       Schema messageSchema = conf.getMessageSchema();
      FormatSpecification spec =
-        new FormatSpecification(conf.getFormat(), messageSchema, new HashMap());
+        new FormatSpecification(conf.getFormat(), messageSchema, new HashMap<>());
       recordFormat = RecordFormats.createInitializedFormat(spec);
     }
 
-    StructuredRecord messageRecord = recordFormat.read(new StreamEvent(ByteBuffer.wrap(message)));
+    StructuredRecord messageRecord = recordFormat.read(ByteBuffer.wrap(message));
     for (Schema.Field field : messageRecord.getSchema().getFields()) {
       String fieldName = field.getName();
       builder.set(fieldName, messageRecord.get(fieldName));
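
Note: all four files follow the same pattern: the CDAP RecordFormat is now parameterized on ByteBuffer instead of StreamEvent, so the raw Kafka payload is passed straight to RecordFormat.read(ByteBuffer) rather than being wrapped in a StreamEvent first. Below is a minimal stand-alone sketch of that decode path, using the same classes the patch calls (FormatSpecification, RecordFormats, StructuredRecord); the MessageDecoder class and its decode method are hypothetical names for illustration, and the co.cask.cdap.format package for RecordFormats is an assumption.

import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.RecordFormat;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.format.RecordFormats;  // assumed package for the RecordFormats helper used in the patch

import java.nio.ByteBuffer;
import java.util.HashMap;

// Hypothetical helper showing the ByteBuffer-based RecordFormat usage introduced by this patch.
public final class MessageDecoder {

  private final RecordFormat<ByteBuffer, StructuredRecord> recordFormat;

  MessageDecoder(String format, Schema messageSchema) throws Exception {
    // Same initialization as in the plugins: build a FormatSpecification for the
    // configured format and message schema, then create an initialized RecordFormat.
    FormatSpecification spec = new FormatSpecification(format, messageSchema, new HashMap<>());
    this.recordFormat = RecordFormats.createInitializedFormat(spec);
  }

  StructuredRecord decode(byte[] message) throws Exception {
    // Old code: recordFormat.read(new StreamEvent(ByteBuffer.wrap(message)))
    // New code: the payload ByteBuffer is read directly, no StreamEvent wrapper.
    return recordFormat.read(ByteBuffer.wrap(message));
  }
}

In the plugins themselves the RecordFormat field is created lazily inside transform()/addMessage(), as shown in the hunks above; this sketch only isolates the before/after of the read call.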