Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-35912][SQL] Fix cast struct contains null value to string/struct #33146

Closed
wants to merge 3 commits into from

Conversation

cfmcgrady
Copy link
Contributor

@cfmcgrady cfmcgrady commented Jun 30, 2021

What changes were proposed in this pull request?

This PR fixes an issue that cast the struct which contains null value to other type has a difference result when we enable/disable codegen.

Here is an example:

  val schema = StructType(Seq(StructField("value",
    StructType(Seq(
      StructField("x", IntegerType, nullable = false),
      StructField("y", IntegerType, nullable = false)
    ))
  )))
  schema.printTreeString()
  // root
  // |-- value: struct (nullable = true)
  // |    |-- x: integer (nullable = false)
  // |    |-- y: integer (nullable = false)
  
  val testDS = Seq("""{"value":{"x":1}}""").toDS
  val jsonDS = spark.read.schema(schema).json(testDS)

  jsonDS.show()
  // incorrect result
  // +---------+
  // |    value|
  // +---------+
  // |{1, null}|
  // +---------+

  spark.sql("set spark.sql.codegen.wholeStage=false")
  jsonDS.show()
  // the result we expected due to y.nullable = false.
  // +------+
  // | value|
  // +------+
  // |{1, 0}|
  // +------+

Actually, the result should be depending on the field nullable setting. this bug also happens when we cast struct to struct.

Does this PR introduce any user-facing change?

No, only bug fix.

How was this patch tested?

New test.

@github-actions github-actions bot added the SQL label Jun 30, 2021
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

val javaType = JavaCode.javaType(ft)
code"""
|${if (i != 0) code"""$buffer.append(",");""" else EmptyBlock}
|if ($row.isNullAt($i)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the actual value is null, for primitive type field, row.isNullAt(i) return ture, but row.getXXX return a default value.

For exmaple:

val r = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(Array(1, null))
println(r.getInt(0))   // 1
println(r.getInt(1))   // 0
println(r.isNullAt(1)) // true

so we cann't only check row.isNullAt(i) here, we need to do the same logical like BoundReference.doGenCode(), add nullable check.

@HyukjinKwon
Copy link
Member

how is the cache issue related to the cast?

@cfmcgrady
Copy link
Contributor Author

how is the cache issue related to the cast?

HI, @HyukjinKwon
The root cause is cast struct to other types, updated the pr description.

@cfmcgrady cfmcgrady changed the title [WIP][SPARK-35912][SQL] Fix cast struct contains null value to string [SPARK-35912][SQL] Fix cast struct contains null value to string/struct Jun 30, 2021
@cfmcgrady
Copy link
Contributor Author

cc @cloud-fan @viirya @maropu

@@ -406,19 +406,21 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
if (row.numFields > 0) {
val st = fields.map(_.dataType)
val toUTF8StringFuncs = st.map(castToString)
if (row.isNullAt(0)) {
if (fields(0).nullable && row.isNullAt(0)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if fields(0).nullable is false, how can row.isNullAt(0) be true?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I have the same question)

Copy link
Contributor Author

@cfmcgrady cfmcgrady Jul 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If user create dataframe from spark.internalCreateDataFrame(), the row.isNullAt() may be true even though the schema nullable is false.
For instance:

  val schema = StructType(Seq(
    StructField("x",
      StructType(Seq(
        StructField("y", IntegerType, true),
        StructField("z", IntegerType, false)
      )))))
  val rdd = spark.sparkContext.parallelize(Seq(InternalRow(InternalRow(1, null))))
  val df = spark.internalCreateDataFrame(rdd, schema)
  df.show
  // current master branch output
  //  +---------+
  //  |        x|
  //  +---------+
  //  |{1, null}|
  //  +---------+

Although the spark.internalCreateDataFrame() is sql package private API, but spark.read.json() and spark.read.csv() call it without null value handled.(the example show in pr description)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we need to fix the nullability. There are so many places in the Spark codebase that relies on nullability to do optimizations. It's not possible to change all of them to not trust the nullability anymore.

Can we fix spark.read.json() to set the nullability correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let me try.

@HyukjinKwon
Copy link
Member

Hey mind explaining why cast path issue is related to being cached?


test("SPARK-35912: Cast struct contains the null value to string") {
Seq(true, false).foreach {
case nullable =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

    Seq(true, false).foreach { nullable =>

}

test("SPARK-35912: Cast struct contains the null value to struct") {
Seq(true, false).foreach { case nullable =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Seq(true, false).foreach { nullable =>

checkEvaluation(ret, expected)
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please remove this blank.

@@ -406,19 +406,21 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
if (row.numFields > 0) {
val st = fields.map(_.dataType)
val toUTF8StringFuncs = st.map(castToString)
if (row.isNullAt(0)) {
if (fields(0).nullable && row.isNullAt(0)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I have the same question)

@cfmcgrady
Copy link
Contributor Author

Hey mind explaining why cast path issue is related to being cached?

Actually, the cached result is what we want. The issue is that spark.read.json is not an expected result when the input schema has a nullable setting to false. From the log, we found it's a cast operation.

=== Result of Batch Preparations ===
 CollectLimit 21                                    CollectLimit 21
!+- Project [cast(value#5 as string) AS value#18]   +- *(1) Project [cast(value#5 as string) AS value#18]
!   +- Scan ExistingRDD[value#5]                       +- *(1) Scan ExistingRDD[value#5]

@HyukjinKwon
Copy link
Member

HyukjinKwon commented Jul 1, 2021

Shouldn't it fail instead of setting it as 0 or null? I feel like the handling should be done somewhere in JacksonParser.

@cfmcgrady
Copy link
Contributor Author

Shouldn't it fail instead of setting it as 0 or null? I feel like the handling should be done somewhere in JacksonParser.

Thanks for your suggestion, I'll try.

@cfmcgrady
Copy link
Contributor Author

Create a new PR

BTW, Shall we merge this PR? The cast issue may occur when the user create dataframe from API spark.internalCreateDataFrame(). cc @cloud-fan @maropu

@cloud-fan
Copy link
Contributor

spark.internalCreateDataFrame() is a private API and users should be responsible if they do something wrong. We can't merge this PR, because we can't sacrifice the performance in the happy path for invalid usage of private APIs from users.

@cfmcgrady cfmcgrady closed this Jul 5, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
5 participants