
native_datafusion: silent wrong-answer paths for primitive Parquet type conversions Spark rejects #4297

@andygrove

Description


native_datafusion silently accepts a number of primitive-to-primitive Parquet conversions that Spark's vectorized reader rejects on every supported version (Spark's ParquetVectorUpdaterFactory.getUpdater falls through to constructConvertNotSupportedException). The result is wrong answers (silent overflow on narrowing, silent precision loss on widening, raw int-as-epoch-seconds reinterpretation for int -> timestamp) rather than the SchemaColumnConvertNotSupportedException Spark would throw.
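For concreteness, the two most surprising failure modes can be reproduced in plain Scala without Spark or Comet: the narrowing read behaves like a raw `.toInt` (two's-complement wraparound), and the int -> timestamp read reinterprets the stored int as epoch seconds. This is a standalone illustration of the arithmetic, not Comet code:

```scala
// Standalone sketch of the failure modes above (plain Scala, no Spark needed).
object SilentConversionDemo {
  def main(args: Array[String]): Unit = {
    // Narrowing overflow: a long -> int read of 2147483652 wraps instead of throwing.
    val narrowed = 2147483652L.toInt
    println(narrowed) // -2147483644

    // int-as-epoch-seconds reinterpretation: stored value 1 reads back as a timestamp.
    val ts = java.time.Instant.ofEpochSecond(1L)
    println(ts) // 1970-01-01T00:00:01Z
  }
}
```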

This is the same class of gap as #3720 (which focuses on STRING -> INT and decimal precision narrowing), but for primitive-to-primitive numeric and date/timestamp conversions.

Context

#4229 closes the three widening cases gated by spark.comet.schemaEvolution.enabled (INT32 -> INT64, FLOAT -> DOUBLE, INT32 -> DOUBLE) by mirroring the Comet-specific branches in TypeUtil.checkParquetType. The cases listed below are not gated by that flag in Spark — they throw unconditionally on the JVM vectorized reader — so they remain silently wrong under both schemaEvolution.enabled=true and false after #4229 lands.

From Spark 4.0.2's ParquetTypeWideningSuite expectError = true list: Long -> Int, Double -> Float, Float -> Long, Long -> Double, Int -> Float, Int -> TimestampType, Date -> TimestampType.

Probe results on this PR (Spark 3.5, COMET_NATIVE_SCAN_IMPL=native_datafusion)

Credit: probe authored by @mbutrovich in #4229 (comment).

| Case | Written | Spark ref behavior | schemaEvolution=false | schemaEvolution=true |
| --- | --- | --- | --- | --- |
| long -> int (narrowing) | [1, 2, 3, 2147483652] | throws | [1, 2, 3, -2147483644] | [1, 2, 3, -2147483644] |
| double -> float (narrowing) | [1.5, 2.5, 1e40] | throws | [1.5, 2.5, Infinity] | [1.5, 2.5, Infinity] |
| float -> long | [1.5, 2.5] | throws | [1, 2] (truncated) | [1, 2] (truncated) |
| long -> double | [1, 2, 2^54+1] | throws | [1.0, 2.0, 1.8014398509481984E16] (lost +1) | same |
| int -> float | [1, 2, 2^25+1] | throws | [1.0, 2.0, 3.3554432E7] (lost +1) | same |
| int -> timestamp | [1, 2, 3] | throws | [1969-12-31 16:00:01 ... 03] (PST, int-as-seconds) | same |
| double -> long | [1.0, 2.0, 3.0] | throws | [1, 2, 3] | [1, 2, 3] |
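The "lost +1" rows follow from significand width: a Double carries 53 bits, so 2^54+1 rounds to 2^54; a Float carries 24 bits, so 2^25+1 rounds to 2^25. A plain-Scala check of that rounding (no Spark involved):

```scala
// Precision loss at the representable-integer boundary, mirroring the
// long -> double and int -> float rows in the probe table.
object PrecisionLossDemo {
  def main(args: Array[String]): Unit = {
    val bigLong = (1L << 54) + 1L // 2^54 + 1 is not representable as a Double
    println(bigLong.toDouble.toLong == bigLong) // false: rounds down to 2^54

    val bigInt = (1 << 25) + 1 // 2^25 + 1 is not representable as a Float
    println(bigInt.toFloat.toInt == bigInt) // false: rounds down to 2^25
  }
}
```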

Suggested approach

Per the PR discussion, the cleanest fix is to invert the check in replace_with_spark_cast to an allowlist of Spark-supported (physical, target) pairs (mirroring the accept cases in Spark's ParquetVectorUpdaterFactory.getUpdater per Spark version), so anything else raises SparkError::ParquetSchemaConvert. This closes the whole category at once and avoids continuing to enumerate denylist cases.
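A hypothetical sketch of the allowlist shape, written in Scala for illustration only (the actual fix would live in the native Rust `replace_with_spark_cast`; the object name, pair set, and exception type here are placeholders, not the real implementation):

```scala
// Illustrative allowlist of (Parquet physical type, Spark read type) pairs.
// Anything outside the allowlist raises a schema-convert error instead of
// silently casting. Pair set shown is a partial example, not exhaustive.
object ConversionAllowlist {
  private val allowed: Set[(String, String)] = Set(
    ("INT32", "int"),
    ("INT32", "bigint"), // widening case gated by spark.comet.schemaEvolution.enabled
    ("INT32", "double"),
    ("INT64", "bigint"),
    ("FLOAT", "float"),
    ("FLOAT", "double"),
    ("DOUBLE", "double"))

  def check(physical: String, target: String): Unit =
    if (!allowed.contains((physical, target))) {
      // Placeholder for SparkError::ParquetSchemaConvert on the native side.
      throw new UnsupportedOperationException(
        s"Parquet physical type $physical cannot be read as $target")
    }
}
```

The denylist-to-allowlist inversion means newly discovered conversions fail loudly by default, matching Spark's behavior, instead of defaulting to a silent native cast.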

Probe used

package org.apache.comet.parquet

import scala.util.Try
import org.apache.spark.sql.{CometTestBase, DataFrame}
import org.apache.spark.sql.internal.SQLConf
import org.apache.comet.CometConf

class TypePromotionProbeSuite extends CometTestBase {
  import testImplicits._

  private def probe(label: String)(body: => Any): Unit = {
    val result = Try(body)
    // scalastyle:off println
    println(s"[PROBE] $label -> ${result match {
        case scala.util.Success(v) => s"OK value=$v"
        case scala.util.Failure(e) => s"THROW ${e.getClass.getSimpleName}"
      }}")
    // scalastyle:on println
  }

  private def runAll(ev: Boolean): Unit = withSQLConf(
    CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
    CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> ev.toString,
    SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
    def run(label: String, df: DataFrame, writeType: String, readAs: String): Unit =
      probe(s"$label (ev=$ev)") {
        withTempPath { dir =>
          df.selectExpr(s"cast(c as $writeType) as c").write.parquet(dir.getCanonicalPath)
          spark.read.schema(s"c $readAs").parquet(dir.getCanonicalPath)
            .collect().map(_.get(0)).toSeq
        }
      }
    run("int->long",              Seq(1, 2, 3).toDF("c"),                              "int",    "bigint")
    run("float->double",          Seq(1.0f, 2.0f, 3.0f).toDF("c"),                     "float",  "double")
    run("int->double",            Seq(1, 2, 3).toDF("c"),                              "int",    "double")
    run("long->int narrowing",    Seq(1L, 2L, 3L, Int.MaxValue.toLong + 5L).toDF("c"), "bigint", "int")
    run("double->float narrowing",Seq(1.5, 2.5, 1e40).toDF("c"),                       "double", "float")
    run("float->long",            Seq(1.5f, 2.5f).toDF("c"),                           "float",  "bigint")
    run("long->double",           Seq(1L, 2L, (1L << 54) + 1L).toDF("c"),              "bigint", "double")
    run("int->float",             Seq(1, 2, (1 << 25) + 1).toDF("c"),                  "int",    "float")
    run("int->timestamp",         Seq(1, 2, 3).toDF("c"),                              "int",    "timestamp")
    run("double->long",           Seq(1.0, 2.0, 3.0).toDF("c"),                        "double", "bigint")
  }

  test("probe ev=false") { runAll(ev = false) }
  test("probe ev=true")  { runAll(ev = true) }
}

Labels

area:scan (Parquet scan / data reading), bug (Something isn't working), native_datafusion (Specific to native_datafusion scan type), requires-triage
