[SPARK-28967][CORE] Include cloned version of "properties" to avoid ConcurrentModificationException

### What changes were proposed in this pull request?

This patch fixes a bug where submitting a job with 0 partitions via DAGScheduler throws ConcurrentModificationException: the empty-partition shortcut posts SparkListenerJobStart with the caller's live Properties instance rather than a clone, so the caller can keep mutating it while a listener thread is still iterating it.
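
Conceptually, the race looks like this (a standalone sketch, not Spark code: Properties extends Hashtable, whose iterator is fail-fast, so structurally modifying the table while another thread iterates it throws ConcurrentModificationException, matching the Hashtable$Enumerator.next frame in the stack trace below):

```scala
import java.util.ConcurrentModificationException
import java.util.Properties

object PropertiesRaceSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    (1 to 100).foreach(i => props.setProperty(s"init-$i", "v"))

    // "Listener" thread: iterates the live Properties, as JsonProtocol does
    // when it serializes a SparkListenerJobStart event.
    val reader = new Thread(() => {
      try {
        while (true) {
          val it = props.entrySet().iterator()
          while (it.hasNext) it.next()
        }
      } catch {
        case e: ConcurrentModificationException => println(s"reader failed: $e")
      }
    })
    reader.setDaemon(true)
    reader.start()

    // "Caller" thread: keeps structurally mutating the same instance it
    // already handed out (add/remove bumps Hashtable's modCount).
    for (i <- 1 to 1000000) {
      props.setProperty(s"k$i", "v")
      props.remove(s"k$i")
    }
  }
}
```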

### Why are the changes needed?

Without this patch, a structured streaming query throws ConcurrentModificationException, as in the stack trace below:

```
19/09/04 09:48:49 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:237)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.JsonProtocol$.mapToJson(JsonProtocol.scala:514)
	at org.apache.spark.util.JsonProtocol$.$anonfun$propertiesToJson$1(JsonProtocol.scala:520)
	at scala.Option.map(Option.scala:163)
	at org.apache.spark.util.JsonProtocol$.propertiesToJson(JsonProtocol.scala:519)
	at org.apache.spark.util.JsonProtocol$.jobStartToJson(JsonProtocol.scala:155)
	at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:79)
	at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:149)
	at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:217)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:99)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:84)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:102)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:102)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:97)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:93)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:93)
```
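
The top of the trace is the AsyncEventQueue dispatch thread iterating the job's Properties (the Hashtable$Enumerator inside JsonProtocol.propertiesToJson) while the thread that submitted the job remains free to mutate the same instance.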

Please refer to https://issues.apache.org/jira/browse/SPARK-28967 for a detailed reproducer.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Newly added unit test. Also manually tested by running a simple structured streaming query in spark-shell.

Closes apache#25672 from HeartSaVioR/SPARK-28967.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
HeartSaVioR authored and PavithraRamachandran committed Sep 14, 2019
1 parent 2a6ed4d commit 4284696
Showing 2 changed files with 34 additions and 3 deletions.
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

```diff
@@ -698,7 +698,7 @@ private[spark] class DAGScheduler(
     if (partitions.isEmpty) {
       val time = clock.getTimeMillis()
       listenerBus.post(
-        SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
+        SparkListenerJobStart(jobId, time, Seq[StageInfo](), SerializationUtils.clone(properties)))
       listenerBus.post(
         SparkListenerJobEnd(jobId, time, JobSucceeded))
       // Return immediately if the job is running 0 tasks
```
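
For context: SerializationUtils here is org.apache.commons.lang3.SerializationUtils, which deep-copies a Serializable object by round-tripping it through Java serialization, so the listener bus gets an independent snapshot the caller can no longer mutate. A small REPL-style sketch of the behavior the fix relies on:

```scala
import java.util.Properties

import org.apache.commons.lang3.SerializationUtils

val props = new Properties()
props.setProperty("spark.jobGroup.id", "group-1")

// clone() returns a deep copy sharing no state with the original.
val snapshot: Properties = SerializationUtils.clone(props)
props.setProperty("spark.jobGroup.id", "group-2") // caller mutates later

assert(snapshot.getProperty("spark.jobGroup.id") == "group-1") // unaffected
assert(snapshot ne props) // different identity, safe from caller mutation
```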
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

```diff
@@ -19,13 +19,14 @@ package org.apache.spark.scheduler

 import java.util.Properties
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference}

 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 import scala.util.control.NonFatal

 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
+import org.scalatest.exceptions.TestFailedException
 import org.scalatest.time.SpanSugar._

 import org.apache.spark._
@@ -36,7 +37,7 @@ import org.apache.spark.rdd.{DeterministicLevel, RDD}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, ThreadUtils, Utils}

 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -788,6 +789,36 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     }
   }

+  test("SPARK-28967 properties must be cloned before posting to listener bus for 0 partition") {
+    val properties = new Properties()
+    val func = (context: TaskContext, it: Iterator[(_)]) => 1
+    val resultHandler = (taskIndex: Int, result: Int) => {}
+    val assertionError = new AtomicReference[TestFailedException](
+      new TestFailedException("Listener didn't receive expected JobStart event", 0))
+    val listener = new SparkListener() {
+      override def onJobStart(event: SparkListenerJobStart): Unit = {
+        try {
+          assert(event.properties.equals(properties), "Expected same content of properties, " +
+            s"but got properties with different content. props in caller ${properties} /" +
+            s" props in event ${event.properties}")
+          assert(event.properties.ne(properties), "Expected instance with different identity, " +
+            "but got same instance.")
+          assertionError.set(null)
+        } catch {
+          case e: TestFailedException => assertionError.set(e)
+        }
+      }
+    }
+    sc.addSparkListener(listener)
+
+    // 0 partition
+    val testRdd = new MyRDD(sc, 0, Nil)
+    val waiter = scheduler.submitJob(testRdd, func, Seq.empty, CallSite.empty,
+      resultHandler, properties)
+    sc.listenerBus.waitUntilEmpty(1000L)
+    assert(assertionError.get() === null)
+  }
+
   // Helper function to validate state when creating tests for task failures
   private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
     assert(stageAttempt.stageId === stageId)
```
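
Note the two-sided check in the listener: equals asserts the posted properties carry the same content as the caller's, while ne asserts they are not the same instance. The AtomicReference starts out holding a failure, so the test also fails if no JobStart event arrives at all.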
