TimedWithCodegenExec.doConsume drops row: ExprCode → NPE in CodegenFallback expressions (e.g. from_json) under whole-stage codegen #74

@eyaldar

Describe the bug

TimedWithCodegenExec.doConsume drops the row: ExprCode parameter when calling
consume(ctx, input). This causes ctx.INPUT_ROW to be null for any downstream
CodegenFallback expression that interpolates INPUT_ROW into its generated code,
producing an NPE inside Block.code interpolation:

java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "arg" is null
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotInterpolateClassIntoCodeBlockError(QueryExecutionErrors.scala:426)
    at org.apache.spark.sql.catalyst.expressions.codegen.Block$BlockHelper$.$anonfun$code$1(javaCode.scala:240)
    ...
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback.doGenCode(CodegenFallback.scala:56)
    at org.apache.spark.sql.catalyst.expressions.JsonToStructs.doGenCode(jsonExpressions.scala:541)

The plugin works correctly for plans that contain only fully codegen'd expressions; it
breaks any plan that contains a CodegenFallback expression — most visibly from_json
(JsonToStructs), but the same applies to any expression that extends CodegenFallback
and uses ctx.INPUT_ROW in its generated code.
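
This is not specific to from_json: any user-defined expression of the following shape
takes the same path. A minimal, hypothetical sketch (UpperFallback is illustrative
only, not a Spark or plugin class):

import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// CodegenFallback supplies doGenCode; the generated code calls
// eval(ctx.INPUT_ROW), so this expression needs a non-null INPUT_ROW
// at codegen time, just like JsonToStructs.
case class UpperFallback(child: Expression) extends UnaryExpression with CodegenFallback {
  override def dataType: DataType = StringType
  override protected def nullSafeEval(input: Any): Any =
    UTF8String.fromString(input.toString.toUpperCase)
  override protected def withNewChildInternal(newChild: Expression): UpperFallback =
    copy(child = newChild)
}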

Root cause

In spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala
(lines 127-128 on main):

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String =
  consume(ctx, input)

This drops the third argument. The Spark CodegenSupport.consume signature is

final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String

When TimedWithCodegenExec is inserted between operators, downstream CodegenSupport
nodes that consult ctx.INPUT_ROW see null instead of the underlying row variable.

CodegenFallback.doGenCode (Spark 3.5, CodegenFallback.scala:56) binds
val input = ctx.INPUT_ROW and interpolates $input into a code"..." block:

ev.copy(code = code"""
  | ...
  | ((Expression) references[$idx]).eval($input);
  | ...
""")

The code"..." string interpolator at javaCode.scala:237-250 walks each interpolated
arg and calls arg.getClass to dispatch on its type; a null arg triggers
cannotInterpolateClassIntoCodeBlockError, which itself NPEs on arg.getClass before
the intended type-error message can be constructed.
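
A minimal Scala sketch (not Spark's actual code) of that failure shape:

def dispatch(arg: Any): String = arg match {
  // A null value never matches a type pattern such as `case s: String`...
  case s: String => s
  case other =>
    // ...so null falls through to here, and other.getClass throws the NPE
    // before the intended "cannot interpolate" message is ever constructed.
    sys.error(s"cannot interpolate ${other.getClass.getName} into a code block")
}

dispatch(null) // java.lang.NullPointerException, not the intended error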

Environment

Spark version: 3.5 (3.5.6-amzn-1)
Platform: EMR 7.12.0 (Scala 2.12)

  • DataFlint spark_2.12-0.9.7 (also reproduces against earlier 0.9.x — the offending
    code in TimedExec.scala has not changed materially in recent versions)
  • spark.plugins=io.dataflint.spark.SparkDataflintPlugin
  • spark.dataflint.instrument.spark.enabled=true
  • AQE on (default), Kryo serializer

To Reproduce

Steps to reproduce the behavior:

  1. Start a Spark 3.5.6 session on EMR 7.12 with the DataFlint plugin enabled:
    spark.plugins=io.dataflint.spark.SparkDataflintPlugin,
    spark.dataflint.instrument.spark.enabled=true.

  2. Run the following PySpark snippet (uses from_json, a CodegenFallback expression):

    from pyspark.sql.functions import from_json, col, explode
    from pyspark.sql.types import ArrayType, StructType, StructField, StringType
    
    schema = ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("kind", StringType(), True),
    ]))
    
    df = spark.createDataFrame(
        [("k1", '[{"name":"a","kind":"x"}]'), ("k2", None)],
        "key STRING, payload STRING",
    )
    
    (df
     .filter(col("payload").isNotNull())
     .withColumn("parsed", from_json(col("payload"), schema))
     .filter(col("parsed").isNotNull())
     .select("key", explode("parsed").alias("d"))
     .filter(col("d.name").isNotNull())
     .count())
  3. Trigger the action (.count()).

  4. See the NPE in the stack trace above, raised from CodegenFallback.doGenCode's
    Block.code interpolation.

Expected behavior

The query runs to completion and returns the row count, the same as it does without
the DataFlint plugin or with whole-stage codegen disabled. TimedWithCodegenExec
should propagate the row parameter to consume, so downstream CodegenFallback
expressions see a valid ctx.INPUT_ROW:

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String =
  consume(ctx, input, if (row == null) null else row.value)

This matches how stock Spark operators that don't transform rows (e.g.
InputAdapter's consume path) propagate INPUT_ROW.
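
For completeness, here is a spark-shell (Scala) equivalent of the PySpark repro above;
with the fix applied it should run to completion and return 1 instead of throwing:

import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import spark.implicits._

val schema = ArrayType(StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("kind", StringType, nullable = true))))

val df = Seq(("k1", """[{"name":"a","kind":"x"}]"""), ("k2", null))
  .toDF("key", "payload")

df.filter(col("payload").isNotNull)
  .withColumn("parsed", from_json(col("payload"), schema))
  .filter(col("parsed").isNotNull)
  .select(col("key"), explode(col("parsed")).as("d"))
  .filter(col("d.name").isNotNull)
  .count()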

Screenshots

N/A — failure is a JVM stack trace (included above).

Additional context

  • Workarounds: Setting spark.dataflint.instrument.spark.enabled=false (plugin
    still loaded) avoids the crash and is what we are deploying; see the sketch
    after this list. Setting spark.sql.codegen.wholeStage=false also avoids it,
    but at a real performance cost.
  • Impact: Any Spark workload using DataFlint instrumentation that contains a
    CodegenFallback expression which references INPUT_ROW in its generated code.
    The most common offender is from_json, but the same path is used by other
    expressions in this family. Because the bug is plan-shape-dependent, it can lurk
    in pipelines for a long time and then surface when a new column or preprocessor
    introduces a CodegenFallback expression — exactly how we hit it.
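
A sketch of the first workaround as we deploy it, using the configuration keys listed
above (session-builder form; the same pair can go in spark-defaults.conf):

import org.apache.spark.sql.SparkSession

// Keep the DataFlint plugin loaded but turn off its physical-plan
// instrumentation, which avoids the crash described above.
val spark = SparkSession.builder()
  .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin")
  .config("spark.dataflint.instrument.spark.enabled", "false")
  .getOrCreate()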
