
[SPARK-40460][SS] Fix streaming metrics when selecting _metadata #37905

Closed
Yaohua628 wants to merge 4 commits into apache:master from Yaohua628:spark-40460

Conversation

@Yaohua628 (Contributor) commented on Sep 16, 2022:

What changes were proposed in this pull request?

Streaming metrics report all 0 (`processedRowsPerSecond`, etc.) when selecting the `_metadata` column, because the logical plan from the batch does not match the actual planned logical plan. As a result, [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L348) we cannot find the plan and collect metrics correctly.

This PR fixes this by replacing the initial `LogicalPlan` with the `LogicalPlan` containing the metadata column.
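A minimal sketch of the symptom (not from the PR's test suite; the input directory, schema, and sink below are hypothetical placeholders): a file-source streaming query that selects `_metadata` would, before this fix, report zeroed rates such as `processedRowsPerSecond` in `lastProgress` even while rows were being processed.

```scala
// Hypothetical reproduction sketch, assuming a JSON file source.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()

val query = spark.readStream
  .format("json")
  .schema("id LONG")              // hypothetical schema
  .load("/tmp/stream-input")      // hypothetical input directory
  .select("id", "_metadata")      // pulls in the hidden file-metadata column
  .writeStream
  .format("noop")                 // discard output; we only care about metrics
  .start()

// Before this fix: query.lastProgress.processedRowsPerSecond == 0.0
// even though the micro-batches actually processed rows.
```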

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing + New UTs

@Yaohua628 (Contributor, Author) commented:

Hi, @cloud-fan @HeartSaVioR could you please take a look whenever you have a chance? Thanks! Happy weekend!

```diff
@@ -590,7 +591,7 @@ class MicroBatchExecution(
     val newBatchesPlan = logicalPlan transform {
       // For v1 sources.
       case StreamingExecutionRelation(source, output, catalogTable) =>
-        newData.get(source).map { dataPlan =>
+        mutableNewData.get(source).map { dataPlan =>
           val hasFileMetadata = output.exists {
```
Contributor commented:

Looking at the code, it seems the problem is that we resolve the metadata columns in every micro-batch. Shouldn't we resolve them only once?

Contributor commented:

That would require the `Source` to be told that the metadata column was requested, and to produce the logical plan accordingly when `getBatch` is called. My understanding is that a DSv1 source has no interface for receiving information about which columns will be referenced in the actual query.
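For context, the DSv1 streaming source interface looks roughly like the following (an abridged sketch of `org.apache.spark.sql.execution.streaming.Source`, not the full trait): `getBatch` receives only an offset range, so there is no channel through which the query could tell the source that `_metadata` was requested.

```scala
// Abridged sketch of the DSv1 streaming Source interface; see
// org.apache.spark.sql.execution.streaming.Source for the real trait.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Offset
import org.apache.spark.sql.types.StructType

trait Source {
  // Fixed schema of the source; there is no way to learn which columns
  // the query actually selects (e.g. whether _metadata was requested).
  def schema: StructType

  // Only an offset range is passed in -- no column information -- so the
  // returned DataFrame cannot proactively include _metadata.
  def getBatch(start: Option[Offset], end: Offset): DataFrame

  def commit(end: Offset): Unit
  def stop(): Unit
}
```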

Contributor commented on the same hunk:
While we are here, a probably less intrusive change would be moving L594 ~ L610 up to L567. After that change we wouldn't need to touch `newData` here.

Contributor (Author) commented:

Thanks, I initially thought about that, but we need to know the `output` from `StreamingExecutionRelation(source, output, catalogTable)` to resolve `_metadata`, right (L591 ~ L593)?
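For readers following along, the leaf node under discussion has roughly this shape (an abridged sketch, not the exact definition in Spark's source): resolving `_metadata` needs the relation's `output` attributes, which are only in scope when pattern-matching on this node inside the plan transform.

```scala
// Abridged sketch of the v1 streaming leaf node; field list matches the
// pattern match in the diff above, but details are simplified.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.connector.read.streaming.SparkDataStream

case class StreamingExecutionRelation(
    source: SparkDataStream,
    output: Seq[Attribute],                 // needed to resolve _metadata
    catalogTable: Option[CatalogTable])(session: SparkSession)
  extends LeafNode
```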

Contributor commented:

Yeah, you're right. I missed that.

Btw, it looks like my change (tagging `catalogTable` onto `LogicalRelation`) will also run into this bug. Thanks for fixing this.

Contributor (Author) commented:

Np - an unintentional fix :-)
Thanks for helping!

Contributor commented:

We may want to check the self-union / self-join cases to verify we really didn't break anything. This works only when leaf : source = 1 : 1 holds (otherwise we are overwriting the value in the map), while the code comment in ProgressReporter says there are counterexamples.

Contributor (Author) commented:

Got it. Could you share an example? In that case, does that mean leaf : source = 1 : N?

Contributor commented:

The code comment actually doesn't say much, and I'm speculating. Let's just make a best effort and try self-union and self-join: `df = spark.readStream...` then `df.union(df)` / `df.join(df)`, as sketched below.
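A minimal sketch of the suggested probes (the format, schema, and directory are hypothetical placeholders): the same streaming source then appears as two leaves in the logical plan, so the leaf-to-source mapping is 2 : 1 rather than 1 : 1.

```scala
// Hypothetical self-union / self-join probes for the leaf : source != 1 : 1 case.
val df = spark.readStream
  .format("json")
  .schema("id LONG")                 // hypothetical schema
  .load("/tmp/new-streaming-data")   // hypothetical directory

// Self-union: the plan contains the same source twice as a leaf.
val unioned = df.union(df)

// Self-join (inner equi-join): likewise two leaves backed by one source.
val joined = df.join(df, "id")
```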

@AmplabJenkins commented:

Can one of the admins verify this patch?

```scala
val df1 = spark.read.format("json")
  .load(dir.getCanonicalPath + "/target/new-streaming-data-union")
// Verify self-union results
assert(streamQuery0.lastProgress.numInputRows == 2L)
```
Contributor commented:

Should be `streamQuery1`.

Contributor (Author) commented:

oops

@HeartSaVioR (Contributor) left a comment:

+1 pending build.

@HeartSaVioR (Contributor) commented:

Thanks! Merging to master / 3.3.

@HeartSaVioR (Contributor) commented:

There's conflict in branch-3.3. @Yaohua628 Could you please craft a PR for branch-3.3? Thanks in advance!

Yaohua628 added a commit to Yaohua628/spark that referenced this pull request Sep 19, 2022

Closes apache#37905 from Yaohua628/spark-40460.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@Yaohua628 (Contributor, Author) commented:

> There's conflict in branch-3.3. @Yaohua628 Could you please craft a PR for branch-3.3? Thanks in advance!

Done! #37932 - Thank you

HyukjinKwon pushed a commit that referenced this pull request Sep 20, 2022
Cherry-picked from #37905.

Closes #37932 from Yaohua628/spark-40460-3-3.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
LuciferYang pushed a commit to LuciferYang/spark that referenced this pull request Sep 20, 2022

Closes apache#37905 from Yaohua628/spark-40460.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>