From f785b6d5603d94b126a9611b4a583e4803dd54f7 Mon Sep 17 00:00:00 2001 From: Teddy Choi Date: Mon, 11 Jun 2018 18:29:57 +0900 Subject: [PATCH 1/2] HIVE-19853: Arrow serializer needs to create a TimeStampMicroTZVector instead of TimeStampMicroVector --- .../hadoop/hive/ql/io/arrow/Deserializer.java | 94 +++++++------------ .../hadoop/hive/ql/io/arrow/Serializer.java | 16 ++-- 2 files changed, 41 insertions(+), 69 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java index 6e09d3991f5c..edc4b39922d8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java @@ -29,9 +29,7 @@ import org.apache.arrow.vector.IntervalDayVector; import org.apache.arrow.vector.IntervalYearVector; import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TimeStampMicroVector; -import org.apache.arrow.vector.TimeStampMilliVector; -import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; @@ -268,35 +266,11 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector) { } break; case TIMESTAMPMILLI: - { - for (int i = 0; i < size; i++) { - if (arrowVector.isNull(i)) { - VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i); - } else { - hiveVector.isNull[i] = false; - - // Time = second + sub-second - final long timeInMillis = ((TimeStampMilliVector) arrowVector).get(i); - final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; - int subSecondInNanos = (int) ((timeInMillis % MILLIS_PER_SECOND) * NS_PER_MILLIS); - long second = timeInMillis / MILLIS_PER_SECOND; - - // A nanosecond value should not be negative - if (subSecondInNanos < 0) { - - // So add one second to the negative nanosecond value to make it positive - subSecondInNanos += NS_PER_SECOND; - - // Subtract one second from the second value because we added one second - second -= 1; - } - timestampColumnVector.time[i] = second * MILLIS_PER_SECOND; - timestampColumnVector.nanos[i] = subSecondInNanos; - } - } - } - break; + case TIMESTAMPMILLITZ: case TIMESTAMPMICRO: + case TIMESTAMPMICROTZ: + case TIMESTAMPNANO: + case TIMESTAMPNANOTZ: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -305,40 +279,36 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector) { hiveVector.isNull[i] = false; // Time = second + sub-second - final long timeInMicros = ((TimeStampMicroVector) arrowVector).get(i); - final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; - int subSecondInNanos = (int) ((timeInMicros % MICROS_PER_SECOND) * NS_PER_MICROS); - long second = timeInMicros / MICROS_PER_SECOND; - - // A nanosecond value should not be negative - if (subSecondInNanos < 0) { - - // So add one second to the negative nanosecond value to make it positive - subSecondInNanos += NS_PER_SECOND; - - // Subtract one second from the second value because we added one second - second -= 1; + final long time = ((TimeStampVector) arrowVector).get(i); + long second; + int subSecondInNanos; + switch (minorType) { + case TIMESTAMPMILLI: + case TIMESTAMPMILLITZ: + { + subSecondInNanos = (int) ((time % MILLIS_PER_SECOND) * NS_PER_MILLIS); + second = time / MILLIS_PER_SECOND; + } + break; + case TIMESTAMPMICROTZ: + case TIMESTAMPMICRO: + { + subSecondInNanos = (int) ((time % MICROS_PER_SECOND) * NS_PER_MICROS); + second = time / MICROS_PER_SECOND; + } + break; + case TIMESTAMPNANOTZ: + case TIMESTAMPNANO: + { + subSecondInNanos = (int) (time % NS_PER_SECOND); + second = time / NS_PER_SECOND; + } + break; + default: + throw new IllegalArgumentException(); } - timestampColumnVector.time[i] = second * MILLIS_PER_SECOND; - timestampColumnVector.nanos[i] = subSecondInNanos; - } - } - } - break; - case TIMESTAMPNANO: - { - for (int i = 0; i < size; i++) { - if (arrowVector.isNull(i)) { - VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i); - } else { - hiveVector.isNull[i] = false; - // Time = second + sub-second - final long timeInNanos = ((TimeStampNanoVector) arrowVector).get(i); final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; - int subSecondInNanos = (int) (timeInNanos % NS_PER_SECOND); - long second = timeInNanos / NS_PER_SECOND; - // A nanosecond value should not be negative if (subSecondInNanos < 0) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java index e6af916ce811..4f8078b73c4d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java @@ -30,7 +30,7 @@ import org.apache.arrow.vector.IntervalDayVector; import org.apache.arrow.vector.IntervalYearVector; import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; @@ -38,6 +38,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.NullableMapVector; +import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.FieldType; @@ -71,6 +72,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.TimeZone; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE; import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector; @@ -177,8 +179,8 @@ private ArrowType toArrowType(TypeInfo typeInfo) { case DATE: return Types.MinorType.DATEDAY.getType(); case TIMESTAMP: - // HIVE-19723: Prefer microsecond because Spark supports it - return Types.MinorType.TIMESTAMPMICRO.getType(); + // HIVE-19853: Prefer timestamp in microsecond with time zone because Spark supports it + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, TimeZone.getDefault().getID()); case BINARY: return Types.MinorType.VARBINARY.getType(); case DECIMAL: @@ -433,11 +435,11 @@ private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, Ty break; case TIMESTAMP: { - final TimeStampMicroVector timeStampMicroVector = (TimeStampMicroVector) arrowVector; + final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector; final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; for (int i = 0; i < size; i++) { if (hiveVector.isNull[i]) { - timeStampMicroVector.setNull(i); + timeStampMicroTZVector.setNull(i); } else { // Time = second + sub-second final long secondInMillis = timestampColumnVector.getTime(i); @@ -446,9 +448,9 @@ private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, Ty if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) { // If the timestamp cannot be represented in long microsecond, set it as a null value - timeStampMicroVector.setNull(i); + timeStampMicroTZVector.setNull(i); } else { - timeStampMicroVector.set(i, secondInMicros + subSecondInMicros); + timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros); } } } From 92c49d50d984f9ba6c8985d6f4da4ca8b22343a0 Mon Sep 17 00:00:00 2001 From: Teddy Choi Date: Thu, 14 Jun 2018 23:52:41 +0900 Subject: [PATCH 2/2] Compute timestamps in UTC --- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java index 4f8078b73c4d..2961050532fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java @@ -72,7 +72,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.TimeZone; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE; import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector; @@ -180,7 +179,7 @@ private ArrowType toArrowType(TypeInfo typeInfo) { return Types.MinorType.DATEDAY.getType(); case TIMESTAMP: // HIVE-19853: Prefer timestamp in microsecond with time zone because Spark supports it - return new ArrowType.Timestamp(TimeUnit.MICROSECOND, TimeZone.getDefault().getID()); + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); case BINARY: return Types.MinorType.VARBINARY.getType(); case DECIMAL: