Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,23 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
* Actually get the broadcasted value. Concrete implementations of Broadcast class must
* define their own way to get the value.
*/
private[spark] def getValue(): T
protected def getValue(): T

/**
* Actually unpersist the broadcasted value on the executors. Concrete implementations of
* Broadcast class must define their own logic to unpersist their own data.
*/
private[spark] def doUnpersist(blocking: Boolean)
protected def doUnpersist(blocking: Boolean)

/**
* Actually destroy all data and metadata related to this broadcast variable.
* Implementation of Broadcast class must define their own logic to destroy their own
* state.
*/
private[spark] def doDestroy(blocking: Boolean)
protected def doDestroy(blocking: Boolean)

/** Check if this broadcast is valid. If not valid, exception is thrown. */
private[spark] def assertValid() {
protected def assertValid() {
if (!_isValid) {
throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ private[spark] class HttpBroadcast[T: ClassTag](
@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

def getValue = value_
override protected def getValue() = value_

val blockId = BroadcastBlockId(id)
private val blockId = BroadcastBlockId(id)

/*
* Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
Expand All @@ -60,14 +60,14 @@ private[spark] class HttpBroadcast[T: ClassTag](
/**
* Remove all persisted state associated with this HTTP broadcast on the executors.
*/
def doUnpersist(blocking: Boolean) {
override protected def doUnpersist(blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver = false, blocking)
}

/**
* Remove all persisted state associated with this HTTP broadcast on the executors and driver.
*/
def doDestroy(blocking: Boolean) {
override protected def doDestroy(blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver = true, blocking)
}

Expand Down Expand Up @@ -102,7 +102,7 @@ private[spark] class HttpBroadcast[T: ClassTag](
}
}

private[spark] object HttpBroadcast extends Logging {
private[broadcast] object HttpBroadcast extends Logging {
private var initialized = false
private var broadcastDir: File = null
private var compress: Boolean = false
Expand Down Expand Up @@ -160,7 +160,7 @@ private[spark] object HttpBroadcast extends Logging {

def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)

def write(id: Long, value: Any) {
private def write(id: Long, value: Any) {
val file = getFile(id)
val out: OutputStream = {
if (compress) {
Expand All @@ -176,7 +176,7 @@ private[spark] object HttpBroadcast extends Logging {
files += file
}

def read[T: ClassTag](id: Long): T = {
private def read[T: ClassTag](id: Long): T = {
logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
val url = serverUri + "/" + BroadcastBlockId(id).name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
* [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism.
*/
class HttpBroadcastFactory extends BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
HttpBroadcast.initialize(isDriver, conf, securityMgr)
}

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new HttpBroadcast[T](value_, isLocal, id)

def stop() { HttpBroadcast.stop() }
override def stop() { HttpBroadcast.stop() }

/**
* Remove all persisted state associated with the HTTP broadcast with the given ID.
* @param removeFromDriver Whether to remove state from the driver
* @param blocking Whether to block until unbroadcasted
*/
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver, blocking)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.broadcast
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}

import scala.reflect.ClassTag
import scala.math
import scala.util.Random

import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
Expand Down Expand Up @@ -49,19 +48,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](
@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

def getValue = value_
override protected def getValue() = value_

val broadcastId = BroadcastBlockId(id)
private val broadcastId = BroadcastBlockId(id)

TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}

@transient var arrayOfBlocks: Array[TorrentBlock] = null
@transient var totalBlocks = -1
@transient var totalBytes = -1
@transient var hasBlocks = 0
@transient private var arrayOfBlocks: Array[TorrentBlock] = null
@transient private var totalBlocks = -1
@transient private var totalBytes = -1
@transient private var hasBlocks = 0

if (!isLocal) {
sendBroadcast()
Expand All @@ -70,19 +69,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](
/**
* Remove all persisted state associated with this Torrent broadcast on the executors.
*/
def doUnpersist(blocking: Boolean) {
override protected def doUnpersist(blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
}

/**
* Remove all persisted state associated with this Torrent broadcast on the executors
* and driver.
*/
def doDestroy(blocking: Boolean) {
override protected def doDestroy(blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
}

def sendBroadcast() {
private def sendBroadcast() {
val tInfo = TorrentBroadcast.blockifyObject(value_)
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
Expand Down Expand Up @@ -159,7 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
hasBlocks = 0
}

def receiveBroadcast(): Boolean = {
private def receiveBroadcast(): Boolean = {
// Receive meta-info about the size of broadcast data,
// the number of chunks it is divided into, etc.
val metaId = BroadcastBlockId(id, "meta")
Expand Down Expand Up @@ -211,7 +210,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](

}

private[spark] object TorrentBroadcast extends Logging {
private[broadcast] object TorrentBroadcast extends Logging {
private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
private var initialized = false
private var conf: SparkConf = null
Expand Down Expand Up @@ -272,17 +271,19 @@ private[spark] object TorrentBroadcast extends Logging {
* Remove all persisted blocks associated with this torrent broadcast on the executors.
* If removeFromDriver is true, also remove these persisted blocks on the driver.
*/
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
synchronized {
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
}
}

private[spark] case class TorrentBlock(
private[broadcast] case class TorrentBlock(
blockID: Int,
byteArray: Array[Byte])
extends Serializable

private[spark] case class TorrentInfo(
private[broadcast] case class TorrentInfo(
@transient arrayOfBlocks: Array[TorrentBlock],
totalBlocks: Int,
totalBytes: Int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
*/
class TorrentBroadcastFactory extends BroadcastFactory {

def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
TorrentBroadcast.initialize(isDriver, conf)
}

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new TorrentBroadcast[T](value_, isLocal, id)

def stop() { TorrentBroadcast.stop() }
override def stop() { TorrentBroadcast.stop() }

/**
* Remove all persisted state associated with the torrent broadcast with the given ID.
* @param removeFromDriver Whether to remove state from the driver.
* @param blocking Whether to block until unbroadcasted
*/
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
}
}