Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-37315][ML][TEST] Mitigate ConcurrentModificationException thrown from a test in MLEventSuite #34583

Conversation

sarutak
Copy link
Member

@sarutak sarutak commented Nov 13, 2021

What changes were proposed in this pull request?

This PR is to mitigate ConcurrentModificationException sometimes thrown from a test.
Recently, I notice the exception is thrown from the following part of the test pipeline read/write events in MLEventSuite when Scala 2.13 is used.

events.map(JsonProtocol.sparkEventToJson).foreach { event =>
  assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
}

We can also find this issue from the scheduled build.
https://github.com/apache/spark/runs/4196812399?check_suite_focus=true#step:9:17616

I think the root cause is the ArrayBuffer (events) is updated asynchronously by the following part.

private val listener: SparkListener = new SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: MLEvent => events.append(e)
    case _ =>
  }
}

You can easily reproduce this issue by applying the following diff to the commit hash 4d29bec.

diff --git a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
index f2343b7a88..ff63639e00 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
@@ -42,7 +42,9 @@ class MLEventsSuite
   private val events = mutable.ArrayBuffer.empty[MLEvent]
   private val listener: SparkListener = new SparkListener {
     override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-      case e: MLEvent => events.append(e)
+      case e: MLEvent =>
+        events.append(e)
+        Thread.sleep(500)
       case _ =>
     }
   }
@@ -235,11 +237,13 @@ class MLEventsSuite
       }
       // Test if they can be ser/de via JSON protocol.
       assert(events.nonEmpty)
-      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
-        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
-      }
+        events.map { x =>
+          Thread.sleep(500)
+          JsonProtocol.sparkEventToJson(x)
+        }.foreach { event =>
+          assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+        }
       sc.listenerBus.waitUntilEmpty(timeoutMillis = 10000)
-
       events.clear()
       val pipelineReader = Pipeline.read
       assert(events.isEmpty)

This is a kind of race condition but I think we can mitigate by retrying.

Actually, I have never seen this issue when I used Scala 2.13.5 and recently we upgrade to 2.13.7.
Scala 2.13.7 includes an update to detect ConcurrentModificationException more precisely.
scala/scala#9786

Why are the changes needed?

For test stability.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

I manually modified the test code, inserting sleep like the diff shown above, and confirmed no ConcurrentModificationException is thrown.

@github-actions github-actions bot added the ML label Nov 13, 2021
@SparkQA
Copy link

SparkQA commented Nov 13, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49659/

@SparkQA
Copy link

SparkQA commented Nov 13, 2021

Test build #145190 has finished for PR 34583 at commit f2443e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak sarutak changed the title [SPARK-37315][ML][TEST] Mitigate a ConcurrentModificationException thrown from a test in MLEventSuite [SPARK-37315][ML][TEST] Mitigate ConcurrentModificationException thrown from a test in MLEventSuite Nov 13, 2021
@SparkQA
Copy link

SparkQA commented Nov 13, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49659/

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. I also observed this flakiness. Thank you, @sarutak .
Merged to master for Apache Spark 3.3.

@sarutak
Copy link
Member Author

sarutak commented Nov 14, 2021

Thank you @dongjoon-hyun !

dongjoon-hyun pushed a commit that referenced this pull request Dec 17, 2021
…hrown from tests in SparkContextSuite

### What changes were proposed in this pull request?

This PR fixes an issue that some tests in `SparkContextSuite` can throw `ConcurrentModificationException` with Scala 2.13.
https://github.com/apache/spark/runs/4543047740?check_suite_focus=true#step:9:20851
The cause seems to be same as SPARK-37315 (#34583).
> Scala 2.13.7 includes an update to detect ConcurrentModificationException more precisely.
scala/scala#9786

You can easily reproduce this issue by applying the following diff to `master`.
```
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index bc809f11cc..e5dde84c6e 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
 -1130,9 +1130,11  class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?" +
         "invalidParam1=foo&invalidParam2=boo")
       assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
-      assert(logAppender.loggingEvents.exists(_.getRenderedMessage.contains(
-        "Invalid parameters `invalidParam1,invalidParam2` found in Ivy URI query " +
-          "`invalidParam1=foo&invalidParam2=boo`.")))
+      assert(logAppender.loggingEvents.exists { x =>
+        Thread.sleep(1000)
+        x.getRenderedMessage.contains(
+          "Invalid parameters `invalidParam1,invalidParam2` found in Ivy URI query " +
+            "`invalidParam1=foo&invalidParam2=boo`.")})
     }
   }
```

### Why are the changes needed?

Fix the flaky test.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Locally checked that no `ConcurrentModificationException` is thrown even if a sleep is inserted like the diff shown above.

Closes #34922 from sarutak/fix-concurrent-access-issue.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants