From 23565a534ed87755c7ceb7bc497f0ea081d2771d Mon Sep 17 00:00:00 2001 From: jameswillis Date: Mon, 11 May 2026 10:23:10 -0700 Subject: [PATCH] [SPARK-55897][SQL] Handle UserDefinedType in ColumnarRow, ColumnarBatchRow, and ColumnarArray get() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? `ColumnarRow.get()`, `ColumnarBatchRow.get()`, and `ColumnarArray.get()` throw `SparkUnsupportedOperationException` when called with a `UserDefinedType` because they have no branch to handle UDTs. This PR adds UDT handling to all three methods: - **ColumnarRow** and **ColumnarBatchRow**: Add an `instanceof UserDefinedType` branch that recurses with `udt.sqlType()`, matching the pattern already used in `SpecializedGettersReader.read()`. - **ColumnarArray**: Change the `handleUserDefinedType` flag from `false` to `true` in the existing call to `SpecializedGettersReader.read()`. ### Why are the changes needed? The codegen path (`CodeGenerator.getValue()`) unwraps `udt.sqlType()` before generating accessor calls, so UDT columns work when whole-stage codegen is active. However, on the interpreted eval path — when codegen is disabled, falls back, or the number of fields exceeds `spark.sql.codegen.maxFields` — `GetStructField.nullSafeEval` calls `ColumnarRow.get(ordinal, udtType)` directly, which hits the unhandled branch and throws. ### Does this PR introduce _any_ user-facing change? Yes. UDT columns in columnar data sources (e.g., Parquet) now work correctly on the interpreted evaluation path. Previously they would throw `SparkUnsupportedOperationException`. ### How was this patch tested? Added 6 new tests in `ColumnarBatchSuite` covering all 3 methods × 2 UDT backing types (primitive `IntegerType` and complex `StructType`). Each test creates columnar vectors with UDT data and verifies that `get()` returns the correct value. Two helper UDT classes (`TestIntUDT`, `TestStructWrapperUDT`) are defined for the tests. ### Was this patch authored or co-authored using generative AI tooling? Yes. Opus 4.6 Closes #54701 from james-willis/columnar-row-udt-test. Authored-by: jameswillis Signed-off-by: Huaxin Gao (cherry picked from commit 472735cefef51159b0ab332fa12705c67e146d42) --- .../spark/sql/vectorized/ColumnarArray.java | 2 +- .../sql/vectorized/ColumnarBatchRow.java | 2 + .../spark/sql/vectorized/ColumnarRow.java | 2 + .../vectorized/ColumnarBatchSuite.scala | 121 ++++++++++++++++++ 4 files changed, 126 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java index fad1817aca19..861a6a4c50e4 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java @@ -213,7 +213,7 @@ public ColumnarMap getMap(int ordinal) { @Override public Object get(int ordinal, DataType dataType) { - return SpecializedGettersReader.read(this, ordinal, dataType, false, false); + return SpecializedGettersReader.read(this, ordinal, dataType, false, true); } @Override diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java index 3d1e780f6e05..42b335dfd2bc 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java @@ -215,6 +215,8 @@ public Object get(int ordinal, DataType dataType) { return getMap(ordinal); } else if (dataType instanceof VariantType) { return getVariant(ordinal); + } else if (dataType instanceof UserDefinedType udt) { + return get(ordinal, udt.sqlType()); } else { throw new SparkUnsupportedOperationException( "_LEGACY_ERROR_TEMP_3152", Map.of("dataType", String.valueOf(dataType))); diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java index 656c5f8a8f30..d66baa8fd8fe 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java @@ -217,6 +217,8 @@ public Object get(int ordinal, DataType dataType) { return getMap(ordinal); } else if (dataType instanceof VariantType) { return getVariant(ordinal); + } else if (dataType instanceof UserDefinedType udt) { + return get(ordinal, udt.sqlType()); } else { throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3155"); } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 6d90bb985e26..93b3ea67d6bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -48,6 +48,38 @@ import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal} import org.apache.spark.util.ArrayImplicits._ +/** + * A minimal UDT backed by IntegerType, used by SPARK-55897 tests. + */ +@SQLUserDefinedType(udt = classOf[TestIntUDT]) +private case class TestIntWrapper(value: Int) + +private class TestIntUDT extends UserDefinedType[TestIntWrapper] { + override def sqlType: DataType = IntegerType + override def serialize(obj: TestIntWrapper): Any = obj.value + override def userClass: Class[TestIntWrapper] = classOf[TestIntWrapper] + override def deserialize(datum: Any): TestIntWrapper = datum match { + case v: Int => TestIntWrapper(v) + } +} + +/** + * A minimal UDT backed by StructType, used by SPARK-55897 tests. + */ +@SQLUserDefinedType(udt = classOf[TestStructWrapperUDT]) +private case class TestStructWrapper(x: Int, y: Long) + +private class TestStructWrapperUDT extends UserDefinedType[TestStructWrapper] { + override def sqlType: DataType = new StructType() + .add("x", IntegerType) + .add("y", LongType) + override def serialize(obj: TestStructWrapper): Any = InternalRow(obj.x, obj.y) + override def userClass: Class[TestStructWrapper] = classOf[TestStructWrapper] + override def deserialize(datum: Any): TestStructWrapper = datum match { + case row: InternalRow => TestStructWrapper(row.getInt(0), row.getLong(1)) + } +} + @ExtendedSQLTest class ColumnarBatchSuite extends SparkFunSuite { @@ -2060,4 +2092,93 @@ class ColumnarBatchSuite extends SparkFunSuite { } } } + + testVector( + "SPARK-55897: ColumnarRow.get with primitive-backed UDT", + 10, + new StructType().add("name", StringType).add("udt_field", IntegerType)) { column => + column.getChild(0).putByteArray(0, "hello".getBytes) + column.getChild(1).putInt(0, 42) + + val row = column.getStruct(0) + assert(row.get(1, new TestIntUDT()) === 42) + } + + testVector( + "SPARK-55897: ColumnarRow.get with struct-backed UDT", + 10, + new StructType() + .add("id", IntegerType) + .add("nested", new StructType().add("x", IntegerType).add("y", LongType))) { column => + column.getChild(0).putInt(0, 1) + column.getChild(1).getChild(0).putInt(0, 10) + column.getChild(1).getChild(1).putLong(0, 20L) + + val row = column.getStruct(0) + val nested = row.get(1, new TestStructWrapperUDT()).asInstanceOf[InternalRow] + assert(nested.getInt(0) === 10) + assert(nested.getLong(1) === 20L) + } + + testVector( + "SPARK-55897: ColumnarArray.get with primitive-backed UDT", + 10, + new ArrayType(IntegerType, false)) { column => + val data = column.arrayData() + data.putInt(0, 10) + data.putInt(1, 20) + column.putArray(0, 0, 2) + + val arr = column.getArray(0) + assert(arr.get(0, new TestIntUDT()) === 10) + assert(arr.get(1, new TestIntUDT()) === 20) + } + + testVector( + "SPARK-55897: ColumnarArray.get with struct-backed UDT", + 10, + new ArrayType(new StructType().add("x", IntegerType).add("y", LongType), false)) { column => + val data = column.arrayData() + data.getChild(0).putInt(0, 100) + data.getChild(1).putLong(0, 200L) + column.putArray(0, 0, 1) + + val arr = column.getArray(0) + val row = arr.get(0, new TestStructWrapperUDT()).asInstanceOf[InternalRow] + assert(row.getInt(0) === 100) + assert(row.getLong(1) === 200L) + } + + test("SPARK-55897: ColumnarBatchRow.get with primitive-backed UDT") { + Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { memMode => + val col = allocate(10, IntegerType, memMode) + try { + col.putInt(0, 99) + val batchRow = new ColumnarBatchRow(Array(col)) + batchRow.rowId = 0 + assert(batchRow.get(0, new TestIntUDT()) === 99) + } finally { + col.close() + } + } + } + + test("SPARK-55897: ColumnarBatchRow.get with struct-backed UDT") { + Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { memMode => + val col = allocate(10, + new StructType().add("x", IntegerType).add("y", LongType), memMode) + try { + col.getChild(0).putInt(0, 5) + col.getChild(1).putLong(0, 15L) + val batchRow = new ColumnarBatchRow(Array(col)) + batchRow.rowId = 0 + + val row = batchRow.get(0, new TestStructWrapperUDT()).asInstanceOf[InternalRow] + assert(row.getInt(0) === 5) + assert(row.getLong(1) === 15L) + } finally { + col.close() + } + } + } }