[SPARK-34731][CORE] Avoid ConcurrentModificationException when redacting properties in EventLoggingListener

### What changes were proposed in this pull request?

Change DAGScheduler to pass a clone of the Properties object, rather than the original object, to the SparkListenerJobStart event.

### Why are the changes needed?

DAGScheduler might modify the Properties object (e.g., in addPySparkConfigsToProperties) after firing off the SparkListenerJobStart event. Since the handler for that event (onJobStart in EventLoggingListener) iterates over the elements of the Properties object, this sometimes results in a ConcurrentModificationException.
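
For illustration only (not part of this patch), here is a minimal standalone sketch of the underlying hazard, using a raw `java.util.Hashtable` (the class whose fail-fast enumerator appears in the stack trace below; on JDK 8, `java.util.Properties` stores its entries directly in the `Hashtable` it extends). The object and thread names are made up for the demo, and whether the exception fires in a given run is timing dependent:
```
import java.util.Hashtable

object HashtableRaceDemo {
  def main(args: Array[String]): Unit = {
    val table = new Hashtable[String, String]()
    (1 to 1000).foreach(i => table.put(s"key$i", s"value$i"))

    // Plays the role of DAGScheduler, which keeps mutating the object after
    // the event referencing it has been posted.
    val writer = new Thread(() => {
      (1 to 100000).foreach(i => table.put(s"extra$i", "x"))
    })
    writer.start()

    // Plays the role of the listener iterating over the same object.
    // Hashtable's iterator is fail-fast, so this may throw
    // java.util.ConcurrentModificationException.
    try {
      val it = table.entrySet().iterator()
      var count = 0
      while (it.hasNext) {
        it.next()
        count += 1
      }
      println(s"Saw $count entries; the race did not trigger this time")
    } catch {
      case e: java.util.ConcurrentModificationException =>
        println(s"Reproduced the race: $e")
    }
    writer.join()
  }
}
```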

This can be demonstrated using these steps:
```
$ bin/spark-shell --conf spark.ui.showConsoleProgress=false \
--conf spark.executor.cores=1 --driver-memory 4g --conf \
"spark.ui.showConsoleProgress=false" \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=/tmp/spark-events
...
scala> (0 to 500).foreach { i =>
     |   val df = spark.range(0, 20000).toDF("a")
     |   df.filter("a > 12").count
     | }
21/03/12 18:16:44 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
```

I've not actually seen a ConcurrentModificationException in onStageSubmitted, only in onJobStart. However, they both iterate over the Properties object, so for safety's sake I pass a clone to SparkListenerStageSubmitted as well.
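
The defensive-copy pattern the patch relies on is simply a snapshot taken before the event is posted. A minimal script-style sketch (the `snapshot` helper and property names below are invented for illustration; the real patch uses `Utils.cloneProperties`, shown in the diff further down):
```
import java.util.Properties

// Invented helper, equivalent in spirit to Utils.cloneProperties below.
def snapshot(props: Properties): Properties = {
  if (props == null) return props
  val copy = new Properties()
  props.forEach((k, v) => copy.put(k, v))
  copy
}

val jobProperties = new Properties()
jobProperties.setProperty("spark.job.description", "example")

// The event captures an independent copy, so later mutations of the
// original object cannot race with a listener iterating over the copy.
val forListener = snapshot(jobProperties)
jobProperties.setProperty("added.later", "not visible to the listener")
assert(!forListener.containsKey("added.later"))
```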

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By repeatedly running the reproduction steps from above.

Closes #31826 from bersprockets/elconcurrent.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit f8a8b34)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
bersprockets authored and HyukjinKwon committed Mar 18, 2021
1 parent 4bda955 commit dd825e8
Showing 2 changed files with 14 additions and 6 deletions.
17 changes: 11 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -890,18 +890,19 @@ private[spark] class DAGScheduler(
       timeout: Long,
       properties: Properties): PartialResult[R] = {
     val jobId = nextJobId.getAndIncrement()
+    val clonedProperties = Utils.cloneProperties(properties)
     if (rdd.partitions.isEmpty) {
       // Return immediately if the job is running 0 tasks
       val time = clock.getTimeMillis()
-      listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
+      listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), clonedProperties))
       listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
       return new PartialResult(evaluator.currentResult(), true)
     }
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener,
-      Utils.cloneProperties(properties)))
+      clonedProperties))
     listener.awaitResult() // Will throw an exception if the job fails
   }

@@ -1162,7 +1163,8 @@ private[spark] class DAGScheduler(
     val stageIds = jobIdToStageIds(jobId).toArray
     val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
     listenerBus.post(
-      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
+      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
+        Utils.cloneProperties(properties)))
     submitStage(finalStage)
   }

@@ -1200,7 +1202,8 @@ private[spark] class DAGScheduler(
     val stageIds = jobIdToStageIds(jobId).toArray
     val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
     listenerBus.post(
-      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
+      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
+        Utils.cloneProperties(properties)))
     submitStage(finalStage)

     // If the whole stage has already finished, tell the listener and remove it

@@ -1333,7 +1336,8 @@ private[spark] class DAGScheduler(
     } catch {
       case NonFatal(e) =>
         stage.makeNewStageAttempt(partitionsToCompute.size)
-        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
+        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
+          Utils.cloneProperties(properties)))
         abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
         runningStages -= stage
         return

@@ -1347,7 +1351,8 @@
     if (partitionsToCompute.nonEmpty) {
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     }
-    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
+    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
+      Utils.cloneProperties(properties)))

     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
     // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3000,6 +3000,9 @@ private[spark] object Utils extends Logging {

   /** Create a new properties object with the same values as `props` */
   def cloneProperties(props: Properties): Properties = {
+    if (props == null) {
+      return props
+    }
     val resultProps = new Properties()
     props.forEach((k, v) => resultProps.put(k, v))
     resultProps
