
[SPARK-33136][SQL] Fix mistakenly swapped parameter in V2WriteCommand.outputResolved #30033

Closed

Conversation

HeartSaVioR (Contributor)

What changes were proposed in this pull request?

This PR proposes to fix a bug where `DataType.equalsIgnoreCompatibleNullability` is called with mistakenly swapped arguments in `V2WriteCommand.outputResolved`. The parameters of `DataType.equalsIgnoreCompatibleNullability` are `from` and `to`, so the correct order of the matching variables is `inAttr` followed by `outAttr`.

Why are the changes needed?

Spark throws an AnalysisException for an unresolved operator in v2 writes, and the operator is unresolved only because the arguments passed to `DataType.equalsIgnoreCompatibleNullability` in `outputResolved` have been swapped.

Does this PR introduce any user-facing change?

Yes, end users no longer hit an unresolved operator error in v2 writes when writing a DataFrame that contains non-nullable complex types to a table whose matching complex types are nullable.
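
To make the scenario concrete, here is a minimal repro sketch; it is not taken from the PR, and the catalog, namespace, table, and column names (`testcat`, `ns`, `tbl`, `ids`) are assumptions:

```scala
// Hypothetical repro: the DataFrame's `ids` column is array<int> with non-nullable
// elements, while the target table declares `ids` as array<int> with nullable elements.
// Writing non-nullable data into a nullable column is safe, but before this fix the
// swapped nullability check left AppendData unresolved and the write failed with
// AnalysisException ("unresolved operator 'AppendData ...").
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.array

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// array($"id") over a non-nullable column yields ArrayType(IntegerType, containsNull = false).
val df = Seq(1, 2, 3).toDF("id").select(array($"id").as("ids"))

// Assumes a v2 catalog registered as spark.sql.catalog.testcat and an existing table
// testcat.ns.tbl with schema `ids array<int>` (nullable elements). With this fix applied,
// the append resolves and succeeds.
df.writeTo("testcat.ns.tbl").append()
```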

How was this patch tested?

New UT added.

HeartSaVioR (Contributor, Author)

Btw, this was hard to debug and required me to dig into Spark test code, because the error message tells us nothing when all columns match by name (in other words, the operator is considered unresolved due to a type-incompatibility check on write within the same column). We can't get any information about why the operator is considered unresolved, even with TRACE logging turned on.

org.apache.spark.sql.AnalysisException: unresolved operator 'AppendData RelationV2[col_b#225, col_i#226, col_l#227L, col_f#228, col_d#229, col_da#230, col_ts_tz#231, col_s#232, col_fi#233, col_bi#234, col_de_1#235, col_de_2#236, col_de_3#237, col_st#238, col_li#239, col_ma#240] table_convert_read_all_types_5, Map(path -> table_convert_read_all_types_5), true;;
'AppendData RelationV2[col_b#225, col_i#226, col_l#227L, col_f#228, col_d#229, col_da#230, col_ts_tz#231, col_s#232, col_fi#233, col_bi#234, col_de_1#235, col_de_2#236, col_de_3#237, col_st#238, col_li#239, col_ma#240] table_convert_read_all_types_5, Map(path -> table_convert_read_all_types_5), true
+- Project [col_b#12, col_i#13, col_l#14L, col_f#15, col_d#16, col_da#49, col_ts_tz#63, col_s#17, col_fi#18, col_bi#19, col_de_1#78, col_de_2#94, col_de_3#111, col_st#21, col_li#22, col_ma#23]
   +- Project [col_b#12, col_i#13, col_l#14L, col_f#15, col_d#16, col_s#17, col_fi#18, col_bi#19, col_st#21, col_li#22, col_ma#23, col_da#49, col_ts_tz#63, col_de_1#78, col_de_2#94, col_de_3#111]
      +- Project [col_b#12, col_i#13, col_l#14L, col_f#15, col_d#16, col_s#17, col_fi#18, col_bi#19, col_de#20, col_st#21, col_li#22, col_ma#23, col_da#49, col_ts_tz#63, col_de_1#78, col_de_2#94, cast(col_de#20 as decimal(38,10)) AS col_de_3#111]
         +- Project [col_b#12, col_i#13, col_l#14L, col_f#15, col_d#16, col_s#17, col_fi#18, col_bi#19, col_de#20, col_st#21, col_li#22, col_ma#23, col_da#49, col_ts_tz#63, col_de_1#78, cast(col_de#20 as decimal(11,2)) AS col_de_2#94]
            +- Project [col_b#12, col_i#13, col_l#14L, col_f#15, col_d#16, col_s#17, col_fi#18, col_bi#19, col_de#20, col_st#21, col_li#22, col_ma#23, col_da#49, col_ts_tz#63, cast(col_de#20 as decimal(9,0)) AS col_de_1#78]
               +- Project [col_b#12, col_i#13, col_l#14L, col_f#15, col_d#16, col_s#17, col_fi#18, col_bi#19, col_de#20, col_st#21, col_li#22, col_ma#23, col_da#49, now() AS col_ts_tz#63]
                  +- Project [col_b#12, col_i#13, col_l#14L, col_f#15, col_d#16, col_s#17, col_fi#18, col_bi#19, col_de#20, col_st#21, col_li#22, col_ma#23, current_date(Some(Asia/Seoul)) AS col_da#49]
                     +- LocalRelation [col_b#12, col_i#13, col_l#14L, col_f#15, col_d#16, col_s#17, col_fi#18, col_bi#19, col_de#20, col_st#21, col_li#22, col_ma#23]

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:49)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:48)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:130)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$43(CheckAnalysis.scala:666)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$43$adapted(CheckAnalysis.scala:664)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:664)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:89)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:130)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:156)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:153)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:68)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:72)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:71)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:71)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:85)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:103)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:354)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
  ... 47 elided

@@ -45,7 +45,7 @@ trait V2WriteCommand extends Command {
       case (inAttr, outAttr) =>
         // names and types must match, nullability must be compatible
         inAttr.name == outAttr.name &&
-          DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
+          DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outAttr.dataType) &&
dongjoon-hyun (Member), Oct 14, 2020

Hi, @HeartSaVioR. The original code looks the same as in branch-2.4, but the issue is reported against 3.0.0+. Could you confirm whether this is a 3.0.0-only issue or not?

  override lazy val resolved: Boolean = {
    table.resolved && query.resolved && query.output.size == table.output.size &&
        query.output.zip(table.output).forall {
          case (inAttr, outAttr) =>
            // names and types must match, nullability must be compatible
            inAttr.name == outAttr.name &&
                DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
                (outAttr.nullable || !inAttr.nullable)
        }
  }

HeartSaVioR (Contributor, Author)

Ah, thanks for noticing. Nice finding. I found the code in V2WriteCommand, so I thought it was added later. I'll check the code path in branch-2.4 and test it.

HeartSaVioR (Contributor, Author), Oct 14, 2020

The usage of AppendData was reverted in b6e4aca for branch-2.4 and shipped that way in Spark 2.4.0. So while the code in AppendData for branch-2.4 is broken as well, it is dead code.

We seem to have three options: 1) revert the remaining part of AppendData in branch-2.4, 2) fix the code but leave it dead, or 3) leave it as it is. What's our preference?

cc. @cloud-fan @HyukjinKwon

Member

I prefer (2) because there is a downstream project using it in their fork.

@@ -101,6 +100,86 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(v2.catalog.exists(_ == catalogPlugin))
   }
 
+  case class FakeV2WriteCommand(table: NamedRelation, query: LogicalPlan) extends V2WriteCommand
+
+  test("SPARK-33136 output resolved on complex types for V2 write commands") {
Contributor

Does this bug only affect complex types?

HeartSaVioR (Contributor, Author)

Yes, according to the implementation of equalsIgnoreCompatibleNullability.

private[sql] def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean = {
    (from, to) match {
      case (ArrayType(fromElement, fn), ArrayType(toElement, tn)) =>
        (tn || !fn) && equalsIgnoreCompatibleNullability(fromElement, toElement)

      case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) =>
        (tn || !fn) &&
          equalsIgnoreCompatibleNullability(fromKey, toKey) &&
          equalsIgnoreCompatibleNullability(fromValue, toValue)

      case (StructType(fromFields), StructType(toFields)) =>
        fromFields.length == toFields.length &&
          fromFields.zip(toFields).forall { case (fromField, toField) =>
            fromField.name == toField.name &&
              (toField.nullable || !fromField.nullable) &&
              equalsIgnoreCompatibleNullability(fromField.dataType, toField.dataType)
          }

      case (fromDataType, toDataType) => fromDataType == toDataType
    }
  }

For primitive types the order doesn't affect the result. `outputResolved` itself already does the right comparison; only the arguments were swapped.
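
For instance, here is a small sketch of why the order matters for a complex type. It only illustrates the semantics; since the method is `private[sql]`, it would have to run from code placed under the `org.apache.spark.sql` package:

```scala
import org.apache.spark.sql.types._

// Query side (inAttr): array with non-nullable elements.
val queryType = ArrayType(IntegerType, containsNull = false)
// Table side (outAttr): array with nullable elements.
val tableType = ArrayType(IntegerType, containsNull = true)

// Correct order (from = query, to = table): writing non-nullable into nullable is compatible.
DataType.equalsIgnoreCompatibleNullability(queryType, tableType) // true

// Swapped order (the bug): nullable into non-nullable is rejected, so outputResolved
// returned false and the plan stayed unresolved.
DataType.equalsIgnoreCompatibleNullability(tableType, queryType) // false

// For a primitive type both orders agree, which is why only complex types were affected.
DataType.equalsIgnoreCompatibleNullability(IntegerType, IntegerType) // true
```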

cloud-fan (Contributor)

> We can't get any information about why the operator is considered unresolved

This is a good point. CheckAnalysis provides better error messages for many cases, and the unresolved operator error is the last resort. I think we should provide a better error message for this case as well.

HeartSaVioR (Contributor, Author)

> We can't get any information about why the operator is considered unresolved
>
> This is a good point. CheckAnalysis provides better error messages for many cases, and the unresolved operator error is the last resort. I think we should provide a better error message for this case as well.

Yeah. It looks a bit tricky to generalize, as any operator can decide arbitrarily whether it is resolved. Probably the operator needs some method that provides additional information about why resolution failed, so that the Analyzer can call it and print the reason?
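
Something like the following hypothetical hook could work; this is only a sketch of the idea, not an existing Spark API, and the trait and method names are made up:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical mix-in: an operator that can explain why it is unresolved, so that
// CheckAnalysis could print the reason instead of only the generic
// "unresolved operator" message.
trait ExplainsResolution { self: LogicalPlan =>
  // A human-readable reason when the operator is unresolved, if the operator can tell.
  def resolutionFailureReason: Option[String]
}
```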

dongjoon-hyun (Member) left a comment

+1, LGTM. Thank you, @HeartSaVioR and @cloud-fan.
Merged to master/3.0.

dongjoon-hyun pushed a commit that referenced this pull request Oct 14, 2020
….outputResolved


Closes #30033 from HeartSaVioR/SPARK-33136.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 8e5cb1d)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun (Member)

Hi, @HeartSaVioR .
Could you make a backport to branch-2.4?

HeartSaVioR (Contributor, Author), Oct 15, 2020

#30043 for branch-2.4

dongjoon-hyun (Member)

Thank you, @HeartSaVioR !

HeartSaVioR (Contributor, Author)

Forgot to say, thanks all for reviewing and merging!

HeartSaVioR deleted the SPARK-33136 branch on October 15, 2020 03:10
holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
….outputResolved


Closes apache#30033 from HeartSaVioR/SPARK-33136.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 8e5cb1d)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>