diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 352ce5e41a3c1..18c2b1c6fbab7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -22,7 +22,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer} import scala.concurrent.ExecutionContext.Implicits.global -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.util.Random @@ -37,7 +37,6 @@ import org.apache.spark.network._ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.util._ -import scala.collection.mutable private[spark] sealed trait BlockValues @@ -112,7 +111,7 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) - private val cachedPeers = new mutable.HashSet[BlockManagerId] + private val cachedPeers = new HashSet[BlockManagerId] private var lastPeerFetchTime = 0L initialize() @@ -792,7 +791,7 @@ private[spark] class BlockManager( /** * Get peer block managers in the system. */ - private def getPeers(forceFetch: Boolean): mutable.Set[BlockManagerId] = { + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds def timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl @@ -813,8 +812,8 @@ private[spark] class BlockManager( private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) val numPeersToReplicateTo = level.replication - 1 - val peersReplicatedTo = new mutable.HashSet[BlockManagerId] - val peersFailedToReplicateTo = new mutable.HashSet[BlockManagerId] + val peersReplicatedTo = new HashSet[BlockManagerId] + val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) val startTime = System.nanoTime