[SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe #36496
      return _cachedColumnBuffers != null && isCachedRDDLoaded
    }
  }
  false
}

def isCachedRDDLoaded: Boolean = {
This change is really helpful. Is it a good idea to add another `synchronized` block in `isCachedRDDLoaded` too? It seems `_cachedColumnBuffersAreLoaded` is also volatile, and while there is no other call to `isCachedRDDLoaded`, adding the block would make it safe for another method to call it.
Wait a sec though, `_cachedColumnBuffers` is set to null in a `synchronized` block (not synchronized on the object itself). Don't we just need a similar `synchronized` block? I'm always worried about holding two different locks in this class, as it can lead to deadlock.
> Don't we just need a similar synchronized block?

You are right, updated.
> This change is really helpful. Is it a good idea to add another `synchronized` block in `isCachedRDDLoaded` too? It seems `_cachedColumnBuffersAreLoaded` is also volatile, and while there is no other call to `isCachedRDDLoaded`, adding the block would make it safe for another method to call it.

Added. Adding a `synchronized` block is not a bad idea anyway.
On second thought, I don't think it makes sense to add a `synchronized` block in `isCachedRDDLoaded`, because we would need to check `_cachedColumnBuffers != null` again, and that would make `isCachedRDDLoaded` the same as `isCachedColumnBuffersLoaded`.
Instead, I changed `isCachedRDDLoaded` to private to avoid accidental non-thread-safe calls.
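The outcome of this thread can be sketched in isolation. The following is a minimal, self-contained model and not Spark's actual code: `Builder`, `buffers`, `loadedFlag`, `load`, `clear`, `isLoaded`, and `isDataLoaded` are hypothetical stand-ins for `CachedRDDBuilder`, its fields, and the methods discussed above. It assumes the public check and the clearing method synchronize on the same monitor, and that the helper is private so it can only run while that monitor is held.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for CachedRDDBuilder; all names are illustrative only.
final class Builder {
  @volatile private var buffers: Array[Int] = null
  @volatile private var loadedFlag: Boolean = false

  // Populate the cache if it is empty.
  def load(): Unit = synchronized {
    if (buffers == null) buffers = Array(1, 2, 3)
  }

  // Clear the cache under the same monitor as the check below.
  // Resetting the flag is a choice made for this model only.
  def clear(): Unit = synchronized {
    buffers = null
    loadedFlag = false
  }

  // Public entry point: the null check and the helper call happen
  // atomically with respect to clear().
  def isLoaded: Boolean = synchronized {
    buffers != null && isDataLoaded
  }

  // Private: dereferences `buffers` without a null check, so it must only
  // be called while holding the monitor and after the null check.
  private def isDataLoaded: Boolean = {
    loadedFlag || {
      val ok = buffers.nonEmpty
      if (ok) loadedFlag = true
      ok
    }
  }
}

object BuilderDemo {
  // Runs a writer and a reader concurrently and returns the number of
  // NullPointerExceptions the reader observed.
  def run(iterations: Int): Int = {
    val b = new Builder
    val errors = new AtomicInteger(0)
    val writer = new Thread(() => {
      var i = 0
      while (i < iterations) { b.load(); b.clear(); i += 1 }
    })
    val reader = new Thread(() => {
      var i = 0
      while (i < iterations) {
        try { b.isLoaded; () }
        catch { case _: NullPointerException => errors.incrementAndGet(); () }
        i += 1
      }
    })
    writer.start(); reader.start()
    writer.join(); reader.join()
    errors.get
  }
}
```

Because `isDataLoaded` is private, every caller goes through `isLoaded`, which is the property the comment above relies on. Dropping `synchronized` from `isLoaded` in this model would allow `clear()` to null the field between the check and the dereference.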
Can one of the admins verify this patch?
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
LGTM
Is there a chance to add a new test?
The change looks reasonable to me
Sorry I missed this comment, added.
@@ -563,4 +563,33 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSparkSession {
      }
    }
  }

  test("SPARK-39104: InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe") {
I ran this UT manually, and it doesn't seem to reproduce the issue?
I updated the UT and reproduced the issue locally, but I would say it's not easy to reproduce:
[info] InMemoryColumnarQuerySuite:
01:58:25.558 WARN org.apache.spark.util.Utils: Your hostname, Chengs-Mac-mini.local resolves to a loopback address: 127.0.0.1; using 10.221.96.10 instead (on interface en1)
01:58:25.562 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
01:58:25.808 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception: org.scalatest.exceptions.TestFailedException thrown from the UncaughtExceptionHandler in thread "Thread-10"
@@ -563,4 +564,51 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSparkSession {
      }
    }
  }

  test("SPARK-39104: InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe") {
@pan3793 I tried changing the UT as follows:
test("SPARK-39104: InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe") {
  val plan = spark.range(1).queryExecution.executedPlan
  val serializer = new TestCachedBatchSerializer(true, 1)
  val cachedRDDBuilder = CachedRDDBuilder(serializer, MEMORY_ONLY, plan, None)

  @volatile var isCachedColumnBuffersLoaded = false
  @volatile var stopped = false

  // Writer: repeatedly builds and clears the cache to race with the reader.
  val th1 = new Thread {
    override def run(): Unit = {
      while (!isCachedColumnBuffersLoaded && !stopped) {
        cachedRDDBuilder.cachedColumnBuffers
        cachedRDDBuilder.clearCache()
      }
    }
  }

  // Reader: polls the method under test while the writer mutates the cache.
  val th2 = new Thread {
    override def run(): Unit = {
      while (!isCachedColumnBuffersLoaded && !stopped) {
        isCachedColumnBuffersLoaded = cachedRDDBuilder.isCachedColumnBuffersLoaded
      }
    }
  }

  // Timer: stops both loops after three seconds.
  val th3 = new Thread {
    override def run(): Unit = {
      Thread.sleep(3000L)
      stopped = true
    }
  }

  // Requires java.util.concurrent.atomic.AtomicInteger to be imported.
  val exceptionCnt = new AtomicInteger
  val exceptionHandler: Thread.UncaughtExceptionHandler = (_: Thread, cause: Throwable) => {
    exceptionCnt.incrementAndGet
    fail(cause)
  }
  th1.setUncaughtExceptionHandler(exceptionHandler)
  th2.setUncaughtExceptionHandler(exceptionHandler)

  th1.start()
  th2.start()
  th3.start()
  th1.join()
  th2.join()
  th3.join()

  cachedRDDBuilder.clearCache()
  assert(exceptionCnt.get == 0)
}
Then, without the fix, the test fails with:
Exception: org.scalatest.exceptions.TestFailedException thrown from the UncaughtExceptionHandler in thread "Thread-16"
1 did not equal 0
ScalaTestFailureLocation: org.apache.spark.sql.execution.columnar.InMemoryColumnarQuerySuite at (InMemoryColumnarQuerySuite.scala:617)
Expected :0
Actual :1
Thanks, the change makes it easy to reproduce, adopted.
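The log line "thrown from the UncaughtExceptionHandler" most likely appears because `fail(cause)` throws inside the handler itself. One alternative, shown here as a sketch outside ScalaTest and not what this PR adopted, is to record the first failure in an `AtomicReference` and assert on the calling thread after `join()`. The object `ConcurrentCheck` and the helper `runConcurrently` are hypothetical names introduced for illustration; they are not ScalaTest or Spark APIs.

```scala
import java.util.concurrent.atomic.AtomicReference

object ConcurrentCheck {
  // Runs each task on its own thread and returns the first exception that
  // escaped any of them, if there was one. Hypothetical helper.
  def runConcurrently(tasks: Seq[() => Unit]): Option[Throwable] = {
    val firstError = new AtomicReference[Throwable](null)
    val threads = tasks.map { task =>
      val t = new Thread(() => task())
      // Record the failure instead of throwing from the handler.
      t.setUncaughtExceptionHandler((_: Thread, cause: Throwable) => {
        firstError.compareAndSet(null, cause)
        ()
      })
      t
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    Option(firstError.get)
  }
}
```

In a test, the result could then be checked on the main thread, for example with `assert(ConcurrentCheck.runConcurrently(tasks).isEmpty)`, so a failure is reported by the test framework rather than by the JVM's handler machinery.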
LGTM +1
Hm, try the tests again? I'm having trouble seeing the error. I thought it might be MiMa, because you made a method private, but I'm not sure that is it.
Hmm, let me check the error message first.
Two jobs failed: the Hive slow test failed because of an OOM, and the other is PySpark (I'm not familiar with Python). Re-triggered.
All tests pass now: https://github.com/pan3793/spark/runs/6471801942?check_suite_focus=true
…d be thread-safe

### What changes were proposed in this pull request?

Add `synchronized` on method `isCachedColumnBuffersLoaded`.

### Why are the changes needed?

`isCachedColumnBuffersLoaded` should be wrapped in `synchronized`; otherwise it may cause an NPE when `_cachedColumnBuffers` is modified concurrently.

```
def isCachedColumnBuffersLoaded: Boolean = {
  _cachedColumnBuffers != null && isCachedRDDLoaded
}

def isCachedRDDLoaded: Boolean = {
  _cachedColumnBuffersAreLoaded || {
    val bmMaster = SparkEnv.get.blockManager.master
    val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
      bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, partition.index), false)
        .exists { case(_, blockStatus) => blockStatus.isCached }
    }
    if (rddLoaded) {
      _cachedColumnBuffersAreLoaded = rddLoaded
    }
    rddLoaded
  }
}
```

```
java.lang.NullPointerException
  at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247)
  at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241)
  at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189)
  at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176)
  at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
  at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
  at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
  at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
  at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
  at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
  at org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219)
  at org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176)
  at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220)
  at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UT.

Closes #36496 from pan3793/SPARK-39104.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit 3c8d8d7)
Signed-off-by: Sean Owen <srowen@gmail.com>
Merged to master/3.3/3.2
### What changes were proposed in this pull request?
Add `synchronized` on method `isCachedColumnBuffersLoaded`.
### Why are the changes needed?
`isCachedColumnBuffersLoaded` should be wrapped in `synchronized`; otherwise it may cause an NPE when `_cachedColumnBuffers` is modified concurrently.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing UT.