Skip to content

Commit

Permalink
Fixed imports.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Sep 12, 2014
1 parent 4a20531 commit d402506
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.concurrent.ExecutionContext.Implicits.global

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random
Expand All @@ -37,7 +37,6 @@ import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.util._
import scala.collection.mutable


private[spark] sealed trait BlockValues
Expand Down Expand Up @@ -112,7 +111,7 @@ private[spark] class BlockManager(
MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
private val cachedPeers = new mutable.HashSet[BlockManagerId]
private val cachedPeers = new HashSet[BlockManagerId]
private var lastPeerFetchTime = 0L

initialize()
Expand Down Expand Up @@ -792,7 +791,7 @@ private[spark] class BlockManager(
/**
* Get peer block managers in the system.
*/
private def getPeers(forceFetch: Boolean): mutable.Set[BlockManagerId] = {
private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds
def timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl

Expand All @@ -813,8 +812,8 @@ private[spark] class BlockManager(
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersReplicatedTo = new mutable.HashSet[BlockManagerId]
val peersFailedToReplicateTo = new mutable.HashSet[BlockManagerId]
val peersReplicatedTo = new HashSet[BlockManagerId]
val peersFailedToReplicateTo = new HashSet[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
val startTime = System.nanoTime
Expand Down

0 comments on commit d402506

Please sign in to comment.