diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt index 9e86eacb0e8c7..07373d576633d 100644 --- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt +++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt @@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -BooleanUpdater 0 0 0 17004.4 0.1 1.0X -ByteUpdater (INT32 -> Byte) 0 0 0 3746.5 0.3 0.2X -ShortUpdater (INT32 -> Short) 1 1 0 1681.2 0.6 0.1X -IntegerUpdater 0 0 0 10290.1 0.1 0.6X -LongUpdater 0 0 0 3875.9 0.3 0.2X -FloatUpdater 0 0 0 10148.5 0.1 0.6X -DoubleUpdater 0 0 0 5141.3 0.2 0.3X -BinaryUpdater 15 15 0 70.7 14.1 0.0X +BooleanUpdater 0 0 0 16990.6 0.1 1.0X +ByteUpdater (INT32 -> Byte) 0 0 0 3765.0 0.3 0.2X +ShortUpdater (INT32 -> Short) 1 1 0 1682.9 0.6 0.1X +IntegerUpdater 0 0 0 7756.2 0.1 0.5X +LongUpdater 0 0 0 3870.4 0.3 0.2X +FloatUpdater 0 0 0 7758.5 0.1 0.5X +DoubleUpdater 0 0 0 3875.9 0.3 0.2X +BinaryUpdater 15 15 0 70.4 14.2 0.0X ================================================================================================ @@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -IntegerToLongUpdater 0 0 0 6237.3 0.2 1.0X -IntegerToDoubleUpdater 0 0 0 6117.3 0.2 1.0X -FloatToDoubleUpdater 0 0 0 2526.7 0.4 0.4X -DateToTimestampNTZUpdater 3 3 0 366.1 2.7 0.1X -DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0 0 5126.2 0.2 0.8X +IntegerToLongUpdater 0 0 0 5133.8 0.2 1.0X +IntegerToDoubleUpdater 0 0 0 6090.4 0.2 1.2X +FloatToDoubleUpdater 0 0 0 2527.1 0.4 0.5X +DateToTimestampNTZUpdater 1 1 0 934.8 1.1 0.2X +DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0 0 5108.5 0.2 1.0X ================================================================================================ @@ -39,8 +39,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------- -IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3640.1 0.3 1.0X -LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2281.8 0.4 0.6X +IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3263.0 0.3 1.0X +LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2282.0 0.4 0.7X LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3 0 420.5 2.4 0.1X @@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------- -UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 5141.0 0.2 1.0X -UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 16 17 0 63.8 15.7 0.0X +UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 5112.2 0.2 1.0X +UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 16 17 0 63.9 15.7 0.0X ================================================================================================ @@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -IntegerToDecimalUpdater 0 0 0 10288.1 0.1 1.0X -LongToDecimalUpdater 0 0 0 3872.9 0.3 0.4X -FixedLenByteArrayToDecimalUpdater 21 21 1 50.2 19.9 0.0X +IntegerToDecimalUpdater 0 0 0 7750.4 0.1 1.0X +LongToDecimalUpdater 0 0 0 3866.5 0.3 0.5X +FixedLenByteArrayToDecimalUpdater 21 21 0 50.1 19.9 0.0X ================================================================================================ @@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -FixedLenByteArrayUpdater (len=16 -> Binary) 20 21 2 51.8 19.3 1.0X -FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 160.2 6.2 3.1X -FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 133.3 7.5 2.6X +FixedLenByteArrayUpdater (len=16 -> Binary) 20 20 0 51.7 19.3 1.0X +FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 1 160.1 6.2 3.1X +FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 133.2 7.5 2.6X diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt index aed60eaf5136e..e03dae8c072ac 100644 --- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt +++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt @@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -BooleanUpdater 0 0 0 17126.9 0.1 1.0X -ByteUpdater (INT32 -> Byte) 0 0 0 3721.3 0.3 0.2X -ShortUpdater (INT32 -> Short) 1 1 0 1662.6 0.6 0.1X -IntegerUpdater 0 0 0 10216.0 0.1 0.6X -LongUpdater 0 0 0 5150.9 0.2 0.3X -FloatUpdater 0 0 0 10313.5 0.1 0.6X -DoubleUpdater 0 0 0 5147.6 0.2 0.3X -BinaryUpdater 16 16 0 66.4 15.1 0.0X +BooleanUpdater 0 0 0 17171.8 0.1 1.0X +ByteUpdater (INT32 -> Byte) 0 0 0 3679.8 0.3 0.2X +ShortUpdater (INT32 -> Short) 1 1 0 1662.3 0.6 0.1X +IntegerUpdater 0 0 0 10261.9 0.1 0.6X +LongUpdater 0 0 0 5130.7 0.2 0.3X +FloatUpdater 0 0 0 10255.9 0.1 0.6X +DoubleUpdater 0 0 0 5127.2 0.2 0.3X +BinaryUpdater 15 16 0 67.7 14.8 0.0X ================================================================================================ @@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -IntegerToLongUpdater 0 0 0 5428.8 0.2 1.0X -IntegerToDoubleUpdater 0 0 0 5132.3 0.2 0.9X -FloatToDoubleUpdater 0 0 0 3199.4 0.3 0.6X -DateToTimestampNTZUpdater 3 3 1 378.3 2.6 0.1X -DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0 0 6548.3 0.2 1.2X +IntegerToLongUpdater 0 0 0 6438.7 0.2 1.0X +IntegerToDoubleUpdater 0 0 0 6441.2 0.2 1.0X +FloatToDoubleUpdater 0 0 0 3199.5 0.3 0.5X +DateToTimestampNTZUpdater 1 1 0 884.9 1.1 0.1X +DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0 0 6713.8 0.1 1.0X ================================================================================================ @@ -39,8 +39,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------- -IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3097.0 0.3 1.0X -LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2286.8 0.4 0.7X +IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3664.5 0.3 1.0X +LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2668.7 0.4 0.7X LongAsMicrosUpdater (TIMESTAMP_MILLIS) 3 3 0 371.3 2.7 0.1X @@ -52,7 +52,7 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------- -UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 5119.0 0.2 1.0X +UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 6183.9 0.2 1.0X UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 17 0 60.4 16.6 0.0X @@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -IntegerToDecimalUpdater 0 0 0 7753.9 0.1 1.0X -LongToDecimalUpdater 0 0 0 3880.7 0.3 0.5X -FixedLenByteArrayToDecimalUpdater 21 21 0 50.9 19.6 0.0X +IntegerToDecimalUpdater 0 0 0 10268.1 0.1 1.0X +LongToDecimalUpdater 0 0 0 5122.2 0.2 0.5X +FixedLenByteArrayToDecimalUpdater 21 21 0 50.9 19.7 0.0X ================================================================================================ @@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure AMD EPYC 7763 64-Core Processor FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -FixedLenByteArrayUpdater (len=16 -> Binary) 21 21 1 50.5 19.8 1.0X -FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 152.7 6.5 3.0X -FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 127.8 7.8 2.5X +FixedLenByteArrayUpdater (len=16 -> Binary) 21 21 1 50.3 19.9 1.0X +FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 152.7 6.6 3.0X +FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 127.7 7.8 2.5X diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt index a0ad4843991b1..828e685788773 100644 --- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt +++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt @@ -3,17 +3,17 @@ Identity Updaters ================================================================================================ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure -AMD EPYC 9V74 80-Core Processor +AMD EPYC 7763 64-Core Processor Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -BooleanUpdater 0 0 0 14742.5 0.1 1.0X -ByteUpdater (INT32 -> Byte) 0 0 0 3584.0 0.3 0.2X -ShortUpdater (INT32 -> Short) 1 1 0 1824.8 0.5 0.1X -IntegerUpdater 0 0 0 8346.1 0.1 0.6X -LongUpdater 0 0 0 4103.9 0.2 0.3X -FloatUpdater 0 0 0 8215.2 0.1 0.6X -DoubleUpdater 0 0 0 4141.1 0.2 0.3X -BinaryUpdater 18 18 0 58.9 17.0 0.0X +BooleanUpdater 0 0 0 14640.0 0.1 1.0X +ByteUpdater (INT32 -> Byte) 0 0 0 3686.8 0.3 0.3X +ShortUpdater (INT32 -> Short) 1 1 0 2054.1 0.5 0.1X +IntegerUpdater 0 0 0 7759.1 0.1 0.5X +LongUpdater 0 0 0 3876.1 0.3 0.3X +FloatUpdater 0 0 0 7762.5 0.1 0.5X +DoubleUpdater 0 0 0 5123.2 0.2 0.3X +BinaryUpdater 15 15 0 70.1 14.3 0.0X ================================================================================================ @@ -21,14 +21,14 @@ Type-converting Updaters ================================================================================================ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure -AMD EPYC 9V74 80-Core Processor +AMD EPYC 7763 64-Core Processor Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -IntegerToLongUpdater 1 1 0 1129.8 0.9 1.0X -IntegerToDoubleUpdater 1 1 0 1365.8 0.7 1.2X -FloatToDoubleUpdater 1 1 0 1284.3 0.8 1.1X -DateToTimestampNTZUpdater 3 3 0 357.5 2.8 0.3X -DowncastLongUpdater (INT64 -> Decimal(9,2)) 1 1 0 1136.5 0.9 1.0X +IntegerToLongUpdater 1 1 0 1281.0 0.8 1.0X +IntegerToDoubleUpdater 1 1 0 1550.0 0.6 1.2X +FloatToDoubleUpdater 1 1 0 1419.0 0.7 1.1X +DateToTimestampNTZUpdater 2 2 0 605.2 1.7 0.5X +DowncastLongUpdater (INT64 -> Decimal(9,2)) 1 1 0 1285.1 0.8 1.0X ================================================================================================ @@ -36,12 +36,12 @@ Rebase Updaters ================================================================================================ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure -AMD EPYC 9V74 80-Core Processor +AMD EPYC 7763 64-Core Processor Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------- -IntegerWithRebaseUpdater (DATE legacy) 0 0 0 2180.9 0.5 1.0X -LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1 0 1744.3 0.6 0.8X -LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3 0 421.0 2.4 0.2X +IntegerWithRebaseUpdater (DATE legacy) 0 0 0 2662.8 0.4 1.0X +LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1 0 2084.1 0.5 0.8X +LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2 0 454.8 2.2 0.2X ================================================================================================ @@ -49,11 +49,11 @@ Unsigned Updaters ================================================================================================ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure -AMD EPYC 9V74 80-Core Processor +AMD EPYC 7763 64-Core Processor Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------- -UnsignedIntegerUpdater (UINT32 -> Long) 1 1 0 965.9 1.0 1.0X -UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18 0 58.6 17.1 0.1X +UnsignedIntegerUpdater (UINT32 -> Long) 1 1 0 1094.1 0.9 1.0X +UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 17 0 61.0 16.4 0.1X ================================================================================================ @@ -61,12 +61,12 @@ Decimal Updaters ================================================================================================ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure -AMD EPYC 9V74 80-Core Processor +AMD EPYC 7763 64-Core Processor Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -IntegerToDecimalUpdater 0 0 0 8299.8 0.1 1.0X -LongToDecimalUpdater 0 0 0 4106.5 0.2 0.5X -FixedLenByteArrayToDecimalUpdater 24 24 1 43.8 22.8 0.0X +IntegerToDecimalUpdater 0 0 0 10261.0 0.1 1.0X +LongToDecimalUpdater 0 0 0 5118.9 0.2 0.5X +FixedLenByteArrayToDecimalUpdater 21 21 0 51.0 19.6 0.0X ================================================================================================ @@ -74,11 +74,11 @@ FixedLenByteArray Updaters ================================================================================================ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure -AMD EPYC 9V74 80-Core Processor +AMD EPYC 7763 64-Core Processor FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -FixedLenByteArrayUpdater (len=16 -> Binary) 22 23 0 47.0 21.3 1.0X -FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 6 6 1 166.4 6.0 3.5X -FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 9 1 125.1 8.0 2.7X +FixedLenByteArrayUpdater (len=16 -> Binary) 19 19 0 55.3 18.1 1.0X +FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 160.2 6.2 2.9X +FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9 9 0 123.3 8.1 2.2X diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index 76cedbbef3b44..c088f5f2844be 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -429,9 +429,7 @@ public void readValues( int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) { - for (int i = 0; i < total; ++i) { - readValue(offset + i, values, valuesReader); - } + valuesReader.readIntegersAsTimestampMicros(total, values, offset); } @Override diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 9249fab7915ca..23207e7db3570 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.time.ZoneOffset; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; @@ -26,6 +27,7 @@ import org.apache.parquet.io.ParquetDecodingException; import org.apache.spark.SparkUnsupportedOperationException; +import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.datasources.DataSourceUtils; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -204,6 +206,19 @@ public final void readLongsAsInts(int total, WritableColumnVector c, int rowId) } } + @Override + public final void readIntegersAsTimestampMicros( + int total, WritableColumnVector c, int rowId) { + int requiredBytes = total * 4; + ByteBuffer buffer = getBuffer(requiredBytes); + // Per-element conversion calls into `DateTimeUtils.daysToMicros`, which is `days * + // MICROS_PER_DAY` for UTC plus an overflow check via `Math.multiplyExact`. No + // `hasArray` bulk-copy path because source and target have different widths. + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, DateTimeUtils.daysToMicros(buffer.getInt(), ZoneOffset.UTC)); + } + } + // A fork of `readIntegers` to rebase the date values. For performance reasons, this method // iterates the values twice: check if we need to rebase first, then go to the optimized branch // if rebase is not needed. diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java index c62f7bcec8c35..bf6a1c6a03886 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.datasources.parquet; import java.nio.ByteBuffer; +import java.time.ZoneOffset; +import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.parquet.io.api.Binary; @@ -136,6 +138,32 @@ default void readLongsAsInts(int total, WritableColumnVector c, int rowId) { } } + /** + * Reads {@code total} INT32 date-day values (days since 1970-01-01, Proleptic Gregorian), + * converts each to TimestampNTZ micros at UTC via + * {@link DateTimeUtils#daysToMicros(int, java.time.ZoneId)}, and writes them into + * {@code c} starting at {@code c[rowId]}. Used by the type-converting updater that + * reads parquet INT32 DATE columns into Spark {@code TimestampNTZType} targets in + * {@code CORRECTED} datetime-rebase mode. The {@code LEGACY}/{@code EXCEPTION} rebase + * variants are out of scope for this method. + * + *

The default implementation is a per-row loop that calls + * {@code DateTimeUtils.daysToMicros} per element; it is algorithmically equivalent to + * the legacy per-row Updater path but the per-element conversion call dominates the + * loop, so the speedup from overriding this method is more modest than for the pure + * primitive-cast siblings ({@link #readIntegersAsLongs}, {@link #readIntegersAsDoubles}). + * Subclasses backed by contiguous bulk storage (e.g. PLAIN encoding via + * {@link VectorizedPlainValuesReader}) should override to read source bytes once and run + * a tight in-method conversion loop, avoiding {@code total} virtual dispatches on + * {@link #readInteger()}. Readers without an override preserve correctness but gain no + * speedup. + */ + default void readIntegersAsTimestampMicros(int total, WritableColumnVector c, int rowId) { + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, DateTimeUtils.daysToMicros(readInteger(), ZoneOffset.UTC)); + } + } + void readBinary(int total, WritableColumnVector c, int rowId); void readGeometry(int total, WritableColumnVector c, int rowId); void readGeography(int total, WritableColumnVector c, int rowId); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index c255221614439..0fc32b15d833f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -2073,6 +2073,69 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession { } } + test("DATE -> TimestampNTZ widening end-to-end via vectorized read path") { + // Round-trips a DATE Parquet file read back as TimestampNTZType, exercising + // DateToTimestampNTZUpdater on the vectorized path in CORRECTED rebase mode. + // Sample mixes epoch, recent dates, pre-epoch, far-past and far-future days; each + // day-count converts to UTC midnight via `DateTimeUtils.daysToMicros`. Both + // REQUIRED and OPTIONAL columns are covered so that `readValue` and `readValues` + // are both invoked. + withSQLConf( + SQLConf.PARQUET_REBASE_MODE_IN_READ.key -> "CORRECTED", + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") { + withTempPath { file => + val n = 5000 + def sampleAt(i: Int): java.sql.Date = i % 6 match { + case 0 => java.sql.Date.valueOf("1970-01-01") + case 1 => java.sql.Date.valueOf("1900-01-01") + case 2 => java.sql.Date.valueOf("2023-01-01") + case 3 => java.sql.Date.valueOf("0001-01-01") + case 4 => java.sql.Date.valueOf("4707-11-28") + case _ => + // Cover every month (1..12) and a few years to exercise month-boundary + // behavior, not just months 1..9 the previous string-formatting allowed. + val year = 2020 + (i % 80) + val month = 1 + (i % 12) + java.sql.Date.valueOf(f"$year%04d-$month%02d-15") + } + + val nonNullData = (0 until n).map(i => Row(sampleAt(i))) + // Every 7th row is null; mixes value runs and null runs at sub-batch lengths. + val nullableData = (0 until n).map { i => + if (i % 7 == 0) Row(null) else Row(sampleAt(i)) + } + + val nonNullWriteSchema = new StructType().add("v", DateType, nullable = false) + val nonNullReadSchema = new StructType().add("v", TimestampNTZType, nullable = false) + val nullableWriteSchema = new StructType().add("v", DateType, nullable = true) + val nullableReadSchema = new StructType().add("v", TimestampNTZType, nullable = true) + + val nonNullPath = new java.io.File(file, "nonnull").getCanonicalPath + val nullablePath = new java.io.File(file, "nullable").getCanonicalPath + spark.createDataFrame(spark.sparkContext.parallelize(nonNullData, 4), nonNullWriteSchema) + .write.parquet(nonNullPath) + spark.createDataFrame( + spark.sparkContext.parallelize(nullableData, 4), nullableWriteSchema) + .write.parquet(nullablePath) + + def dateToNtz(d: java.sql.Date): java.time.LocalDateTime = { + val days = d.toLocalDate.toEpochDay.toInt + val micros = DateTimeUtils.daysToMicros(days, java.time.ZoneOffset.UTC) + DateTimeUtils.microsToLocalDateTime(micros) + } + val expectedNonNull = nonNullData.map(r => Row(dateToNtz(r.getDate(0)))) + val expectedNullable = nullableData.map { r => + if (r.isNullAt(0)) Row(null) else Row(dateToNtz(r.getDate(0))) + } + + withAllParquetReaders { + checkAnswer(spark.read.schema(nonNullReadSchema).parquet(nonNullPath), expectedNonNull) + checkAnswer(spark.read.schema(nullableReadSchema).parquet(nullablePath), expectedNullable) + } + } + } + } + test("SPARK-56872: INT64 DECIMAL into 32-bit Decimal column with dictionary fallback") { // `DowncastLongUpdater.decodeSingleDictionaryId` only runs when the vectorized reader has // to eagerly drain buffered dictionary IDs, which happens when parquet-mr writes the diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala index 65a4d90750b8f..13e9376fb6fac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type.Repetition import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector import org.apache.spark.sql.types._ @@ -37,6 +38,7 @@ import org.apache.spark.sql.types._ * - `IntegerToDoubleUpdater` (INT32 -> Double) * - `FloatToDoubleUpdater` (FLOAT -> Double) * - `DowncastLongUpdater` (INT64 DECIMAL -> 32-bit Decimal via narrowing cast) + * - `DateToTimestampNTZUpdater` (INT32 DATE -> TimestampNTZ micros at UTC) * * Covers boundary batch lengths, sign-extension on negative INT32 values, the singular * `readValue` path, and the factory's long-decimal dispatch @@ -408,4 +410,87 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite { val actual = readViaDowncastLongUpdater(desc, targetType, input) assert(actual === Array[Int](-999_999_999, -1, 0, 1, 999_999_999)) } + + // ---- DateToTimestampNTZUpdater: INT32 DATE -> TimestampNTZ micros at UTC ---- + + // INT32 column descriptor annotated as DATE; routes the factory through the + // `sparkType == TimestampNTZType && isDateTypeMatched(descriptor)` branch. + private val int32DateDescriptor: ColumnDescriptor = { + val pt = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.dateType()) + .named("col") + new ColumnDescriptor(Array("col"), pt, 0, 1) + } + + // Reads `values.length` INT32 day-counts through `DateToTimestampNTZUpdater.readValues` + // and returns the resulting micros column. + private def readViaDateToTimestampNTZUpdater(values: Array[Int]): Array[Long] = { + val fac = newFactory(int32DateDescriptor) + val updater = fac.getUpdater(int32DateDescriptor, DataTypes.TimestampNTZType) + val out = new OnHeapColumnVector(values.length.max(1), DataTypes.TimestampNTZType) + val reader = newPlainReader(plainIntBytes(values), values.length) + updater.readValues(values.length, 0, out, reader) + val result = new Array[Long](values.length) + var i = 0 + while (i < values.length) { result(i) = out.getLong(i); i += 1 } + result + } + + // Realistic day-count sample: epoch, recent dates, pre-epoch, far-past, far-future. All + // values are well within `Math.multiplyExact(days * 86400, 1_000_000)`'s safe range. + private def dateSampleValues(n: Int): Array[Int] = { + val out = new Array[Int](n) + var i = 0 + while (i < n) { + out(i) = i % 6 match { + case 0 => 0 // 1970-01-01 (epoch) + case 1 => -25567 // ~1900-01-01 + case 2 => 19366 // ~2023-01-01 + case 3 => -719162 // ~0001-01-01 + case 4 => 1000000 // ~4707-11-28 (far future, still safe) + case _ => i * 37 - 100 + } + i += 1 + } + out + } + + private def expectedMicros(values: Array[Int]): Array[Long] = + values.map(d => DateTimeUtils.daysToMicros(d, ZoneOffset.UTC)) + + for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) { + test(s"DateToTimestampNTZUpdater produces correct UTC micros (total=$n)") { + val input = dateSampleValues(n) + assert(readViaDateToTimestampNTZUpdater(input) === expectedMicros(input)) + } + } + + test("DateToTimestampNTZUpdater: readValue converts a single date-day to UTC micros") { + // Same rationale as the IntegerToLongUpdater readValue test: the def-level-decoder's + // run-of-1 path calls `readInteger()` directly rather than the bulk method. + val input = Array(0, 1, -1, 19366, -25567) + val fac = newFactory(int32DateDescriptor) + val updater = fac.getUpdater(int32DateDescriptor, DataTypes.TimestampNTZType) + val out = new OnHeapColumnVector(input.length, DataTypes.TimestampNTZType) + val reader = newPlainReader(plainIntBytes(input), input.length) + var i = 0 + while (i < input.length) { + updater.readValue(i, out, reader) + i += 1 + } + val actual = (0 until input.length).map(out.getLong).toArray + assert(actual === input.map(d => DateTimeUtils.daysToMicros(d, ZoneOffset.UTC))) + } + + test("DateToTimestampNTZUpdater: epoch and signed day-counts produce expected micros") { + // Pins reference micros for a handful of well-known dates so the conversion semantics + // are visible at unit-test level without relying on DateTimeUtils for the expected + // side. + val input = Array(0, 1, -1) + val expected = Array( + 0L, // 1970-01-01 00:00:00 UTC + 86_400_000_000L, // 1970-01-02 00:00:00 UTC + -86_400_000_000L) // 1969-12-31 00:00:00 UTC + assert(readViaDateToTimestampNTZUpdater(input) === expected) + } }