Skip to content

Commit

Permalink
SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHa…
Browse files Browse the repository at this point in the history
…shMap
  • Loading branch information
zsxwing committed Jul 23, 2014
1 parent cd273a2 commit d450053
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.spark

import java.io._
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.concurrent.Await
import scala.collection.JavaConversions._

import akka.actor._
import akka.pattern.ask
Expand Down Expand Up @@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
* master's corresponding HashMap.
*
* Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
* thread-safe map.
*/
protected val mapStatuses: Map[Int, Array[MapStatus]]

Expand Down Expand Up @@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
* MapOutputTrackerMaster.
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
protected val mapStatuses: Map[Int, Array[MapStatus]] =
new ConcurrentHashMap[Int, Array[MapStatus]]
}

private[spark] object MapOutputTracker {
Expand Down

0 comments on commit d450053

Please sign in to comment.