Skip to content

Commit

Permalink
[SPARK-34125][CORE][2.4] Make EventLoggingListener.codecMap thread-safe
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
`EventLoggingListener.codecMap` change `mutable.HashMap` to `ConcurrentHashMap`

### Why are the changes needed?
2.x version of history server
`EventLoggingListener.codecMap` is of type mutable.HashMap, which is not thread safe.
This will cause the history server to suddenly get stuck and not work.
The 3.x version was changed to `EventLogFileReader.codecMap` to `ConcurrentHashMap` type, so there is no such problem.(SPARK-28869)

Multiple threads call `openEventLog`, `codecMap` is updated by multiple threads, `mutable.HashMap` may fall into an infinite loop during `resize`, resulting in history server not working.
scala/bug#10436

PID 117049 0x1c939
![image](https://user-images.githubusercontent.com/3898450/104753904-9239c280-5793-11eb-8a2d-89324ccfb92c.png)

![image](https://user-images.githubusercontent.com/3898450/104753921-9534b300-5793-11eb-99e6-51ac66051d2a.png)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
exist ut

Closes #31194 from cxzl25/SPARK-34125.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
cxzl25 authored and HeartSaVioR committed Jan 18, 2021
1 parent 7ae6c8d commit e0e1e21
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import java.io._
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -290,7 +291,7 @@ private[spark] object EventLoggingListener extends Logging {
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)

// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new mutable.HashMap[String, CompressionCodec]
private val codecMap = new ConcurrentHashMap[String, CompressionCodec]

/**
* Write metadata about an event log to the given stream.
Expand Down Expand Up @@ -357,7 +358,10 @@ private[spark] object EventLoggingListener extends Logging {
val in = new BufferedInputStream(fs.open(log))
try {
val codec = codecName(log).map { c =>
codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
codecMap.computeIfAbsent(c, new Function[String, CompressionCodec] {
override def apply(key: String): CompressionCodec =
CompressionCodec.createCodec(new SparkConf, key)
})
}
codec.map(_.compressedInputStream(in)).getOrElse(in)
} catch {
Expand Down

0 comments on commit e0e1e21

Please sign in to comment.