Commit 21600a4

Merge branch 'master' into SPARK-10117

Lewuathe committed Sep 9, 2015
2 parents 9ce63c7 + c1bc4f4
Showing 109 changed files with 1,231 additions and 307 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -1,7 +1,7 @@
# Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides
-high-level APIs in Scala, Java, and Python, and an optimized engine that
+high-level APIs in Scala, Java, Python, and R, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
MLlib for machine learning, GraphX for graph processing,
@@ -94,5 +94,5 @@ distribution.

## Configuration

-Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configuration.html)
+Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html)
in the online documentation for an overview on how to configure Spark.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] private[spark] (
-@transient initialValue: R,
+initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
internal: Boolean)
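The change above is the pattern repeated throughout this commit: `@transient` is dropped from plain constructor parameters, where the annotation may have no field to attach to (newer Scala compilers warn about this), or the field is made explicit as `@transient private val` when a reference should be kept on the driver but excluded from serialization. A minimal sketch of the two resulting forms, using hypothetical class names rather than anything from the diff:

```scala
// Hypothetical stand-in for a driver-side object that must not be shipped to executors.
class DriverSideHandle(val id: String)

// Form 1: the parameter is only used during construction, so no field is generated
// and no @transient annotation is needed at all.
class UsesParamOnce(handle: DriverSideHandle) extends Serializable {
  val idLength: Int = handle.id.length // computed eagerly; `handle` itself is not stored
}

// Form 2: the reference is kept as an explicit field but excluded from serialization.
class KeepsTransientField(@transient private val handle: DriverSideHandle) extends Serializable {
  // Usable on the driver; after deserialization elsewhere, `handle` would be null.
  def describe(): String = if (handle != null) handle.id else "<not available here>"
}
```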
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
@@ -66,7 +66,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
-@transient _rdd: RDD[_ <: Product2[K, V]],
+@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -104,8 +104,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
-@transient partitions: Int,
-@transient rdd: RDD[_ <: Product2[K, V]],
+partitions: Int,
+rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {

@@ -37,7 +37,7 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
-class SparkHadoopWriter(@transient jobConf: JobConf)
+class SparkHadoopWriter(jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {
@@ -41,7 +41,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.util.control.NonFatal

private[spark] class PythonRDD(
-@transient parent: RDD[_],
+parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
@@ -785,7 +785,7 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By
* Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
* collects a list of pickled strings that we pass to Python through a socket.
*/
-private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
+private class PythonAccumulatorParam(@transient private val serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {

Utils.checkHost(serverHost, "Expected hostname")
@@ -131,8 +131,8 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
*/
@Experimental
class PortableDataStream(
-@transient isplit: CombineFileSplit,
-@transient context: TaskAttemptContext,
+isplit: CombineFileSplit,
+context: TaskAttemptContext,
index: Integer)
extends Serializable {

@@ -137,7 +137,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
new RpcResponseCallback {
override def onSuccess(response: Array[Byte]): Unit = {
logTrace(s"Successfully uploaded block $blockId")
-result.success()
+result.success((): Unit)
}
override def onFailure(e: Throwable): Unit = {
logError(s"Error while uploading block $blockId", e)
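The `result.success((): Unit)` change above passes the unit value explicitly instead of relying on the compiler adapting an empty argument list to `()`, an adaptation newer Scala versions warn about. A minimal, self-contained sketch, assuming `result` plays the role of a `scala.concurrent.Promise[Unit]` as the surrounding Spark code suggests:

```scala
import scala.concurrent.Promise

object UnitPromiseSketch {
  def main(args: Array[String]): Unit = {
    val result = Promise[Unit]() // stand-in for the promise completed in the upload callback
    // `result.success()` compiles only because the compiler inserts `()` for the missing
    // argument ("adapted argument list"), which triggers a lint/deprecation warning.
    // Passing the unit value explicitly expresses the same thing without the adaptation:
    result.success((): Unit)
    println(result.isCompleted) // prints: true
  }
}
```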
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -28,18 +28,18 @@ private[spark] class BinaryFileRDD[T](
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
-@transient conf: Configuration,
+conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
-configurable.setConf(conf)
+configurable.setConf(getConf)
case _ =>
}
-val jobContext = newJobContext(conf, jobId)
+val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
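Because `conf` is no longer a field of `BinaryFileRDD` itself, `getPartitions` above now reads the configuration through the parent class's `getConf` accessor rather than the removed parameter. A rough sketch of that accessor pattern, with hypothetical names and a plain `Map` standing in for a Hadoop `Configuration` (in Spark the parent is `NewHadoopRDD`, whose `getConf` reads the value back from a broadcast variable):

```scala
// Hypothetical names for illustration only; this is not Spark's API.
class ParentWithConf(@transient private val conf: Map[String, String]) extends Serializable {
  // Stand-in for broadcasting the configuration so it remains reachable after serialization.
  private val confCopy: Map[String, String] = conf
  def getConf: Map[String, String] = confCopy
}

class ChildUsesAccessor(conf: Map[String, String]) extends ParentWithConf(conf) {
  // The subclass keeps no `conf` field of its own, so it goes through the accessor;
  // a dropped or transient field referenced here could be null after deserialization.
  def numEntries: Int = getConf.size
}

object ConfAccessorSketch {
  def main(args: Array[String]): Unit = {
    val child = new ChildUsesAccessor(Map("spark.hadoop.cloneConf" -> "false"))
    println(child.numEntries) // prints: 1
  }
}
```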
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends P
}

private[spark]
-class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {

@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@@ -64,7 +64,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
*/
private[spark] def removeBlocks() {
blockIds.foreach { blockId =>
-sc.env.blockManager.master.removeBlock(blockId)
+sparkContext.env.blockManager.master.removeBlock(blockId)
}
_isValid = false
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -27,8 +27,8 @@ import org.apache.spark.util.Utils
private[spark]
class CartesianPartition(
idx: Int,
-@transient rdd1: RDD[_],
-@transient rdd2: RDD[_],
+@transient private val rdd1: RDD[_],
+@transient private val rdd2: RDD[_],
s1Index: Int,
s2Index: Int
) extends Partition {
@@ -29,7 +29,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition
/**
* An RDD that recovers checkpointed data from storage.
*/
-private[spark] abstract class CheckpointRDD[T: ClassTag](@transient sc: SparkContext)
+private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
extends RDD[T](sc, Nil) {

// CheckpointRDD should not be checkpointed again
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -51,7 +51,7 @@ import org.apache.spark.storage.StorageLevel
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
-private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
+private[spark] class HadoopPartition(rddId: Int, idx: Int, s: InputSplit)
extends Partition {

val inputSplit = new SerializableWritable[InputSplit](s)
@@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
*/
@DeveloperApi
class HadoopRDD[K, V](
-@transient sc: SparkContext,
+sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -109,7 +109,7 @@ class HadoopRDD[K, V](
extends RDD[(K, V)](sc, Nil) with Logging {

if (initLocalJobConfFuncOpt.isDefined) {
-sc.clean(initLocalJobConfFuncOpt.get)
+sparkContext.clean(initLocalJobConfFuncOpt.get)
}

def this(
@@ -137,7 +137,7 @@ class HadoopRDD[K, V](
// used to build JobTracker ID
private val createTime = new Date()

-private val shouldCloneJobConf = sc.conf.getBoolean("spark.hadoop.cloneConf", false)
+private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
@@ -35,7 +35,7 @@ import org.apache.spark.storage.RDDBlockId
* @param numPartitions the number of partitions in the checkpointed RDD
*/
private[spark] class LocalCheckpointRDD[T: ClassTag](
-@transient sc: SparkContext,
+sc: SparkContext,
rddId: Int,
numPartitions: Int)
extends CheckpointRDD[T](sc) {
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
* is written to the local, ephemeral block storage that lives in each executor. This is useful
* for use cases where RDDs build up long lineages that need to be truncated often (e.g. GraphX).
*/
-private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {

/**
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.storage.StorageLevel
private[spark] class NewHadoopPartition(
rddId: Int,
val index: Int,
-@transient rawSplit: InputSplit with Writable)
+rawSplit: InputSplit with Writable)
extends Partition {

val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -68,14 +68,14 @@ class NewHadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
-@transient conf: Configuration)
+@transient private val _conf: Configuration)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {

// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
-private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
-// private val serializableConf = new SerializableWritable(conf)
+private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
+// private val serializableConf = new SerializableWritable(_conf)

private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -88,10 +88,10 @@ class NewHadoopRDD[K, V](
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
-configurable.setConf(conf)
+configurable.setConf(_conf)
case _ =>
}
-val jobContext = newJobContext(conf, jobId)
+val jobContext = newJobContext(_conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -262,18 +262,18 @@ private[spark] class WholeTextFileRDD(
inputFormatClass: Class[_ <: WholeTextFileInputFormat],
keyClass: Class[String],
valueClass: Class[String],
-@transient conf: Configuration,
+conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
-configurable.setConf(conf)
+configurable.setConf(getConf)
case _ =>
}
-val jobContext = newJobContext(conf, jobId)
+val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
@@ -83,8 +83,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
}

private[spark] class ParallelCollectionRDD[T: ClassTag](
-@transient sc: SparkContext,
-@transient data: Seq[T],
+sc: SparkContext,
+@transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
@@ -32,7 +32,7 @@ private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Par
* Represents a dependency between the PartitionPruningRDD and its parent. In this
* case, the child RDD contains a subset of partitions of the parents'.
*/
-private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
+private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
extends NarrowDependency[T](rdd) {

@transient
@@ -55,8 +55,8 @@ private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterF
*/
@DeveloperApi
class PartitionPruningRDD[T: ClassTag](
-@transient prev: RDD[T],
-@transient partitionFilterFunc: Int => Boolean)
+prev: RDD[T],
+partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
@@ -47,8 +47,8 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
prev: RDD[T],
sampler: RandomSampler[T, U],
-@transient preservesPartitioning: Boolean,
-@transient seed: Long = Utils.random.nextLong)
+preservesPartitioning: Boolean,
+@transient private val seed: Long = Utils.random.nextLong)
extends RDD[U](prev) {

@transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
@@ -36,7 +36,7 @@ private[spark] object CheckpointState extends Enumeration {
* as well as, manages the post-checkpoint state by providing the updated partitions,
* iterator and preferred locations of the checkpointed RDD.
*/
-private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends Serializable {

import CheckpointState._
@@ -32,7 +32,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
* An RDD that reads from checkpoint files previously written to reliable storage.
*/
private[spark] class ReliableCheckpointRDD[T: ClassTag](
-@transient sc: SparkContext,
+sc: SparkContext,
val checkpointPath: String)
extends CheckpointRDD[T](sc) {

@@ -28,7 +28,7 @@ import org.apache.spark.util.SerializableConfiguration
* An implementation of checkpointing that writes the RDD data to reliable storage.
* This allows drivers to be restarted on failure with previously computed state.
*/
-private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {

// The directory to which the associated RDD has been checkpointed to
@@ -39,7 +39,7 @@ import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Ut
private[spark] class SqlNewHadoopPartition(
rddId: Int,
val index: Int,
-@transient rawSplit: InputSplit with Writable)
+rawSplit: InputSplit with Writable)
extends SparkPartition {

val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -61,9 +61,9 @@ private[spark] class SqlNewHadoopPartition(
* changes based on [[org.apache.spark.rdd.HadoopRDD]].
*/
private[spark] class SqlNewHadoopRDD[V: ClassTag](
-@transient sc : SparkContext,
+sc : SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
-@transient initDriverSideJobFuncOpt: Option[Job => Unit],
+@transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[Void, V]],
valueClass: Class[V])
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -37,9 +37,9 @@ import org.apache.spark.util.Utils
*/
private[spark] class UnionPartition[T: ClassTag](
idx: Int,
-@transient rdd: RDD[T],
+@transient private val rdd: RDD[T],
val parentRddIndex: Int,
-@transient parentRddPartitionIndex: Int)
+@transient private val parentRddPartitionIndex: Int)
extends Partition {

var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils

private[spark] class ZippedPartitionsPartition(
idx: Int,
-@transient rdds: Seq[RDD[_]],
+@transient private val rdds: Seq[RDD[_]],
@transient val preferredLocations: Seq[String])
extends Partition {

@@ -37,7 +37,7 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
* @tparam T parent RDD item type
*/
private[spark]
-class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {
+class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {

/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
@@ -26,7 +26,7 @@ import org.apache.spark.{SparkException, Logging, SparkConf}
/**
* A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
*/
-private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
+private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {

private[this] val maxRetries = RpcUtils.numRetries(conf)