
[SPARK-17816] [Core] Fix ConcurrentModificationException issue in BlockStatusesAccumulator #15371

Closed
Wants to merge 15 commits.

Conversation

@seyfe (Contributor, Author) commented Oct 6, 2016

What changes were proposed in this pull request?

Change BlockStatusesAccumulator to return an immutable object when the value method is called.

How was this patch tested?

Existing tests, plus I verified this change by running a pipeline that consistently reproduces this issue.

This is the stack trace for this exception:

```
java.util.ConcurrentModificationException
        at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
        at java.util.ArrayList$Itr.next(ArrayList.java:851)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
        at scala.collection.TraversableLike$class.to(TraversableLike.scala:590)
        at scala.collection.AbstractTraversable.to(Traversable.scala:104)
        at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294)
        at scala.collection.AbstractTraversable.toList(Traversable.scala:104)
        at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:314)
        at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
        at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:291)
        at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
        at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:283)
        at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145)
        at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
```
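For reference, this failure pattern is reproducible outside Spark. Below is a minimal, self-contained sketch (plain Scala with illustrative names, not Spark code) of the same pattern: one thread appends to a Collections.synchronizedList while another converts it via asScala.toList without holding the list's lock.

```scala
import java.util.{ArrayList, Collections}
import scala.collection.JavaConverters._

object ComodificationRepro {
  def main(args: Array[String]): Unit = {
    // synchronizedList guards individual method calls, but iteration
    // still requires holding the list's monitor manually.
    val list = Collections.synchronizedList(new ArrayList[Int]())
    (1 to 10000).foreach(list.add(_))

    // A writer thread keeps appending while the main thread iterates.
    val writer = new Thread(new Runnable {
      def run(): Unit = (1 to 10000).foreach(list.add(_))
    })
    writer.start()

    // `asScala.toList` walks a java.util.Iterator without the lock, so
    // this can fail with java.util.ConcurrentModificationException.
    val snapshot = list.asScala.toList
    println(snapshot.size)
    writer.join()
  }
}
```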

@seyfe changed the title from "[SPARK-17463] [Core] TaskInfoToJson should map using List (immutable) rather than ListBuffer (mutable)" to "[SPARK-17463] [Core] Fix ConcurrentModificationException issue in JsonProtocol" on Oct 6, 2016
@hvanhovell (Contributor) commented:

ok to test

@SparkQA commented Oct 6, 2016

Test build #66425 has finished for PR 15371 at commit 1934dc9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@seyfe (Contributor, Author) commented Oct 6, 2016

Unfortunately, this PR doesn't fix the java.util.ConcurrentModificationException; I can still reproduce it. I will spend more time on it tomorrow morning.

@srowen (Member) left a comment

Yeah I'm not sure this is the issue. It's being modified while it's being serialized.

("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
})
val blockAccumulator =
srowen (Member):
I don't see why this would help. You add a wrapper, but, synchronizing your local access to it doesn't do anything because nothing else is synchronizing on it.

PS: can the List[(BlockId, BlockStatus)] type just be part of the match predicate?

```diff
@@ -281,7 +281,7 @@ private[spark] object JsonProtocol {
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
     ("Killed" -> taskInfo.killed) ~
-    ("Accumulables" -> JArray(taskInfo.accumulables.map(accumulableInfoToJson).toList))
+    ("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
```
srowen (Member):
What does this do? Just put a copy in place before the work of mapping? I could see how that would tend to help.

seyfe (Contributor, Author):

This wasn't the root cause, but it's something nice to have. If you prefer, I can revert this line.

srowen (Member):

Seems OK if it's related cleanup; it potentially helps a closely related manifestation.
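A sketch of why that reordering helps: `toList` snapshots the mutable buffer before `map` traverses it, so a concurrent append can no longer disturb the traversal (plain Scala, illustrative):

```scala
import scala.collection.mutable.ListBuffer

val buf = ListBuffer(1, 2, 3)

// Original order: `map` traverses the live mutable buffer first, then
// copies; an append from another thread during the traversal can throw.
val before = buf.map(_ * 2).toList

// New order: take an immutable List snapshot first, then traverse it;
// later appends to `buf` cannot affect the already-taken snapshot.
val after = buf.toList.map(_ * 2)
```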

@rxin (Contributor) commented Oct 6, 2016

One comment: once you figure out the proper fix, please add some inline comments so they don't get accidentally removed in the future.

@zsxwing (Member) commented Oct 6, 2016

@seyfe Thanks for reporting this one. Actually, it's different from SPARK-17463. Could you create a new ticket for this issue, please? The cause is that we send a mutable TaskInfo to listeners but may still update TaskInfo's fields (e.g., accumulables) in another thread. Ideally, all events sent to the listeners should be immutable.
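As an illustration of that principle (hypothetical types, not Spark's actual classes): snapshot any mutable state into an immutable value under the writers' lock before publishing it to listener threads.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical immutable event: listeners can read it with no locking.
final case class TaskInfoSnapshot(accumulables: List[String])

final class MutableTaskInfo {
  private val accumulables = ListBuffer.empty[String]

  def add(name: String): Unit =
    accumulables.synchronized { accumulables += name }

  // Snapshot under the same lock the writers use, then hand the
  // immutable copy to listeners instead of the live object.
  def snapshot(): TaskInfoSnapshot =
    accumulables.synchronized { TaskInfoSnapshot(accumulables.toList) }
}
```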

@seyfe (Contributor, Author) commented Oct 6, 2016

Hi @zsxwing. I have a fix ready and am testing it now. I will create a new ticket and send an updated PR today.

@seyfe changed the title from "[SPARK-17463] [Core] Fix ConcurrentModificationException issue in JsonProtocol" to "[SPARK-17816] [Core] Fix ConcurrentModificationException issue in JsonProtocol" on Oct 6, 2016
@SparkQA commented Oct 7, 2016

Test build #66473 has finished for PR 15371 at commit 5380aff.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Oct 7, 2016

@seyfe The issue is that TaskInfo.accumulables is accessed from multiple threads without synchronization. TaskMetrics.updatedBlockStatuses is fine.

@seyfe (Contributor, Author) commented Oct 7, 2016

@zsxwing: BlockStatusesAccumulator uses synchronizedList (which is `_seq`) for concurrency. But when the value method is executed and its output is consumed via `asScala.toList`, it causes the ConcurrentModificationException.

Collections.synchronizedList requires the `synchronized` keyword when using its iterator, which is what the `asScala` method does. Does that make sense?

@zsxwing (Member) commented Oct 7, 2016

> @zsxwing: BlockStatusesAccumulator uses synchronizedList (which is `_seq`) for concurrency. But when the value method is executed and its output is consumed via `asScala.toList`, it causes the ConcurrentModificationException.
>
> Collections.synchronizedList requires the `synchronized` keyword when using its iterator, which is what the `asScala` method does. Does that make sense?

@seyfe The comment you are deleting explains why it's safe: the driver doesn't modify BlockStatusesAccumulator.

@seyfe (Contributor, Author) commented Oct 7, 2016

@zsxwing: `JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map` is the line of code that causes the ConcurrentModificationException, and you can see that in the call stack. It comes from BlockStatusesAccumulator. Am I missing something here?

@seyfe (Contributor, Author) commented Oct 7, 2016

I also want to point out that the snippet below is the core of the fix; the rest of the code changes follow from it.

```diff
+  // `asScala` accesses the internal values using `java.util.Iterator` so needs to be synchronized
+  override def value: List[(BlockId, BlockStatus)] = {
+    _seq.synchronized {
+      _seq.asScala.toList
+    }
+  }
```

@srowen (Member) left a comment

Dumb question, but how big is this? The solution here is to copy the data structure, which is a good defensive move, as long as it's not big and nothing is actually relying on observing changes to the underlying data. Is that valid?


@seyfe (Contributor, Author) commented Oct 7, 2016

Hi @srowen,

This PR doesn't introduce any extra data-copy operations. It moves the data-copy code from JsonProtocol.accumValueToJson to BlockStatusesAccumulator.value so it can run inside the synchronized block. But let me try to answer your question.

I checked the data size using 3 different pipelines. 99% of the time the ArrayList has fewer than 4 items. There was one case where it maxed out at 4000 items, but that happened less than 1% of the time. I ran my test with 4000 executors; I think that is why the 4000 number came up.

I debated other options as well. Moving the JSON serialization into BlockStatusesAccumulator would help, but I didn't want to couple those classes. I considered introducing Akka actors for thread safety, but I think that would be overkill in this scenario since the data size is small. That is why I proposed this solution, but I would love to hear your proposals.

I don't know the answer to the second part of your question (below), but the existing behavior is not changed. The only change is that we convert the ArrayList to a Scala List inside a synchronized block so we won't get a ConcurrentModificationException. If we do need to rely on observing changes, I think we need a bigger change. @zsxwing, would you please comment on that part?

> nothing is actually relying on observing changes to the underlying data

@srowen (Member) commented Oct 7, 2016

I see the new copy (of course, or else this wouldn't help), but where is a copy removed? I'm probably overlooking it. A toList call was moved, which we discussed briefly above. I still think it's all fine; it's just worth double-checking the logic on both sides.

@seyfe (Contributor, Author) commented Oct 7, 2016

Hi @srowen, this is where I removed the extra copy operation. I changed this line because the conversion is already done by BlockStatusesAccumulator:

```scala
JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
```

was replaced with

```scala
JArray(v.map {
```

@zsxwing (Member) commented Oct 7, 2016

@seyfe I'm taking my words back. Yeah, BlockStatusesAccumulator.merge will be called on the driver.

```diff
-  override def value: java.util.List[(BlockId, BlockStatus)] = _seq
+  // `asScala` accesses the internal values using `java.util.Iterator` so needs to be synchronized
+  override def value: List[(BlockId, BlockStatus)] = {
+    _seq.synchronized {
```
zsxwing (Member):

`_seq.synchronized` is wrong. Collections.synchronizedList uses its internal mutex to lock, not `this`.

Why change them to a Scala List? Just changing this one to `java.util.Collections.unmodifiableList(new ArrayList[(BlockId, BlockStatus)](_seq))` should be enough.
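A self-contained sketch of that suggestion, with a stand-in element type (`_seq` mirrors the accumulator's internal synchronized list; Spark's real element type is `(BlockId, BlockStatus)`):

```scala
import java.util.{ArrayList, Collections, List => JList}

object SnapshotValue {
  // Stand-in for BlockStatusesAccumulator's internal state.
  private val _seq: JList[(String, Long)] =
    Collections.synchronizedList(new ArrayList[(String, Long)]())

  // `new ArrayList(c)` copies via `c.toArray()`, which synchronizedList
  // guards internally, so the copy needs no external lock; wrapping the
  // copy in unmodifiableList makes the returned view read-only.
  def value: JList[(String, Long)] =
    Collections.unmodifiableList(new ArrayList[(String, Long)](_seq))
}
```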

seyfe (Contributor, Author):

Thanks for the review @zsxwing. I checked the Javadoc and it says that getting an iterator is not thread safe; it suggests the usage below. That's why I used `_seq.synchronized`:

https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html

```java
List list = Collections.synchronizedList(new ArrayList());
    ...
synchronized (list) {
    Iterator i = list.iterator(); // Must be in synchronized block
    while (i.hasNext())
        foo(i.next());
}
```

Regarding your second question: JsonProtocol uses it as a Scala collection, which is why I converted it to a Scala collection, so we won't need to convert it again.

zsxwing (Member):

  1. Sorry, I didn't notice this line (mutex = this;) in Collections.synchronizedList...
  2. I just took a look at CollectionAccumulator. I think we can just make BlockStatusesAccumulator extend CollectionAccumulator; that would eliminate the duplicated code.

seyfe (Contributor, Author):

Regarding #2, I took a look at CollectionAccumulator as well, and it seems like a good idea. Let me give it a try.

@SparkQA commented Oct 7, 2016

Test build #66534 has finished for PR 15371 at commit 97977ee.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 7, 2016

Test build #66535 has finished for PR 15371 at commit 9572ebd.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 7, 2016

Test build #66537 has finished for PR 15371 at commit a5d3eb7.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@seyfe (Contributor, Author) commented Oct 7, 2016

Hi @zsxwing,

The test failed with the error below. I don't think it's related to my change. Should we just re-run the test, or do you have a suggestion?

```
java.lang.RuntimeException: spark-core: Binary compatibility check failed!
[error] (core/*:mimaReportBinaryIssues) spark-core: Binary compatibility check failed!
```

@zsxwing (Member) commented Oct 7, 2016

retest this please

@SparkQA commented Oct 7, 2016

Test build #66539 has finished for PR 15371 at commit a5d3eb7.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@seyfe (Contributor, Author) commented Oct 7, 2016

I don't know if it's related, but I found a bug in the last iteration: we also need to override the copy and copyAndReset methods; otherwise it throws a java.lang.ClassCastException.
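For illustration, the shape of those overrides, assuming BlockStatusesAccumulator extends CollectionAccumulator (a hedged sketch of the intermediate code, not the final patch): without them, the parent's copy methods return a plain CollectionAccumulator, and a later cast back to the subclass throws ClassCastException.

```scala
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util.CollectionAccumulator

class BlockStatusesAccumulator
    extends CollectionAccumulator[(BlockId, BlockStatus)] {

  // Return the subclass type so code that casts the copy back to
  // BlockStatusesAccumulator doesn't hit a ClassCastException.
  override def copyAndReset(): BlockStatusesAccumulator =
    new BlockStatusesAccumulator

  override def copy(): BlockStatusesAccumulator = {
    val acc = new BlockStatusesAccumulator
    acc.merge(this) // CollectionAccumulator.merge appends this.value
    acc
  }
}
```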

@SparkQA commented Oct 7, 2016

Test build #66541 has finished for PR 15371 at commit 2d5656c.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@seyfe (Contributor, Author) commented Oct 7, 2016

@zsxwing, I built it and the test pipelines work fine, so the fix is good. But I don't know how to fix the MiMa tests. Would you mind helping me with this?

@zsxwing (Member) commented Oct 10, 2016

@seyfe I think we can remove BlockStatusesAccumulator and just use private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)] instead. BlockStatusesAccumulator doesn't provide more functionality than CollectionAccumulator. Sorry I didn't spot that earlier.
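In other words, the replacement is a one-line field (a sketch with the name from the comment above; the surrounding TaskMetrics class and its accessors are omitted):

```scala
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util.CollectionAccumulator

// CollectionAccumulator already keeps its values in a synchronized
// java.util.List, so the dedicated subclass added nothing on top of it.
private val _updatedBlockStatuses =
  new CollectionAccumulator[(BlockId, BlockStatus)]
```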

@seyfe (Contributor, Author) commented Oct 10, 2016

@zsxwing, I think that is a good idea. I searched, and that is the only place we use BlockStatusesAccumulator. Let me remove it.

@SparkQA commented Oct 10, 2016

Test build #66627 has finished for PR 15371 at commit da2311a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Oct 10, 2016

retest this please

@SparkQA commented Oct 10, 2016

Test build #66632 has finished for PR 15371 at commit da2311a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 10, 2016

Test build #66669 has finished for PR 15371 at commit 5e00dc3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Oct 11, 2016

LGTM. Thanks! Merging to master and 2.0.

@zsxwing (Member) commented Oct 11, 2016

There are some conflicts with 2.0. @seyfe could you submit a PR for branch-2.0, please? Thanks!

@asfgit closed this in 19a5bae on Oct 11, 2016
seyfe added a commit to seyfe/spark that referenced this pull request Oct 11, 2016

…kStatusesAccumulator

Author: Ergin Seyfe <eseyfe@fb.com>

Closes apache#15371 from seyfe/race_cond_jsonprotocal.
@seyfe (Contributor, Author) commented Oct 11, 2016

Thanks @zsxwing.

Here is the PR for branch-2.0: #15425

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

…kStatusesAccumulator

Author: Ergin Seyfe <eseyfe@fb.com>

Closes apache#15371 from seyfe/race_cond_jsonprotocal.
@seyfe deleted the race_cond_jsonprotocal branch on February 27, 2017, 19:36