
[SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan #44688

Closed
wants to merge 4 commits into from

Conversation

HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Jan 11, 2024

What changes were proposed in this pull request?

This PR proposes to fix the bug on canonicalizing the plan which contains the physical node of dropDuplicatesWithinWatermark (StreamingDeduplicateWithinWatermarkExec).

Why are the changes needed?

Canonicalization of the plan replaces expressions (including attributes) to remove cosmetic differences such as names and metadata, and the metadata is what carries the event time column marker.

StreamingDeduplicateWithinWatermarkExec assumes that the output attributes of its child node contain the event time column, and this is resolved at initialization of the node instance. Once canonicalization is triggered, the child node loses the notion of the event time column from its attributes. A copy of StreamingDeduplicateWithinWatermarkExec is then instantiated with the new child node, which no longer has an event time column, so the instantiation fails.
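The failure mode can be illustrated with a minimal, self-contained sketch (not actual Spark code; all names here are made up for illustration): a node that resolves the event time ordinal eagerly in its constructor fails once a canonicalized child has had the event time marker stripped, while a node that resolves it lazily survives canonicalization.

```scala
// Stand-in for an attribute whose metadata carries the event time marker.
case class Attr(name: String, isEventTime: Boolean)

case class EagerNode(childOutput: Seq[Attr]) {
  // Resolved at construction: throws if no event time column is present,
  // which is what happens when instantiating a copy with a canonicalized child.
  val eventTimeOrdinal: Int = {
    val ord = childOutput.indexWhere(_.isEventTime)
    require(ord >= 0, "event time column is missing")
    ord
  }
}

case class LazyNode(childOutput: Seq[Attr]) {
  // Resolved on demand: construction succeeds even on a canonicalized child.
  def eventTimeOrdinal: Int = {
    val ord = childOutput.indexWhere(_.isEventTime)
    require(ord >= 0, "event time column is missing")
    ord
  }
}

object CanonicalizationSketch {
  // Canonicalization strips cosmetic details, including the event time marker.
  def canonicalize(output: Seq[Attr]): Seq[Attr] =
    output.map(_ => Attr(name = "none", isEventTime = false))
}
```

With this sketch, `EagerNode(CanonicalizationSketch.canonicalize(output))` throws at construction, while `LazyNode` on the same canonicalized output constructs fine because canonicalized plans are only compared, never executed.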

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UT added.

Was this patch authored or co-authored using generative AI tooling?

No.

@HeartSaVioR
Contributor Author

cc. @zsxwing @viirya @anishshri-db Please take a look.
Also cc. @cloud-fan Maybe this does not require streaming expertise, hence asking for a review.

Thanks in advance!

@WweiL
Contributor

WweiL commented Jan 11, 2024

As the finder of the bug repro and a curious learner of Catalyst, I'd really appreciate it if anyone could tell me:

  1. Where is this canonicalized Spark plan constructed in this simple test case, and why do we even create such a plan for physical plans?
  2. Why does the query run successfully without select (i.e. Project)? It should still go through this canonicalization process, right?

Thanks in advance!

@HeartSaVioR
Contributor Author

There are some optimization rules applied to the physical plan to reuse the shuffle/subquery on the batch side across microbatches (they are unlikely to change between microbatches unless the batch source has been updated). One example is ReuseExchangeAndSubquery.

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/ReuseExchangeAndSubquery.scala

This rule leverages the canonicalized plan to compare with a cached subtree (plan). It does not require the plans to be exactly the same - only identical modulo cosmetic differences.
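The mechanism can be sketched as follows. This is not the actual ReuseExchangeAndSubquery implementation, just a minimal model of reuse keyed by canonical form: two plans that differ only cosmetically (attribute names, expr IDs) map to the same cache entry, so the second one is replaced by a reference to the first.

```scala
import scala.collection.mutable

// Toy plan: canonicalization drops cosmetic details such as attribute names.
case class Plan(output: Seq[String], op: String) {
  def canonicalized: Plan = Plan(output.map(_ => "none"), op)
}

object ReuseSketch {
  // Replace each plan with the first previously-seen plan that has the same
  // canonical form, mirroring how exchange/subquery reuse deduplicates subtrees.
  def reuse(plans: Seq[Plan]): Seq[Plan] = {
    val cache = mutable.Map.empty[Plan, Plan]
    plans.map(p => cache.getOrElseUpdate(p.canonicalized, p))
  }
}
```

For example, `Plan(Seq("x"), "scan")` and `Plan(Seq("y"), "scan")` canonicalize identically, so the second occurrence is reused rather than kept as a distinct subtree.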

@HeartSaVioR
Contributor Author

cc. @zsxwing @viirya @anishshri-db @cloud-fan Friendly reminder.


testStream(dedupe, Append)(
StartStream(checkpointLocation = checkpoint.getCanonicalPath),

Contributor

Nit: should we skip the newline?

Contributor Author

Good point.

_.metadata.getLong(EventTimeWatermark.delayKey))
private val eventTimeColOrdinalOpt: Option[Int] = eventTimeColOpt.map(child.output.indexOf)

// Below three variables will be set lazily when doExecute() is called.
Contributor

I guess this is only possible for this operator. But do you think there is a way to enforce this at the operator level so that future operators won't miss this case?

Contributor Author

We already leverage a trait for watermark - see WatermarkSupport. I don't see a way to enforce this for operator-specific changes.
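For illustration only, here is a hypothetical sketch of what trait-level enforcement could look like - this is not an existing Spark trait (unlike WatermarkSupport), and all names are made up. The idea is to centralize lazy resolution of the event time ordinal so an operator cannot accidentally resolve it at construction time:

```scala
// Stand-in for an attribute whose metadata carries the event time marker.
case class Attr(name: String, isEventTime: Boolean)

// Hypothetical trait: resolution is lazy, so canonicalized copies (with the
// event time marker stripped) can still be constructed without failing.
trait EventTimeColumnSupport {
  def childOutput: Seq[Attr]

  protected lazy val eventTimeOrdinal: Int = {
    val ord = childOutput.indexWhere(_.isEventTime)
    require(ord >= 0, "event time column is missing")
    ord
  }
}
```

A mixed-in operator would only hit the `require` when execution actually reads `eventTimeOrdinal`, which never happens for canonicalized plans since they are only compared, not executed.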

@HeartSaVioR
Contributor Author

@anishshri-db Thanks for reviewing!

cc. @zsxwing @viirya @cloud-fan @HyukjinKwon Mind having a quick look given @anishshri-db reviewed this already? Thanks in advance!

@HeartSaVioR
Contributor Author

GA failure is just a linter issue - Sphinx version.

@cloud-fan
Contributor

If some attribute metadata in streaming is not cosmetic, shall we keep it during canonicalization?

@HeartSaVioR
Contributor Author

@cloud-fan Do you suggest retaining metadata selectively, or keeping all metadata? For event time column metadata I guess it's unlikely to have a side effect, but I'm not sure about propagating all metadata.

@cloud-fan
Contributor

Do you suggest retaining metadata selectively, or keep all metadata?

I mean the metadata that is not cosmetic, and I think event time metadata is one of them.

@HeartSaVioR
Contributor Author

@cloud-fan
Ah OK. Thanks for the suggestion. I can look into how to propagate event time metadata only. Will ping you again once I'm done with it.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Jan 17, 2024

@cloud-fan
It does not seem to be an easy fix - I'm having a hard time debugging this:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala

eventTime in the constructor does not seem to match child.output via semanticEquals. (Maybe exprIds are issued separately?)

Does canonicalization take care of keeping child.output and the attribute referring to child.output in sync after canonicalization? Otherwise I expect the fix to be very involved, and the current fix in this PR sounds to me like a reasonable workaround.

@cloud-fan
Contributor

cloud-fan commented Jan 17, 2024

@HeartSaVioR
Contributor Author

HeartSaVioR commented Jan 17, 2024

I'm not really sure I understand the logic correctly.

val dedupeInputData = MemoryStream[(String, Int)]
val dedupe = dedupeInputData.toDS()
  .withColumn("eventTime", timestamp_seconds($"_2"))
  .withWatermark("eventTime", "10 second")
  .dropDuplicatesWithinWatermark("_1")
  .select($"_1", $"eventTime".cast("long").as[Long])
20:59:20.593 WARN org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec: 
CALL [withNewChildInternal] newChild: *(1) !Project [none#0, timestamp_seconds(none#1) AS #0]

The above Project represents withColumn in the query above. Do we expect exprId 0 to be used "twice"? They refer to different columns (the second one is an Alias against an expression AFAIK) but use the same exprId after canonicalization. Are we issuing exprIds correctly on canonicalization?
(FYI, eventTime in EventTimeWatermarkExec is canonicalized as none#1.)

You can change EventTimeWatermarkExec and run the new test in this PR to reproduce. (Probably just printing out newChild is enough.)

  override protected def withNewChildInternal(newChild: SparkPlan): EventTimeWatermarkExec = {
    val metadataMap = newChild.output.map { attr =>
      attr.name -> attr.metadata
    }
    logWarning(s"CALL [withNewChildInternal] newChild: $newChild, metadata: $metadataMap")
    copy(child = newChild)
  }

@cloud-fan
Contributor

There are two kinds of expr IDs: 1) to reference attributes from children. 2) to indicate the output of the plan node.

After canonicalization, the output expr IDs for each plan node (if it has output columns, such as Project) will be normalized to 0, 1, 2, .... The reference expr IDs will be normalized to the ordinal of the matching column in the children's output columns. So ideally they can match: ordinal 0 means the first output column, whose expr ID should also be 0. However, it's possible for an attribute to be both a reference and an output, e.g. Project(a#1 AS aa#3, b#2). So it seems we can't find the event time column on a canonicalized plan.
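A self-contained sketch of the two normalizations described above (this is not Spark's actual Canonicalize logic, just a model): output expr IDs become positional 0, 1, 2, ..., while reference expr IDs become the ordinal of the matching column in the child output. When an attribute is both a reference and an output, the two numbering schemes collide.

```scala
// Toy attribute reference: just a name and an expr ID.
case class AttrRef(name: String, exprId: Long)

object ExprIdSketch {
  // Output attributes are renumbered positionally: 0, 1, 2, ...
  def normalizeOutput(output: Seq[AttrRef]): Seq[AttrRef] =
    output.zipWithIndex.map { case (_, i) => AttrRef("none", i.toLong) }

  // A reference is renumbered to the ordinal of the matching child output column.
  def normalizeReference(ref: AttrRef, childOutput: Seq[AttrRef]): AttrRef = {
    val ord = childOutput.indexWhere(_.exprId == ref.exprId)
    AttrRef("none", ord.toLong)
  }
}
```

For Project(a#1 AS aa#3, b#2) over child output [a#1, b#2], the reference a#1 normalizes to #0 and the output aa#3 also normalizes to #0, so exprId 0 appears twice with different meanings - matching the `Project [none#0, timestamp_seconds(none#1) AS #0]` log output earlier in the thread.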

@HeartSaVioR
Contributor Author

OK, looks like we have a case where propagating event time metadata on canonicalization won't help. Given that the current fix handles such cases as well, shall we keep the current fix as it is?

@HeartSaVioR
Contributor Author

@cloud-fan Mind having another look, please? The failure in GA is only from doc generation (Sphinx version), which is a known issue.

@HeartSaVioR
Contributor Author

Thanks all for reviewing! Merging to master/3.5.

HeartSaVioR added a commit that referenced this pull request Jan 19, 2024
…nonicalization of the plan

### What changes were proposed in this pull request?

This PR proposes to fix the bug on canonicalizing the plan which contains the physical node of dropDuplicatesWithinWatermark (`StreamingDeduplicateWithinWatermarkExec`).

### Why are the changes needed?

Canonicalization of the plan replaces expressions (including attributes) to remove cosmetic differences such as names and metadata, and the metadata is what carries the event time column marker.

StreamingDeduplicateWithinWatermarkExec assumes that the output attributes of its child node contain the event time column, and this is resolved at initialization of the node instance. Once canonicalization is triggered, the child node loses the notion of the event time column from its attributes. A copy of `StreamingDeduplicateWithinWatermarkExec` is then instantiated with the new child node, which no longer has an event time column, so the instantiation fails.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44688 from HeartSaVioR/SPARK-46676.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit c1ed3e6)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>