
[SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects #40812

Closed
wants to merge 4 commits

Conversation

robreeves
Contributor

@robreeves robreeves commented Apr 16, 2023

What changes were proposed in this pull request?

This is the narrowest fix for the issue observed in SPARK-43157. It does not attempt to identify or solve all potential correctness and concurrency issues caused by TreeNode.tags being modified in multiple places. It solves the issue described in SPARK-43157 by cloning the cached plan when populating InMemoryRelation.innerChildren. I chose to do the clone at this point to limit the scope to the tree traversal used to build up the string representation of the plan, which is where we see the issue. I do not see any other uses of TreeNode.innerChildren. I did not clone any earlier because the caching objects have mutable state that I wanted to avoid touching, to be extra safe.

Another solution I tried was to modify InMemoryRelation.clone to create a new CachedRDDBuilder and pass in a cloned cachedPlan. I opted not to go with this approach because CachedRDDBuilder has mutable state that would need to be moved to the new object, and I didn't want to add that complexity if it wasn't needed.

Why are the changes needed?

When caching is used, the cached part of the SparkPlan leaks into new clones of the plan. This leakage is an issue because if TreeNode.tags is modified in one plan, it affects the other plan. That is a correctness issue, and it becomes a concurrency issue if TreeNode.tags is set from different threads on the cloned plans.

See the description of SPARK-43157 for an example of the concurrency issue.
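
To make the sharing problem concrete, here is a self-contained toy illustration (made-up Node and tag names, not Spark's real TreeNode API): when two plans share a node, a tag written through one plan is visible through the other.

```scala
import scala.collection.mutable

// Made-up minimal tree node with mutable tags, standing in for TreeNode.
class Node(val name: String, val children: Seq[Node]) {
  val tags = mutable.Map.empty[String, Any]
}

object TagLeakDemo extends App {
  val cached = new Node("InMemoryRelation", Nil)   // shared, like the cached plan
  val plan1  = new Node("Filter", Seq(cached))
  val plan2  = new Node("Filter", Seq(cached))     // "clone" that reuses the cached node

  plan1.children.head.tags("OP_ID") = 1            // tag written through plan1...
  println(plan2.children.head.tags.get("OP_ID"))   // ...is visible through plan2: Some(1)
}
```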

Does this PR introduce any user-facing change?

Yes. It fixes a driver hanging issue the user can observe.

How was this patch tested?

Unit test added and I manually verified Dataset.explain("formatted") still had the expected output.

spark.range(10).cache.filter($"id" > 5).explain("formatted")

== Physical Plan ==
* Filter (4)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- * Range (3)


(1) InMemoryTableScan
Output [1]: [id#0L]
Arguments: [id#0L], [(id#0L > 5)]

(2) InMemoryRelation
Arguments: [id#0L], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@418b946b,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Range (0, 10, step=1, splits=16)
,None), [id#0L ASC NULLS FIRST]

(3) Range [codegen id : 1]
Output [1]: [id#0L]
Arguments: Range (0, 10, step=1, splits=Some(16))

(4) Filter [codegen id : 1]
Input [1]: [id#0L]
Condition : (id#0L > 5)

I also verified that InMemoryRelation.innerChildren is cloned when the entire plan is cloned.

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import spark.implicits._

def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = {
  if (plan.isInstanceOf[InMemoryTableScanExec]) {
    Some(plan.asInstanceOf[InMemoryTableScanExec])
  } else if (plan.children.isEmpty && plan.subqueries.isEmpty) {
    None
  } else {
    (plan.subqueries.flatMap(p => findCacheOperator(p)) ++
      plan.children.flatMap(findCacheOperator)).headOption
  }
}

val df = spark.range(10).filter($"id" < 100).cache()
val df1 = df.limit(1)
val df2 = df.limit(1)

// Get the cache operator (InMemoryTableScanExec) in each plan
val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get

// Check if InMemoryTableScanExec references point to the same object
println(plan1.eq(plan2))
// returns false

// Check if InMemoryRelation references point to the same object
println(plan1.relation.eq(plan2.relation))
// returns false

// Check if the cached SparkPlan references point to the same object
println(plan1.relation.innerChildren.head.eq(plan2.relation.innerChildren.head))
// returns false
// This shows the issue is fixed

@github-actions github-actions bot added the SQL label Apr 16, 2023
@robreeves robreeves changed the title [WIP][SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects [SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects Apr 17, 2023
@robreeves robreeves marked this pull request as ready for review April 17, 2023 16:54
Contributor

@shardulm94 shardulm94 left a comment

Thanks @robreeves! This seems like a reasonable change with limited scope. I looked at the usages of innerChildren and saw that it's only being used to generate plan strings (either the short format or the detailed format), so this should not affect existing query execution behavior of cached plans.

@mridulm
Contributor

mridulm commented Apr 27, 2023

+CC @cloud-fan, @dongjoon-hyun who have reviewed work on explain output earlier.

@dongjoon-hyun
Member

Thank you for pinging me, @mridulm .

Also, cc @sunchao

@@ -389,7 +389,7 @@ case class InMemoryRelation(

  @volatile var statsOfPlanToCache: Statistics = null

- override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
+ override val innerChildren: Seq[SparkPlan] = Seq(cachedPlan.clone())
Contributor

Does this mean the cloned cachedPlan can still be shared by multiple threads?

Contributor Author

Since plans are considered immutable and usually cloned, thread safety is not an issue in general. cachedPlan was not cloned, so it was easy to get into a situation where multiple threads access the same plan objects. This change solves that narrow issue. After this, the cachedPlan is no more susceptible to concurrency issues than any other object in the plan.

I still think there is a wider thread-safety issue with the mutable state in TreeNode.tags, but I need to investigate it more to propose a holistic solution. Consider the following case in application code:

  1. User thread 1 calls d.explain("formatted") where d is a Dataset object. This will set the TreeNode.tag keys QueryPlan.OP_ID_TAG and QueryPlan.CODEGEN_ID_TAG.
  2. At the same time in user thread 2 they call d.filter($"foo" > 1). This will create a new dataset that reads the TreeNode.tags during the clone.

At first I considered making TreeNode.tags a thread-safe map, but that only solves the concurrency issue. At least for this issue, there would still be a correctness issue with other threads setting different operator ids for the same node. TreeNode.tags is used in so many other places that I'm not sure if there are other correctness issues.
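
A toy sketch of that limitation, using simplified stand-in classes rather than Spark's real ones: a thread-safe map removes the data race, but whichever thread writes last still decides the operator id that both plans observe.

```scala
import java.util.concurrent.ConcurrentHashMap

// Made-up stand-in for a plan node shared by two "cloned" plans,
// with a thread-safe tag map.
class SharedNode {
  val tags = new ConcurrentHashMap[String, Integer]()
}

object LastWriterWins extends App {
  val shared = new SharedNode

  // Two concurrent "explain" calls numbering operators on the shared node.
  val t1 = new Thread(() => shared.tags.put("OP_ID", 1))
  val t2 = new Thread(() => shared.tags.put("OP_ID", 7))
  t1.start(); t2.start(); t1.join(); t2.join()

  // No data race or corruption, but both plans now observe a single,
  // timing-dependent operator id.
  println(shared.tags.get("OP_ID"))   // prints 1 or 7, nondeterministically
}
```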

Contributor Author

@cloud-fan friendly ping on this

@robreeves robreeves requested a review from cloud-fan May 1, 2023 17:18
@cloud-fan
Contributor

I agree that we do need to copy the cached plan when getting it, but I feel the current change is hard to understand and reason about.

I think a better place to fix is CacheManager#useCachedDataInternal. It does copy InMemoryRelation but does not copy the physical plan:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L298-L306

We can add a new method InMemoryRelation#freshCopy which takes new output attributes and copies the physical plan, then change cached.cachedRepresentation.withOutput(currentFragment.output) to cached.cachedRepresentation.freshCopy(currentFragment.output).
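
A rough sketch of the suggested shape, on toy classes rather than the real ones (the real InMemoryRelation obtains its physical plan through CachedRDDBuilder, which is what complicates this in practice, as the reply below notes):

```scala
// Made-up toy classes; ToyPlan and ToyRelation are illustrative only.
case class ToyPlan(name: String) {
  def cloned: ToyPlan = copy()
}

case class ToyRelation(output: Seq[String], cachedPlan: ToyPlan) {
  // Existing behavior: new output attributes, same cachedPlan instance.
  def withOutput(newOutput: Seq[String]): ToyRelation = copy(output = newOutput)

  // Suggested behavior: new output attributes and a fresh copy of the plan.
  def freshCopy(newOutput: Seq[String]): ToyRelation =
    copy(output = newOutput, cachedPlan = cachedPlan.cloned)
}
```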

@robreeves
Contributor Author

> I agree that we do need to copy the cached plan when getting it, but I feel the current change is hard to understand and reason about.
>
> I think a better place to fix is CacheManager#useCachedDataInternal. It does copy InMemoryRelation but does not copy the physical plan: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L298-L306
>
> We can add a new method InMemoryRelation#freshCopy which takes new output attributes and copies the physical plan, then change cached.cachedRepresentation.withOutput(currentFragment.output) to cached.cachedRepresentation.freshCopy(currentFragment.output).

The challenge with this approach is that InMemoryRelation does not get the physical plan passed in directly; it comes through CachedRDDBuilder. I originally tried creating a new CachedRDDBuilder with a copy of the physical plan, but it broke other tests due to the private state CachedRDDBuilder maintains.

@cloud-fan
Contributor

I see. Can we change def cachedPlan: SparkPlan = cacheBuilder.cachedPlan to lazy val cachedPlan: SparkPlan = cacheBuilder.cachedPlan.clone, with comments to explain it? This makes sure that a new instance of InMemoryRelation will have a new instance of the physical plan.
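
A toy illustration of the difference (made-up names, not the real Spark classes): with a def, every relation instance reaches the builder's single plan object; with a lazily computed clone, each relation instance caches its own copy.

```scala
// Made-up names, not the real Spark classes.
case class FakePlan(name: String) {
  def cloned: FakePlan = copy()
}

class FakeBuilder {
  val cachedPlan: FakePlan = FakePlan("Range")
}

class RelationWithDef(builder: FakeBuilder) {
  def cachedPlan: FakePlan = builder.cachedPlan             // always the shared instance
}

class RelationWithLazyCopy(builder: FakeBuilder) {
  lazy val cachedPlan: FakePlan = builder.cachedPlan.cloned // per-instance copy, built once
}
```

With the lazy-copy variant, two relation instances stop sharing the plan object, which, as the next comment shows, is exactly what breaks CollectMetricsExec's lookup of the stateful accumulators.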

@robreeves
Contributor Author

@cloud-fan Cloning the cachedPlan is also problematic because it contains state (accumulators in private fields) when it includes a CollectMetricsExec operator. CollectMetricsExec.collect specifically looks at the InMemoryRelation.cachedPlan to get the stateful metrics.

I verified this is an issue by cloning InMemoryRelation.cachedPlan and then modifying a unit test that uses Dataset.observe so that it caches the dataset. This breaks the unit test; when I revert cloning the cachedPlan, it passes. Here is how I modified the unit test to demonstrate the issue.

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index f046daacb91..3de33e3e1b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -277,7 +277,7 @@ class DataFrameCallbackSuite extends QueryTest
         max($"id").as("max_val"),
         // Test unresolved alias
         sum($"id"),
-        count(when($"id" % 2 === 0, 1)).as("num_even"))
+        count(when($"id" % 2 === 0, 1)).as("num_even")).cache()
       .observe(
         name = "other_event",
         avg($"id").cast("int").as("avg_val"))

So I think we should stick with only cloning it for innerChildren, since the only usage is ExplainUtils and statefulness doesn't matter there, apart from the TreeNode.tags values.

@cloud-fan
Contributor

thanks, merging to master/3.4!

Member

@dongjoon-hyun dongjoon-hyun left a comment

Thank you, @robreeves and @cloud-fan .

@cloud-fan cloud-fan closed this in 5e59995 May 18, 2023
cloud-fan pushed a commit that referenced this pull request May 18, 2023
Closes #40812 from robreeves/roreeves/explain_util.

Authored-by: Rob Reeves <roreeves@linkedin.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 5e59995)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
GladwinLee pushed a commit to lyft/spark that referenced this pull request Oct 10, 2023
catalinii pushed a commit to lyft/spark that referenced this pull request Oct 10, 2023
@liuzqt
Contributor

liuzqt commented Feb 27, 2024

I'm revisiting this PR since it introduced a new issue: https://issues.apache.org/jira/browse/SPARK-47177. I don't have a clear idea yet how to fix it, but a few ideas off the top of my head:

  • maybe we just use a coarse-granularity lock in explain?
  • alternatively, make innerChildren a function: clone the initial plan, and on each call check whether the original AQE plan is finalized (making the final flag atomic first, of course); if it is not, return the cloned initial plan, and if it is finalized, clone the final plan and return that one (see the sketch below). This still won't reflect the AQE plan in real time in a concurrent situation, but at least we would have an initial version and a final version.
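
A heavily simplified, hypothetical sketch of the second idea (made-up classes, nothing like the real AdaptiveSparkPlanExec machinery): innerChildren returns a clone of either the initial plan or, once an atomic "finalized" flag is set, the final plan.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Made-up classes for illustration only.
case class MiniPlan(name: String) {
  def cloned: MiniPlan = copy()
}

class MiniAdaptiveRelation(initialPlan: MiniPlan) {
  private val finalized = new AtomicBoolean(false)
  @volatile private var finalPlan: MiniPlan = initialPlan

  def markFinal(p: MiniPlan): Unit = { finalPlan = p; finalized.set(true) }

  // Not a real-time snapshot, but callers always get a private clone of
  // either the initial or the final version.
  def innerChildren: Seq[MiniPlan] =
    if (finalized.get()) Seq(finalPlan.cloned) else Seq(initialPlan.cloned)
}
```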

cc @cloud-fan to provide some feedback

@liuzqt
Contributor

liuzqt commented Feb 28, 2024

Had a discussion with @maryannxue; a few thoughts:

  • it might not be a good idea to implicitly override innerChildren with clone behavior; it's a general function from the base class, even if it's only used for generateString for now. If we're trying to narrowly fix the explain issue, is it possible to hold a lock at the call site, or explicitly do the clone at the call site?
  • apart from the explain string, is there any other usage that could be vulnerable to this concurrency issue?

@robreeves
Contributor Author

@liuzqt in theory this issue is not limited to the explain string. The core issue is that TreeNode.tags introduces mutability into an otherwise immutable plan. This is mostly fine because cloning a plan creates all new objects, including the TreeNode.tags object. The problem is that the caching operators do not get cloned when the plan is cloned, so the same object is referenced by both the old and the new plan. Any TreeNode.tags mutation then risks concurrency issues. The explain string is not the only place that modifies the tags (look for TreeNode.setTagValue usages), so the narrower fix does not remove the vulnerability. As a long-term solution I think the mutability (the tags) should be removed from plans, but I know that is not the most practical suggestion for you right now.

@robreeves
Contributor Author

@liuzqt another idea for a narrow ExplainUtils fix is to move the QueryPlan.OP_ID_TAG currently stored in TreeNode.tags into a thread-local map maintained in ExplainUtils (sketched below). This would keep the same state and eliminate the concurrency issue here by not using the tags.
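
A minimal sketch of that idea, assuming a hypothetical helper object (this is not the real ExplainUtils API): operator ids live in a per-thread, identity-keyed map instead of TreeNode.tags, so concurrent explain calls on plans that share nodes cannot interfere with each other.

```scala
import java.util.IdentityHashMap

// Hypothetical helper; name and methods are illustrative only.
object ThreadLocalOpIds {
  private val ids =
    ThreadLocal.withInitial(() => new IdentityHashMap[AnyRef, Integer]())

  def setOpId(node: AnyRef, id: Int): Unit = ids.get().put(node, id)

  def opId(node: AnyRef): Option[Int] =
    Option(ids.get().get(node)).map(_.intValue)

  def clear(): Unit = ids.get().clear()   // call when an explain pass finishes
}
```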
