From 00b315797e855fb868e62173e0e2ada143d304cb Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Tue, 4 Dec 2018 10:41:43 -0800 Subject: [PATCH 1/2] HIVE-21004: Reduce the amount of not needed object creation at Kafka storage handler Change-Id: Ifc3571d9de7570067458bcc0029f4d9a9038000d --- .../apache/hadoop/hive/kafka/KafkaSerDe.java | 36 ++++++++++++------- .../kafka/VectorizedKafkaRecordReader.java | 18 +++++++--- 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java index d3cd45150bf3..ffe77886f8e6 100644 --- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java +++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java @@ -156,8 +156,7 @@ int firstMetadataColumnIndex = data.size() - MetadataColumn.values().length; if (delegateSerializerOI == null) { //@TODO check if i can cache this if it is the same. - delegateSerializerOI = - new SubStructObjectInspector(structObjectInspector, firstMetadataColumnIndex); + delegateSerializerOI = new SubStructObjectInspector(structObjectInspector, firstMetadataColumnIndex); } // We always append the metadata columns to the end of the row. final List row = data.subList(0, firstMetadataColumnIndex); @@ -198,24 +197,30 @@ } @Override public Object deserialize(Writable blob) throws SerDeException { - return deserializeKWritable((KafkaWritable) blob); + Object[] rowBoat = new Object[columnNames.size()]; + deserializeKWritable((KafkaWritable) blob, rowBoat); + return rowBoat; } - ArrayList deserializeKWritable(KafkaWritable kafkaWritable) throws SerDeException { - ArrayList resultRow = new ArrayList<>(columnNames.size()); + /** + * @param kafkaWritable Kafka writable object containing the row plus kafka metadata + * @param rowBoat Boat sized to width of the kafka row plus metadata to carry the row to operator upstream. 
+ * + * @throws SerDeException in case of any serde issue. + */ + void deserializeKWritable(KafkaWritable kafkaWritable, Object[] rowBoat) throws SerDeException { + final Object row = delegateSerDe.deserialize(bytesConverter.getWritable(kafkaWritable.getValue())); //first add the value payload elements - for (int i = 0; i < metadataStartIndex; i++) { - resultRow.add(delegateDeserializerOI.getStructFieldData(row, - delegateDeserializerOI.getStructFieldRef(columnNames.get(i)))); + rowBoat[i] = + delegateDeserializerOI.getStructFieldData(row, delegateDeserializerOI.getStructFieldRef(columnNames.get(i))); } //add the metadata columns for (int i = metadataStartIndex; i < columnNames.size(); i++) { final MetadataColumn metadataColumn = MetadataColumn.forName(columnNames.get(i)); - resultRow.add(kafkaWritable.getHiveWritable(metadataColumn)); + rowBoat[i] = (kafkaWritable.getHiveWritable(metadataColumn)); } - return resultRow; } @Override public ObjectInspector getObjectInspector() { @@ -233,7 +238,8 @@ private static final class SubStructObjectInspector extends StructObjectInspecto /** * Returns a live view of the base Object inspector starting form 0 to {@code toIndex} exclusive. - * @param baseOI base Object Inspector. + * + * @param baseOI base Object Inspector. * @param toIndex toIndex. */ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { @@ -250,6 +256,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { /** * Look up a field. + * * @param fieldName fieldName to be looked up. */ @SuppressWarnings("OptionalGetWithoutIsPresent") @Override public StructField getStructFieldRef(String fieldName) { @@ -262,7 +269,8 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { /** * returns null for data = null. - * @param data input. + * + * @param data input. * @param fieldRef field to extract. 
*/ @Override public Object getStructFieldData(Object data, StructField fieldRef) { @@ -271,6 +279,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { /** * returns null for data = null. + * * @param data input data. */ @Override public List getStructFieldsDataAsList(Object data) { @@ -288,7 +297,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { /** * Returns the name of the data type that is inspected by this * ObjectInspector. This is used to display the type information to the user. - * + * <p>
* For primitive types, the type name is standardized. For other types, the * type name can be something like "list<int>", "map<int,string>", java class * names, or user-defined type names similar to typedef. @@ -309,6 +318,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { /** * Class that encapsulate the logic of serialize and deserialize bytes array to/from the delegate writable format. + * * @param delegate writable class. */ private interface BytesConverter { diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java index 294085610e68..12eb72612e58 100644 --- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java +++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java @@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.Properties; @@ -66,6 +65,7 @@ class VectorizedKafkaRecordReader implements RecordReader 0) { - ArrayList row = serDe.deserializeKWritable(kafkaWritable); + serDe.deserializeKWritable(kafkaWritable, row); for (int i : projectedColumns) { - vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row.get(i)); + vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row[i]); } } rowsCount++; } vectorizedRowBatch.size = rowsCount; consumedRecords += rowsCount; + cleanRowBoat(); return rowsCount; } - private static KafkaSerDe createAndInitializeSerde(Configuration jobConf) { + @SuppressWarnings("Duplicates") private static KafkaSerDe createAndInitializeSerde(Configuration jobConf) { KafkaSerDe serDe = new KafkaSerDe(); MapWork mapWork = Preconditions.checkNotNull(Utilities.getMapWork(jobConf), "Map work is null"); Properties From b0ebdc4b5e59a080fbfef02e2645066f73da881a Mon Sep 17 00:00:00 2001 From: Slim 
Bouguerra Date: Tue, 4 Dec 2018 13:48:58 -0800 Subject: [PATCH 2/2] fix style Change-Id: I11cd02df3070e8eea7d6edc986a6d8dced72274e --- .../hadoop/hive/kafka/VectorizedKafkaRecordReader.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java index 12eb72612e58..5f55bbce20b4 100644 --- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java +++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java @@ -138,11 +138,10 @@ private void cleanRowBoat() { } @Override public float getProgress() { - if (consumedRecords == 0) { - return 0f; - } if (consumedRecords >= totalNumberRecords) { return 1f; + } else if (consumedRecords == 0) { + return 0f; } return consumedRecords * 1.0f / totalNumberRecords; } @@ -163,7 +162,7 @@ private int readNextBatch(VectorizedRowBatch vectorizedRowBatch, kafkaWritable.set(kRecord); readBytes += kRecord.serializedKeySize() + kRecord.serializedValueSize(); if (projectedColumns.length > 0) { - serDe.deserializeKWritable(kafkaWritable, row); + serDe.deserializeKWritable(kafkaWritable, row); for (int i : projectedColumns) { vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row[i]); } @@ -176,7 +175,7 @@ private int readNextBatch(VectorizedRowBatch vectorizedRowBatch, return rowsCount; } - @SuppressWarnings("Duplicates") private static KafkaSerDe createAndInitializeSerde(Configuration jobConf) { + @SuppressWarnings("Duplicates") private static KafkaSerDe createAndInitializeSerde(Configuration jobConf) { KafkaSerDe serDe = new KafkaSerDe(); MapWork mapWork = Preconditions.checkNotNull(Utilities.getMapWork(jobConf), "Map work is null"); Properties