
[SPARK-39313][SQL] toCatalystOrdering should fail if V2Expression can not be translated #36697

Closed · wants to merge 4 commits

Conversation


@pan3793 pan3793 commented May 27, 2022

What changes were proposed in this pull request?

After reading the code changes in #35657, I guess the original intention of changing the return type of V2ExpressionUtils.toCatalyst from Expression to Option[Expression] is that, for reading, Spark can ignore unrecognized distributions and orderings, but for writing, it should always be strict.

Specifically, V2ExpressionUtils.toCatalystOrdering should fail if a V2Expression cannot be translated, instead of returning an empty Seq.

Why are the changes needed?

V2ExpressionUtils.toCatalystOrdering is used by DistributionAndOrderingUtils; the current behavior will break the semantics of RequiresDistributionAndOrdering#requiredOrdering in some cases (see the UT).

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UT.


pan3793 commented May 27, 2022

cc @sunchao, and @MaxGekk

@github-actions github-actions bot added the SQL label May 27, 2022
@pan3793 pan3793 force-pushed the SPARK-39313 branch 2 times, most recently from 8a560e8 to 35e9bdb on May 27, 2022 09:06
@pan3793 pan3793 marked this pull request as ready for review May 27, 2022 09:07

pan3793 commented May 27, 2022

cc @cloud-fan

val exc = intercept[AnalysisException] {
  V2ExpressionUtils.toCatalystOrdering(
    Array(supportedV2Sort, unsupportedV2Sort),
    LocalRelation.apply(AttributeReference("a", StringType)()))
}

The current master/3.3 returns Seq.empty instead of throwing AnalysisException.


pan3793 commented May 27, 2022

My change breaks KeyGroupedPartitioningSuite because it tries to write data to a V2Relation which claims unsupported distributions and orderings.


@sunchao sunchao left a comment


The change makes sense to me. Thanks for catching it @pan3793 !

Should we raise this as a blocker for 3.3 release @MaxGekk ? I know RC3 already received a few votes - so sorry about it. I think this could cause data corruption if someone uses unsupported transforms on the write path.


@dongjoon-hyun dongjoon-hyun left a comment


Thank you, @pan3793 .

@@ -40,7 +40,7 @@ object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper {
     val catalystPartitioning = scan.outputPartitioning() match {
       case kgp: KeyGroupedPartitioning => sequenceToOption(kgp.keys().map(
-        V2ExpressionUtils.toCatalyst(_, relation, funCatalogOpt)))
+        V2ExpressionUtils.toCatalyst(_, relation, false, funCatalogOpt)))

If you don't mind, shall we use the following style, @pan3793 ?

- V2ExpressionUtils.toCatalyst(_, relation, false, funCatalogOpt)))
+ V2ExpressionUtils.toCatalyst(_, relation, strict = false, funCatalogOpt)))

import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.types.StringType

class V2ExpressionUtilsSuite extends SparkFunSuite {

V2ExpressionUtils is in package org.apache.spark.sql.catalyst.expressions, while this V2ExpressionUtilsSuite is in package org.apache.spark.sql.execution.datasources.v2.

If this test suite depends on other classes (outside the org.apache.spark.sql.catalyst.expressions package), we had better avoid creating a new V2ExpressionUtilsSuite here. If you don't mind, shall we move this test case into another suite?


Thanks for the suggestion. The current changes are not the final solution; I will make the changes as suggested.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-39313][SQL] V2ExpressionUtils.toCatalystOrdering should fail if V2Expression can not be translated [SPARK-39313][SQL] toCatalystOrdering should fail if V2Expression can not be translated May 27, 2022
@pan3793
Copy link
Member Author

pan3793 commented May 27, 2022

Thanks @sunchao for the confirmation. Since KeyGroupedPartitioningSuite depends on the buggy code to write data, the current change breaks it entirely.

I'm trying to get V2Write to support V2Function, but it seems more complicated than I thought at first. Would you please give me some suggestions?


sunchao commented May 27, 2022

Sure. Let me check.


MaxGekk commented May 27, 2022

I think this could cause data corruption ...

If so, I will fail RC3.

@pan3793 pan3793 marked this pull request as draft May 27, 2022 19:12

sunchao commented May 27, 2022

@pan3793 this commit should help to fix the tests.


pan3793 commented May 27, 2022

@pan3793 this commit should help to fix the tests.

Thanks, it seems my approach was overkill for fixing it.


sunchao commented May 27, 2022

I think that can be a separate PR for master only to support function catalog on the write path.

@pan3793 pan3793 marked this pull request as ready for review May 27, 2022 20:27

pan3793 commented May 27, 2022

Thanks @sunchao and @dongjoon-hyun, changed as suggested.


pan3793 commented May 28, 2022

@pan3793 this commit should help to fix the tests.

The change breaks "SPARK-30289 Create: partitioned by nested column"


sunchao commented May 28, 2022

@pan3793 hmm, how? That change is test-only, and I don't see any test failure in the latest CI run.


pan3793 commented May 28, 2022

[info] - SPARK-30289 Create: partitioned by nested column *** FAILED *** (363 milliseconds)
[info]   java.lang.RuntimeException: Once strategy's idempotence is broken for batch Early Filter and Projection Push-Down
[info]  RelationV2[ts#1494] testcat.table_name   RelationV2[ts#1494] testcat.table_name
[info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.onceStrategyIdempotenceIsBrokenForBatchError(QueryExecutionErrors.scala:1302)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.checkBatchIdempotence(RuleExecutor.scala:168)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:254)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
[info]   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
[info]   at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:125)
[info]   at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
[info]   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
[info]   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
[info]   at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
[info]   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:121)
[info]   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:117)
[info]   at org.apache.spark.sql.QueryTest.assertEmptyMissingInput(QueryTest.scala:226)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:148)
[info]   at org.apache.spark.sql.DataFrameWriterV2Suite.$anonfun$new$97(DataFrameWriterV2Suite.scala:692)

In RuleExecutor.checkBatchIdempotence, the Alias node of keyGroupedPartitioning changes exprId.


@@ -32,15 +32,15 @@ import org.apache.spark.util.collection.Utils.sequenceToOption
  */
 object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-    case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, _) =>
+    case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, None) =>

Do you think it's sufficient to only match keyGroupedPartitioning = None?


On second thought, I think it should only match None here; otherwise catalystPartitioning will be recalculated in every round, and if the generated catalystPartitioning contains an Alias, it will cause !plan.fastEquals(reOptimized) and fail checkBatchIdempotence.

"SPARK-30289 Create: partitioned by nested column" happens to cover this case after we changed InMemoryTable#outputPartitioning.


Yes, I think it's better to match None here.
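The idempotence concern behind matching only None can be sketched with a plain-Scala stand-in (hypothetical types, not Spark's actual rule or plan classes): a once-applied rule stays idempotent only if it skips plans it has already rewritten.

```scala
// Hypothetical, simplified stand-in for the rule: fill in partitioning only
// when it is None, so re-running the rule leaves the plan unchanged.
case class ScanRelation(table: String, keyGroupedPartitioning: Option[Seq[String]])

def applyRule(plan: ScanRelation): ScanRelation = plan match {
  // Only match unresolved plans; a second pass is then a no-op.
  case ScanRelation(t, None) => ScanRelation(t, Some(Seq(s"key_$t")))
  case resolved              => resolved
}
```

Matching `_` instead of `None` would recompute the partitioning on every pass; if that recomputation allocates fresh identifiers (as Alias exprIds do), the re-optimized plan no longer equals the previous one and the once-strategy idempotence check fails.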


pan3793 commented May 28, 2022

CI is green, please take another look @dongjoon-hyun


@sunchao sunchao left a comment


LGTM


sunchao commented May 28, 2022

Hmm, thinking more about this, I think maybe we should completely retain the previous behavior: only allow the identity transform, and fail even if a V2 transform exists in the function catalog. Otherwise, the write may fail at a later stage.


pan3793 commented May 29, 2022

DistributionAndOrderingUtils does not pass a FunctionCatalog into toCatalyst, so only the identity transform will be translated.

@AmplabJenkins

Can one of the admins verify this patch?

}

def toCatalyst(
    expr: V2Expression,
    query: LogicalPlan,
    strict: Boolean = true,
@cloud-fan cloud-fan May 30, 2022


Instead of adding a new parameter, I think it's clearer and more type-safe to have two methods:

def toCatalystOpt(...): Option[Expression]
def toCatalyst(...): Expression = toCatalystOpt(...).getOrElse(throw error)
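A minimal plain-Scala sketch of this two-method shape (simplified stand-in types, not the real V2ExpressionUtils signatures):

```scala
// Simplified stand-ins: FieldRef is translatable, anything else is not.
sealed trait V2Expr
case class FieldRef(name: String) extends V2Expr
case class UnknownTransform(name: String) extends V2Expr

// Lenient variant for the read path: unrecognized expressions become None.
def toCatalystOpt(expr: V2Expr): Option[String] = expr match {
  case FieldRef(n) => Some(s"AttributeReference($n)")
  case _           => None
}

// Strict variant for the write path: fail fast instead of silently dropping.
def toCatalyst(expr: V2Expr): String =
  toCatalystOpt(expr).getOrElse(
    throw new IllegalArgumentException(s"Cannot translate: $expr"))
```

The caller's choice of method encodes the strictness, so there is no boolean flag to misread at call sites.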


Good suggestion, updated

    val funCatalogOpt = relation.catalog.flatMap {
      case c: FunctionCatalog => Some(c)
      case _ => None
    }

    val catalystPartitioning = scan.outputPartitioning() match {
      case kgp: KeyGroupedPartitioning => sequenceToOption(kgp.keys().map(
-       V2ExpressionUtils.toCatalyst(_, relation, funCatalogOpt)))
+       V2ExpressionUtils.toCatalystOpt(_, relation, funCatalogOpt)))

I'm wondering if we should also fail here. If a data source uses an invalid partitioning, we should fail the query and let users know, so that they can debug and fix the data source. Otherwise, users may live with a performance bug for a while as it's hard to figure out where the problem is. cc @sunchao


Yes, without error messages the user must infer the issue from the query plan. But I think it's a bit aggressive to fail the query, since it's only a performance issue, not a correctness one. How about adding warning logs here? ping @sunchao


I'm also a bit more inclined to use warning messages here. To debug a performance issue, it should be relatively straightforward to check the query plan for any shuffle in it, combined with checking the query logs. In addition, existing V2 data source tables could already use custom transforms such as bucket, days, etc., and we don't want queries against these tables to start failing (because there is no function catalog) after upgrading to Spark 3.3.

There is already a warning message, in the method loadV2FunctionOpt, for the case where the function catalog exists but the transform function doesn't. It seems we don't have a warning message for the case where a custom transform function is used but there is no function catalog.
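The warn-and-degrade behavior agreed on above could look roughly like this plain-Scala sketch (a hypothetical helper, not the actual V2ScanPartitioning code): translate all partitioning keys, and if any key fails, log a warning and report no partitioning instead of failing the query.

```scala
// Hypothetical sketch: if any partitioning key cannot be translated,
// warn and fall back to None (no reported partitioning) rather than throw.
def resolvePartitioning(
    keys: Seq[String],
    translate: String => Option[String],
    warn: String => Unit): Option[Seq[String]] = {
  val translated = keys.map(k => k -> translate(k))
  val failed = translated.collect { case (k, None) => k }
  if (failed.isEmpty) Some(translated.flatMap(_._2))
  else {
    warn(s"Ignoring reported partitioning; cannot translate: ${failed.mkString(", ")}")
    None
  }
}
```

The query then degrades to an extra shuffle instead of failing outright, and the warning points the user at the untranslatable transform.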


@sunchao sunchao left a comment


LGTM with the updated changes.

@sunchao sunchao closed this in ef0b87a Jun 1, 2022

sunchao commented Jun 1, 2022

Merged to master/3.3, thanks!

sunchao pushed a commit that referenced this pull request Jun 1, 2022
…an not be translated


Closes #36697 from pan3793/SPARK-39313.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Chao Sun <sunchao@apple.com>
@dongjoon-hyun

Thank you, @pan3793 , @sunchao , @cloud-fan !
