Skip to content

Commit

Permalink
[SPARK-12588] Remove HttpBroadcast in Spark 2.0.
Browse files Browse the repository at this point in the history
We switched to TorrentBroadcast in Spark 1.1, and HttpBroadcast has been undocumented since then. It's time to remove it in Spark 2.0.

Author: Reynold Xin <rxin@databricks.com>

Closes #10531 from rxin/SPARK-12588.
  • Loading branch information
rxin committed Dec 31, 2015
1 parent f76ee10 commit ee8f8d3
Show file tree
Hide file tree
Showing 12 changed files with 22 additions and 491 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* An interface for all the broadcast implementations in Spark (to allow
* multiple broadcast implementations). SparkContext uses a user-specified
* BroadcastFactory implementation to instantiate a particular broadcast for the
* entire Spark job.
*/
@DeveloperApi
trait BroadcastFactory {
private[spark] trait BroadcastFactory {

def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong

import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf, SecurityManager}


private[spark] class BroadcastManager(
val isDriver: Boolean,
Expand All @@ -39,15 +39,8 @@ private[spark] class BroadcastManager(
private def initialize() {
synchronized {
if (!initialized) {
val broadcastFactoryClass =
conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")

broadcastFactory =
Utils.classForName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]

// Initialize appropriate BroadcastFactory and BroadcastObject
broadcastFactory = new TorrentBroadcastFactory
broadcastFactory.initialize(isDriver, conf, securityManager)

initialized = true
}
}
Expand Down
269 changes: 0 additions & 269 deletions core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.spark.util.io.ByteArrayChunkOutputStream
* BlockManager, ready for other executors to fetch from.
*
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
* broadcast data (one per executor).
*
* When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
* protocol to do a distributed transfer of the broadcasted data to the executors. Refer to
* [[org.apache.spark.broadcast.TorrentBroadcast]] for more details.
*/
class TorrentBroadcastFactory extends BroadcastFactory {
private[spark] class TorrentBroadcastFactory extends BroadcastFactory {

override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import org.roaringbitmap.RoaringBitmap

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
Expand Down Expand Up @@ -107,7 +106,6 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[SerializableConfiguration], new KryoJavaSerializer())
kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
Expand Down

0 comments on commit ee8f8d3

Please sign in to comment.