[SPARK-13186][Streaming]migrate away from SynchronizedMap #11250
Conversation
  })

  ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
  ssc.start()

- def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+ def numBatchesWithData: Int = collectedData.asScala.count(_._2._2.nonEmpty)
I think that we have a problem in lines like this, still. I think this is what @holdenk was alluding to. This returns a wrapper on the collection, and then iterates over it to count non-empty elements. But it may be modified by the put above while that happens, throwing ConcurrentModificationException. We'd have to clone it, or synchronize on the whole object while counting (the latter is probably better). In that case, it may not add any value to use Java's ConcurrentHashMap. Synchronizing access to mutable.HashMap is the same and doesn't require using a Java type.
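The locking scheme being suggested can be sketched as below. The names `collectedData`, `record`, and `numBatchesWithData` and the simplified value types are illustrative, not the actual test code; the point is that the writer and the counting reader hold the same lock.

```scala
import scala.collection.mutable

// Illustrative stand-in for the test's map:
// batch time -> (sequence ranges, data).
val collectedData = new mutable.HashMap[Long, (Seq[String], Seq[Int])]

// Writer side: every mutation goes through a synchronized block.
def record(time: Long, ranges: Seq[String], data: Seq[Int]): Unit =
  collectedData.synchronized {
    collectedData(time) = (ranges, data)
  }

// Reader side: the whole traversal holds the same lock, so a
// concurrent put cannot interleave with the count.
def numBatchesWithData: Int = collectedData.synchronized {
  collectedData.count(_._2._2.nonEmpty)
}
```

With this, no Java collection is needed; a plain `mutable.HashMap` plus consistent `synchronized` blocks gives the same guarantee.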
@srowen
Thanks for your comment. For the 5 files I changed, I will remove the usage of Java ConcurrentHashMap, and use mutable.HashMap instead. I will wrap every mutable.HashMap operation in a synchronized block.
That could work; we can also just use something like collectedData.values().asScala.count(_._2.nonEmpty)
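The alternative here keeps the ConcurrentHashMap and relies on its weakly consistent iterators, which never throw ConcurrentModificationException (at the cost of possibly missing updates made mid-traversal). A minimal sketch with simplified, hypothetical value types:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Illustrative map: batch time -> (sequence ranges, data).
val collectedData = new ConcurrentHashMap[Long, (Seq[String], Seq[Int])]

collectedData.put(1L, (Seq("r1"), Seq(1, 2)))
collectedData.put(2L, (Seq("r2"), Seq.empty))

// No external lock needed: ConcurrentHashMap's view iterators are
// weakly consistent, so counting can safely overlap with puts.
val n = collectedData.values().asScala.count(_._2.nonEmpty)
```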
@@ -268,9 +270,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun

  // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
  // and return the same data
- val times = collectedData.keySet
+ val times = collectedData.synchronized { collectedData.keySet }
I think you'd either have to make a copy of the key set being returned, or synchronize the entire foreach block below. The set is (I believe) backed by the collection and can be modified during iteration.
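The copy approach could look like the sketch below (names and types are illustrative): `keySet.toSet` snapshots the keys while holding the lock, so the iteration afterwards runs over an immutable copy rather than a live view of the map.

```scala
import scala.collection.mutable

// Illustrative map: batch time -> collected data for that batch.
val collectedData = new mutable.HashMap[Long, Seq[Int]]
collectedData.synchronized {
  collectedData(1000L) = Seq(1, 2, 3)
  collectedData(2000L) = Seq(4)
}

// keySet alone is backed by the map; toSet makes a defensive copy,
// so a concurrent put cannot disturb the foreach below.
val times = collectedData.synchronized { collectedData.keySet.toSet }
times.foreach { t =>
  // ... verify the recomputed RDD for batch time t ...
}
```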
Jenkins test this please
Test build #51627 has finished for PR 11250 at commit
Merged to master
trait SynchronizedMap in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Change to java.util.concurrent.ConcurrentHashMap instead.
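For context, the deprecated pattern this warning refers to, and one direct replacement, look roughly like this (a sketch, not the actual Spark code):

```scala
import scala.collection.mutable
import java.util.concurrent.ConcurrentHashMap

// Deprecated pattern: synchronization mixed in via a trait. This is
// what the warning flags as inherently unreliable.
// val m = new mutable.HashMap[String, Int]
//           with mutable.SynchronizedMap[String, Int]

// Replacement suggested by the deprecation message: a Java
// ConcurrentHashMap, whose individual operations are thread-safe.
val m = new ConcurrentHashMap[String, Int]()
m.put("a", 1)
```

As the review discussion above notes, this swap is only a drop-in for single operations; compound operations and iteration still need a copy, a weakly consistent view, or external synchronization.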