[VL] Add RDDScanExec support to Velox backend #12077
Conversation
Force-pushed from 0546b9d to b70bf18
override val outputPartitioning: Partitioning,
override val outputOrdering: Seq[SortOrder]
) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) {
PR description contradicts validation logic for complex types
Problem: The PR description states "rejects complex types (ARRAY, MAP, STRUCT)" but doValidateInternal() explicitly accepts these types. The code is correct — Velox does support complex types via UnsafeRowFast::deserialize. The PR description should be updated to avoid misleading reviewers.
Evidence:
case _: org.apache.spark.sql.types.ArrayType =>
case _: org.apache.spark.sql.types.MapType =>
case _: org.apache.spark.sql.types.StructType =>
These cases fall through to ValidationResult.succeeded, meaning complex types are accepted.
Suggested Fix: Update the PR description to remove the claim that complex types are rejected, e.g.:
Supports all Velox-compatible types including complex types (Array, Map, Struct). Rejects only truly unsupported types (e.g., CalendarIntervalType) with clean fallback to vanilla Spark.
Good catch, updated the PR description. It now correctly states that complex types (Array, Map, Struct) are supported via the UnsafeRowFast::deserialize path, and only truly unsupported types trigger fallback.
rdd: RDD[InternalRow],
name: String,
override val outputPartitioning: Partitioning,
override val outputOrdering: Seq[SortOrder]
Validation does not recurse into complex type element types
Problem: The type allowlist checks top-level types only. An ArrayType(UnsupportedType) or MapType(StringType, UnsupportedType) would pass validation but could fail at native execution time. The CH backend avoids this by delegating to ConverterUtils.getTypeNode() which recursively validates.
Evidence:
case _: org.apache.spark.sql.types.ArrayType => // passes any ArrayType, no element check
case _: org.apache.spark.sql.types.MapType => // passes any MapType, no key/value check
case _: org.apache.spark.sql.types.StructType => // passes any StructType, no field check
Suggested Fix:
case a: org.apache.spark.sql.types.ArrayType =>
validateType(a.elementType)
case m: org.apache.spark.sql.types.MapType =>
validateType(m.keyType)
validateType(m.valueType)
case s: org.apache.spark.sql.types.StructType =>
s.fields.foreach(f => validateType(f.dataType))
Alternatively, delegate to VeloxValidatorApi for centralized type validation.
Thanks, this is a great point. Replaced the manual allowlist with VeloxValidatorApi.validateSchema, which handles recursive validation for complex type elements and also catches variant shredded structs. This keeps validation logic centralized.
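For readers following along, a rough sketch of what that delegation could look like. The validateSchema signature (assumed here to return an Option[String] failure reason), the way the validator is obtained, and the ValidationResult helpers are assumptions based on this thread, not the merged code:
// Sketch only: assumes validateSchema(schema) yields an optional failure reason.
override protected def doValidateInternal(): ValidationResult = {
  // Build a StructType from the scan's output attributes and let the validator
  // recurse into nested element/key/value/field types.
  val schema = org.apache.spark.sql.types.StructType(
    outputAttributes.map(a =>
      org.apache.spark.sql.types.StructField(a.name, a.dataType, a.nullable)))
  new VeloxValidatorApi().validateSchema(schema) match {
    case Some(reason) => ValidationResult.failed(s"RDDScanExec schema not supported: $reason")
    case None => ValidationResult.succeeded
  }
}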
case org.apache.spark.sql.types.YearMonthIntervalType.DEFAULT =>
case _: org.apache.spark.sql.types.NullType =>
case dt
    if !VeloxConfig.get.enableTimestampNtzValidation &&
withNewChildrenInternal returns this instead of copy()
Problem: Returning this from a case class's withNewChildrenInternal breaks Spark's convention that tree transformations produce structurally new nodes. The CH backend returns copy(...) for the equivalent transformer. While this is functionally safe for a leaf node today, it's inconsistent with the project pattern.
Evidence:
// Velox (this PR):
override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
this
// CH backend (CHRDDScanTransformer):
override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering)
Suggested Fix:
override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
copy()
Fixed: now uses copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering), consistent with CHRDDScanTransformer.
| case _: org.apache.spark.sql.types.StringType => | ||
| case _: org.apache.spark.sql.types.TimestampType => | ||
| case _: org.apache.spark.sql.types.DateType => | ||
| case _: org.apache.spark.sql.types.BinaryType => |
No SQLMetrics propagation — Spark UI won't show conversion metrics
Problem: The 4-param overload of toColumnarBatchIterator creates throwaway SQLMetric instances not attached to this plan's metrics map. As a result, numInputRows, numOutputBatches, and convertTime won't appear in the Spark UI for this operator, making production debugging harder.
Evidence:
// Current (4-param overload creates throwaway metrics):
RowToVeloxColumnarExec.toColumnarBatchIterator(iter, localSchema, batchSize, batchBytes)
Suggested Fix: Define plan-level metrics and use the 7-param overload:
override lazy val metrics = Map(
  "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
  "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
  "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert"))

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
  val numInputRows = longMetric("numInputRows")
  val numOutputBatches = longMetric("numOutputBatches")
  val convertTime = longMetric("convertTime")
  val localSchema = this.schema
  val batchSize = GlutenConfig.get.maxBatchSize
  val batchBytes = VeloxConfig.get.veloxPreferredBatchBytes
  rdd.mapPartitions { iter =>
    RowToVeloxColumnarExec.toColumnarBatchIterator(
      iter, localSchema, numInputRows, numOutputBatches, convertTime, batchSize, batchBytes)
  }
}
Added numInputRows, numOutputBatches, and convertTime as plan-level metrics and switched to the 7-param toColumnarBatchIterator overload. These will now show up in the Spark UI.
checkAnswer(df, expectedAnswer)
}
}
Missing test coverage for complex types and unsupported-type fallback
Problem: The 7 tests cover primitives, nulls, empty RDD, and aggregation — but two important scenarios are untested:
- Complex types (ArrayType, MapType, StructType) — validation explicitly accepts them, but no test exercises the full row-to-columnar JNI path with nested data.
- Unsupported type fallback — no test verifies that a truly unsupported type (e.g., CalendarIntervalType) triggers graceful fallback to vanilla Spark instead of a runtime crash.
Suggested Fix: Add at least these two tests:
test("RDDScan with array type") {
val rdd = spark.sparkContext.parallelize(Seq(Row(Seq(1, 2, 3)), Row(Seq(4, 5))))
val schema = StructType(Seq(StructField("arr", ArrayType(IntegerType))))
val data = spark.createDataFrame(rdd, schema)
val expectedAnswer = data.collect()
val node = LogicalRDD.fromDataset(
rdd = data.queryExecution.toRdd, originDataset = data, isStreaming = false)
val df = ClassicDataset.ofRows(spark, node).toDF()
checkAnswer(df, expectedAnswer)
}
test("RDDScan falls back for unsupported types") {
// Create RDD with CalendarIntervalType or another unsupported type
// Verify plan does NOT contain VeloxRDDScanTransformer (i.e., fallback occurred)
}There was a problem hiding this comment.
Added 4 new tests: array type, map type, struct type, and unsupported-type fallback (DayTimeIntervalType → verifies VeloxRDDScanTransformer is absent from plan). Total coverage is now 11 tests.
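For reference, a sketch of how such a fallback test might be written. The merged test may differ; the schema, values, and helper names here follow the earlier snippets rather than the actual diff, and the suite's existing imports are assumed:
test("RDDScan falls back for unsupported types") {
  // DayTimeIntervalType is used as an example of a type the validator rejects.
  val rdd = spark.sparkContext.parallelize(Seq(Row(java.time.Duration.ofDays(1))))
  val schema = StructType(Seq(StructField("interval", DayTimeIntervalType())))
  val data = spark.createDataFrame(rdd, schema)
  val node = LogicalRDD.fromDataset(
    rdd = data.queryExecution.toRdd, originDataset = data, isStreaming = false)
  val df = ClassicDataset.ofRows(spark, node).toDF()
  // Fallback means the transformer must be absent from the executed plan.
  assert(df.queryExecution.executedPlan.collect {
    case t: VeloxRDDScanTransformer => t
  }.isEmpty)
  checkAnswer(df, data.collect())
}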
Force-pushed from d816859 to 89494df
ValidationResult.succeeded
}

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
RowToVeloxColumnarExec.toColumnarBatchIterator does UnsafeProjection.apply(row), which throws on a BatchCarrierRow since PlaceholderRow's getters all throw UnsupportedOperationException. This can show up via df.checkpoint() or user code that does df.queryExecution.toRdd and re-wraps with LogicalRDD.fromDataset, when the upstream Gluten plan ends in VeloxColumnarToCarrierRowExec. CHRDDScanTransformer.scala L101-104 detects this and unwraps via findNextTerminalRow.batch(). Either mirror that, or fail fast with a clear error for carrier rows and add a checkpoint round-trip test to document the current behavior.
Great catch — this is a real bug. If the upstream RDD was produced by a Gluten plan ending in VeloxColumnarToCarrierRowExec (e.g., via df.checkpoint()), the rows would be BatchCarrierRow instances and UnsafeProjection.apply() would throw. Fixed by peeking at the first row and branching: carrier rows are unwrapped directly via BatchCarrierRow.unwrap(), skipping row-to-columnar conversion entirely. This mirrors the CH pattern.
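In sketch form, the branching described above might sit inside doExecuteColumnar roughly like this. BatchCarrierRow.unwrap is taken from the reply's wording; its exact signature and the iterator shape are assumptions, not verified API:
rdd.mapPartitions { iter =>
  val buffered = iter.buffered
  if (buffered.hasNext && buffered.head.isInstanceOf[BatchCarrierRow]) {
    // Upstream plan ended in VeloxColumnarToCarrierRowExec: the rows already carry
    // ColumnarBatches, so unwrap them instead of re-running row-to-columnar conversion.
    BatchCarrierRow.unwrap(buffered)
  } else {
    RowToVeloxColumnarExec.toColumnarBatchIterator(
      buffered, localSchema, numInputRows, numOutputBatches, convertTime, batchSize, batchBytes)
  }
}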
}
}

override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
Leaf node — could assert(newChildren.isEmpty) here.
Added assert(newChildren.isEmpty, "VeloxRDDScanTransformer is a leaf node").
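Putting the copy(...) change and this assertion together, the method presumably ends up along these lines (a sketch, not the merged diff):
override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = {
  assert(newChildren.isEmpty, "VeloxRDDScanTransformer is a leaf node")
  copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering)
}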
}

object VeloxRDDScanTransformer {
  def replace(plan: org.apache.spark.sql.execution.RDDScanExec): RDDScanTransformer =
CH uses UnknownPartitioning(0); we pass plan.outputPartitioning through. If the original RDDScanExec declares e.g. HashPartitioning, downstream Velox ops might skip a shuffle based on a hint we never verified survives the row→columnar conversion. Worth either justifying with a comment or aligning with CH.
Valid concern. Row-to-columnar conversion doesn't change data distribution: it converts row format within each partition, preserving the partition layout. This is consistent with RowToVeloxColumnarExec, which also carries through the child's outputPartitioning. Added an inline comment explaining the rationale and the difference from CH's approach.
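A sketch of the factory with such a comment; the comment wording is illustrative, not the merged text:
object VeloxRDDScanTransformer {
  def replace(plan: RDDScanExec): RDDScanTransformer =
    VeloxRDDScanTransformer(
      plan.output,
      plan.rdd,
      plan.name,
      // Unlike CH (which resets to UnknownPartitioning(0)), keep the original
      // partitioning: row-to-columnar conversion is per-partition and does not
      // redistribute rows, so the declared distribution still holds.
      plan.outputPartitioning,
      plan.outputOrdering)
}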
isStreaming = false)
val df = ClassicDataset.ofRows(spark, node).toDF()

checkAnswer(df, expectedAnswer)
This test — and the following empty RDD / multiple re-reads / null values / array / map / struct ones — only does checkAnswer. Without a collectFirst { case _: VeloxRDDScanTransformer => true } assertion they'd silently pass even if the rewriter stopped offloading (vanilla Spark also gets the right answer). Tests 1 and 2 already assert plan shape; please add the same here.
You're right — tests 3–11 would silently pass even if offloading stopped working. Added collect { case _: VeloxRDDScanTransformer => true } assertions to all 8 tests that were missing them. The unsupported-type fallback test already asserts the absence of the transformer, so that one was fine as-is.
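Concretely, the kind of plan-shape assertion being asked for (a sketch following the pattern of tests 1 and 2):
val executedPlan = df.queryExecution.executedPlan
assert(
  executedPlan.collect { case t: VeloxRDDScanTransformer => t }.nonEmpty,
  "expected RDDScanExec to be offloaded to VeloxRDDScanTransformer")
checkAnswer(df, expectedAnswer)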
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Force-pushed from 89494df to 47ee93f
CONTEXT
RDDScanExec is Spark's physical plan node used when creating DataFrames from in-memory RDDs via sparkSession.createDataset(rdd) or createDataFrame(rdd, schema). Currently, when Gluten encounters this node with the Velox backend, it falls back to vanilla Spark's row-based execution. The ClickHouse backend already supports this via CHRDDScanTransformer.
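For illustration, the kind of user code that plans an RDDScanExec (schema and values are arbitrary):
// Any DataFrame built directly from an in-memory RDD of rows goes through RDDScanExec.
val rdd = spark.sparkContext.parallelize(Seq(Row(1, "a"), Row(2, "b")))
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = spark.createDataFrame(rdd, schema)
df.queryExecution.executedPlan // contains an RDDScanExec, or its Gluten replacement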
WHAT
This PR adds native Velox execution support for RDDScanExec by implementing a VeloxRDDScanTransformer. The transformer converts RDD[InternalRow] into Velox columnar batches using the existing RowToVeloxColumnarExec JNI infrastructure, so no new native code is needed.
Key design decisions:
- Reuses RowToVeloxColumnarExec for the actual row-to-columnar conversion, keeping the implementation lean and consistent with how Velox already handles row-based input.
- Delegates to VeloxValidatorApi.validateSchema for recursive type validation. Supports all Velox-compatible types including complex types (Array, Map, Struct), which are handled via the UnsafeRowFast::deserialize path in the native converter. Rejects only truly unsupported types with clean fallback to vanilla Spark.
- withNewChildrenInternal returns copy(...), consistent with CHRDDScanTransformer.
- Mirrors CHRDDScanTransformer in the ClickHouse backend.
Changes
- VeloxRDDScanTransformer.scala (new) — Columnar execution node wrapping RowToVeloxColumnarExec for native row-to-columnar conversion.
- VeloxSparkPlanExecApi.scala (modified) — Overrides isSupportRDDScanExec and getRDDScanTransform to wire up the new transformer (see the sketch after this list).
- VeloxRDDScanSuite.scala (new) — 7 unit tests covering plan replacement, type coverage, aggregation, empty RDD, null values, idempotent reads, and all primitive types.
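The wiring in VeloxSparkPlanExecApi presumably amounts to something like the following; the exact override signatures are assumptions based only on the method names listed above:
// Sketch only: signatures assumed from the method names in this PR description.
override def isSupportRDDScanExec(plan: RDDScanExec): Boolean = true

override def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
  VeloxRDDScanTransformer.replace(plan)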
Test Results
All 7 unit tests passed on the internal CI pipeline (build 218528457):