diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 1882d990bef5..da52cdf5c835 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -619,7 +619,8 @@ protected void reserveInternal(int newCapacity) { this.data = Platform.reallocateMemory(data, oldCapacity * 4L, newCapacity * 4L); } else if (type instanceof LongType || type instanceof DoubleType || DecimalType.is64BitDecimalType(type) || type instanceof TimestampType || - type instanceof TimestampNTZType || type instanceof DayTimeIntervalType) { + type instanceof TimestampNTZType || type instanceof DayTimeIntervalType || + type instanceof TimeType) { this.data = Platform.reallocateMemory(data, oldCapacity * 8L, newCapacity * 8L); } else if (childColumns != null) { // Nothing to store. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala index ee283386b8ef..ae07255f01ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.time.{Duration, Period} +import java.time.{Duration, LocalTime, Period} import org.apache.hadoop.fs.{FileSystem, Path} @@ -34,6 +34,8 @@ abstract class ParquetFileFormatSuite with SharedSparkSession with CommonFileDataSourceSuite { + import testImplicits._ + override protected def dataSourceFormat = "parquet" test("read parquet footers in parallel") { @@ -129,6 +131,20 @@ abstract class ParquetFileFormatSuite } } } + + test("Write and read back TIME values") { + Seq(false, true).foreach { offHeapEnabled => + withSQLConf(SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offHeapEnabled.toString) { + withTempPath { dir => + val data = Seq(LocalTime.parse("01:12:30.999999")).toDF("col") + data.write.parquet(dir.getCanonicalPath) + val readback = spark.read.parquet(dir.getCanonicalPath) + assertResult(readback.schema) { new StructType().add("col", TimeType()) } + checkAnswer(readback, data) + } + } + } + } } class ParquetFileFormatV1Suite extends ParquetFileFormatSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 7a382f858683..87a8f10010dc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -1628,16 +1628,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } } - - test("Write TimeType") { - withTempPath { dir => - val data = Seq(LocalTime.parse("01:12:30.999999")).toDF("col") - data.write.parquet(dir.getCanonicalPath) - val readback = spark.read.parquet(dir.getCanonicalPath) - assertResult(readback.schema) { new StructType().add("col", TimeType()) } - checkAnswer(readback, data) - } - } } class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)