
Commit

Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
	sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
yhuai committed Jul 16, 2014
2 parents b8b7db4 + c2048a5 commit 2e58dbd
Showing 194 changed files with 1,728 additions and 398 deletions.
11 changes: 9 additions & 2 deletions README.md
@@ -1,6 +1,13 @@
# Apache Spark

Lightning-Fast Cluster Computing - <http://spark.apache.org/>
Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, and Python, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and structured
data processing, MLlib for machine learning, GraphX for graph processing,
and Spark Streaming.

<http://spark.apache.org/>


## Online Documentation
@@ -81,7 +88,7 @@ versions without YARN, use:
$ sbt/sbt -Dhadoop.version=2.0.0-mr1-cdh4.2.0 assembly

For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_YARN=true`:
with YARN, also set `-Pyarn`:

# Apache Hadoop 2.0.5-alpha
$ sbt/sbt -Dhadoop.version=2.0.5-alpha -Pyarn assembly
4 changes: 4 additions & 0 deletions core/pom.xml
@@ -114,6 +114,10 @@
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
val backend = try {
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
scheduler

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -92,8 +92,8 @@ private[spark] object TestUtils {
def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
val compiler = ToolProvider.getSystemJavaCompiler
val sourceFile = new JavaSourceFromString(className,
"public class " + className + " { @Override public String toString() { " +
"return \"" + value + "\";}}")
"public class " + className + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + value + "\"; }}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
@@ -57,6 +57,7 @@ private[spark] class Master(
def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
@@ -741,6 +742,10 @@ private[spark] class Master(
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
drivers -= driver
if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
completedDrivers.trimStart(toRemove)
}
completedDrivers += driver
persistenceEngine.removeDriver(driver)
driver.state = finalState
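As a rough, standalone sketch of the retention logic added above (the names `retained`, `completed`, and `record` are illustrative, not from the diff), the same bounded-buffer pattern looks like this:

    import scala.collection.mutable.ArrayBuffer

    val retained = 200                      // mirrors spark.deploy.retainedDrivers
    val completed = ArrayBuffer[String]()

    def record(driverId: String): Unit = {
      if (completed.size >= retained) {
        // Drop the oldest ~10% (at least one entry) before appending, as in the diff.
        completed.trimStart(math.max(retained / 10, 1))
      }
      completed += driverId
    }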
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.io
import java.io.{InputStream, OutputStream}

import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

import org.apache.spark.SparkConf
@@ -59,6 +60,27 @@ private[spark] object CompressionCodec {
}


/**
* :: DeveloperApi ::
* LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
* Block size can be configured by `spark.io.compression.lz4.block.size`.
*
* Note: The wire protocol for this codec is not guaranteed to be compatible across versions
* of Spark. This is intended for use as an internal compression utility within a single Spark
* application.
*/
@DeveloperApi
class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768)
new LZ4BlockOutputStream(s, blockSize)
}

override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)
}


/**
* :: DeveloperApi ::
* LZF implementation of [[org.apache.spark.io.CompressionCodec]].
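As a rough sketch of how the new codec might be exercised (not part of this commit; the payload and setting values below are illustrative, only the configuration keys come from the diff), the codec can be built from a SparkConf and used to round-trip a small stream:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import org.apache.spark.SparkConf
    import org.apache.spark.io.LZ4CompressionCodec

    val conf = new SparkConf()
      .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
      .set("spark.io.compression.lz4.block.size", "32768")  // block size read by the codec

    val codec = new LZ4CompressionCodec(conf)

    // Compress a small payload and read it back.
    val buffer = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(buffer)
    out.write("hello lz4".getBytes("UTF-8"))
    out.close()
    val in = codec.compressedInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val roundTripped = scala.io.Source.fromInputStream(in, "UTF-8").mkString  // "hello lz4"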
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -353,9 +353,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*
* Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
* each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
* will provide much better performance.
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
// groupByKey shouldn't use map side combine because map side combine does not
@@ -373,9 +373,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
resulting RDD into `numPartitions` partitions.
*
* Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
* each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
* will provide much better performance.
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
groupByKey(new HashPartitioner(numPartitions))
@@ -462,9 +462,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*
* Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
* each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
* will provide much better performance,
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupByKey(): RDD[(K, Iterable[V])] = {
groupByKey(defaultPartitioner(self))
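To make the updated note concrete, here is a small hedged example (assuming an existing SparkContext `sc`; the data is made up) of computing per-key sums the expensive way versus the recommended way:

    import org.apache.spark.SparkContext._  // pair-RDD implicits in this era of Spark

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // Expensive: materializes every value for a key before summing.
    val sumsViaGroup = pairs.groupByKey().mapValues(_.sum)

    // Preferred: combines partial sums on the map side first.
    val sumsViaReduce = pairs.reduceByKey(_ + _)

    // aggregateByKey separates the zero value from the merge functions.
    val sumsViaAggregate = pairs.aggregateByKey(0)(_ + _, _ + _)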
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -509,20 +509,32 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
groupBy[K](f, defaultPartitioner(this))

/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
groupBy(f, new HashPartitioner(numPartitions))

/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
: RDD[(K, Iterable[T])] = {
@@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend {

def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
def isReady(): Boolean = true
}
@@ -145,6 +145,10 @@ private[spark] class TaskSchedulerImpl(
}
}

override def postStartHook() {
waitBackendReady()
}

override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
@@ -437,6 +441,17 @@ private[spark] class TaskSchedulerImpl(

// By default, rack is unknown
def getRackForHost(value: String): Option[String] = None

private def waitBackendReady(): Unit = {
if (backend.isReady) {
return
}
while (!backend.isReady) {
synchronized {
this.wait(100)
}
}
}
}


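The waiting logic above is a simple poll-until-ready loop; a minimal generic sketch of the same idea (illustrative only, not Spark API) is:

    // Blocks until `ready()` returns true, checking every `pollMs` milliseconds.
    def waitUntil(ready: () => Boolean, pollMs: Long = 100): Unit = {
      while (!ready()) {
        Thread.sleep(pollMs)
      }
    }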
@@ -66,4 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {

case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage

case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String)
extends CoarseGrainedClusterMessage

}
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
import org.apache.spark.ui.JettyUtils

/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -46,9 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
var totalExpectedExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
// Submit tasks only after (registered executors / total expected executors) reaches
// at least this value, which is a double between 0 and 1.
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
if (minRegisteredRatio > 1) minRegisteredRatio = 1
// Regardless of whether minRegisteredExecutorsRatio has been reached, submit tasks once this wait time (in milliseconds) has elapsed.
val maxRegisteredWaitingTime =
conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
val createTime = System.currentTimeMillis()
var ready = if (minRegisteredRatio <= 0) true else false

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -83,6 +94,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
ready = true
logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
", minRegisteredExecutorsRatio: " + minRegisteredRatio)
}
makeOffers()
}

@@ -120,6 +137,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
removeExecutor(executorId, reason)
sender ! true

case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
sender ! true
case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_,
"remote Akka client disassociated"))
@@ -247,6 +267,33 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
throw new SparkException("Error notifying standalone scheduler's driver actor", e)
}
}

override def isReady(): Boolean = {
if (ready) {
return true
}
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
ready = true
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
"maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
return true
}
false
}

// Add filters to the SparkUI
def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
if (proxyBase != null && proxyBase.nonEmpty) {
System.setProperty("spark.ui.proxyBase", proxyBase)
}

if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
conf.set("spark.ui.filters", filterName)
conf.set(s"spark.$filterName.params", filterParams)
JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
}
}
}

private[spark] object CoarseGrainedSchedulerBackend {
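The two new scheduler settings above could be supplied as follows (the values are illustrative; only the keys come from the diff):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Wait until at least 80% of the expected executors have registered ...
      .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
      // ... but start scheduling after 30 seconds regardless.
      .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "30000")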
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(

override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
memory: Int) {
totalExpectedExecutors.addAndGet(1)
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -136,7 +136,16 @@ private[spark] object UIUtils extends Logging {
}

// Yarn has to go through a proxy so the base uri is provided and has to be on all links
val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
def uiRoot: String = {
if (System.getenv("APPLICATION_WEB_PROXY_BASE") != null) {
System.getenv("APPLICATION_WEB_PROXY_BASE")
} else if (System.getProperty("spark.ui.proxyBase") != null) {
System.getProperty("spark.ui.proxyBase")
}
else {
""
}
}

def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource

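The new fallback chain (environment variable, then system property, then empty string) can be written equivalently with Option; this is just an illustrative rewrite, not what the commit contains:

    def uiRoot: String =
      Option(System.getenv("APPLICATION_WEB_PROXY_BASE"))
        .orElse(Option(System.getProperty("spark.ui.proxyBase")))
        .getOrElse("")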
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -196,6 +196,5 @@ private[spark] class FileLogger(
def stop() {
hadoopDataStream.foreach(_.close())
writer.foreach(_.close())
fileSystem.close()
}
}
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -809,7 +809,12 @@ private[spark] object Utils extends Logging {
*/
def getCallSite: CallSite = {
val trace = Thread.currentThread.getStackTrace()
.filterNot(_.getMethodName.contains("getStackTrace"))
.filterNot { ste:StackTraceElement =>
// When running under some profilers, the current stack trace might contain some bogus
// frames. This is intended to ensure that we don't crash in these situations by
// ignoring any frames that we can't examine.
(ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
}

// Keep crawling up the stack trace until we find the first function not inside of the spark
// package. We track the last (shallowest) contiguous Spark method. This might be an RDD
25 changes: 25 additions & 0 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -177,6 +177,31 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
}

test("object files of classes from a JAR") {
val original = Thread.currentThread().getContextClassLoader
val className = "FileSuiteObjectFileTest"
val jar = TestUtils.createJarWithClasses(Seq(className))
val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
Thread.currentThread().setContextClassLoader(loader)
try {
sc = new SparkContext("local", "test")
val objs = sc.makeRDD(1 to 3).map { x =>
val loader = Thread.currentThread().getContextClassLoader
Class.forName(className, true, loader).newInstance()
}
val outputDir = new File(tempDir, "output").getAbsolutePath
objs.saveAsObjectFile(outputDir)
// Try reading the output back as an object file
val ct = reflect.ClassTag[Any](Class.forName(className, true, loader))
val output = sc.objectFile[Any](outputDir)
assert(output.collect().size === 3)
assert(output.collect().head.getClass.getName === className)
}
finally {
Thread.currentThread().setContextClassLoader(original)
}
}

test("write SequenceFile using new Hadoop API") {
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
sc = new SparkContext("local", "test")
