Skip to content

Commit

Permalink
Merge pull request #3 from apache/master
Browse files Browse the repository at this point in the history
Merge upstream updates
  • Loading branch information
nchammas committed Jun 20, 2014
2 parents 89fde08 + 171ebb3 commit 2e4fe00
Show file tree
Hide file tree
Showing 155 changed files with 5,996 additions and 1,116 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ spark-env.sh.template
log4j-defaults.properties
sorttable.js
.*txt
.*json
.*data
.*log
cloudpickle.py
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,24 @@ span.kill-link {
span.kill-link a {
color: gray;
}

span.expand-details {
font-size: 10pt;
cursor: pointer;
color: grey;
float: right;
}

.stage-details {
max-height: 100px;
overflow-y: auto;
margin: 0;
transition: max-height 0.5s ease-out, padding 0.5s ease-out;
}

.stage-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
}
19 changes: 10 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
Expand Down Expand Up @@ -224,7 +224,6 @@ class SparkContext(config: SparkConf) extends Logging {

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
val env = SparkEnv.get
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
Expand Down Expand Up @@ -1036,9 +1035,11 @@ class SparkContext(config: SparkConf) extends Logging {
* Capture the current user callsite and return a formatted version for printing. If the user
* has overridden the call site, this will return the user's version.
*/
private[spark] def getCallSite(): String = {
val defaultCallSite = Utils.getCallSiteInfo
Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
private[spark] def getCallSite(): CallSite = {
Option(getLocalProperty("externalCallSite")) match {
case Some(callSite) => CallSite(callSite, long = "")
case None => Utils.getCallSite
}
}

/**
Expand All @@ -1058,11 +1059,11 @@ class SparkContext(config: SparkConf) extends Logging {
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite)
logInfo("Starting job: " + callSite.short)
val start = System.nanoTime
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
}

Expand Down Expand Up @@ -1143,11 +1144,11 @@ class SparkContext(config: SparkConf) extends Logging {
evaluator: ApproximateEvaluator[U, R],
timeout: Long): PartialResult[R] = {
val callSite = getCallSite
logInfo("Starting job: " + callSite)
logInfo("Starting job: " + callSite.short)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}

Expand Down
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
other2: JavaPairRDD[K, W2],
other3: JavaPairRDD[K, W3],
partitioner: Partitioner)
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, partitioner)))

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
Expand All @@ -558,6 +570,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
other2: JavaPairRDD[K, W2],
other3: JavaPairRDD[K, W3])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3)))

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
Expand All @@ -574,6 +597,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
other2: JavaPairRDD[K, W2],
other3: JavaPairRDD[K, W3],
numPartitions: Int)
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, numPartitions)))

/** Alias for cogroup. */
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
Expand All @@ -583,6 +618,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))

/** Alias for cogroup. */
def groupWith[W1, W2, W3](other1: JavaPairRDD[K, W1],
other2: JavaPairRDD[K, W2],
other3: JavaPairRDD[K, W3])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
fromRDD(cogroupResult3ToJava(rdd.groupWith(other1, other2, other3)))

/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
Expand Down Expand Up @@ -786,6 +828,15 @@ object JavaPairRDD {
.mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
}

private[spark]
def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3](
rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))])
: RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = {
rddToPairRDDFunctions(rdd)
.mapValues(x =>
(asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3), asJavaIterable(x._4)))
}

def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
new JavaPairRDD[K, V](rdd)
}
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.api.java

import java.util.Comparator

import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -172,6 +175,19 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
rdd.setName(name)
this
}

/**
* Return this RDD sorted by the given key function.
*/
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x)
import com.google.common.collect.Ordering // shadows scala.math.Ordering
implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
implicit val ctag: ClassTag[S] = fakeClassTag
wrapRDD(rdd.sortBy(fn, ascending, numPartitions))
}

}

object JavaRDD {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import scala.collection.mutable.ListBuffer

import org.apache.log4j.Level

import org.apache.spark.util.MemoryParam

/**
* Command-line parser for the driver client.
*/
Expand Down Expand Up @@ -51,8 +53,8 @@ private[spark] class ClientArguments(args: Array[String]) {
cores = value.toInt
parse(tail)

case ("--memory" | "-m") :: value :: tail =>
memory = value.toInt
case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
memory = value
parse(tail)

case ("--supervise" | "-s") :: tail =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package org.apache.spark.deploy

private[spark] object ExecutorState extends Enumeration {

val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

type ExecutorState = Value

def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import java.util.Date

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import akka.actor.ActorRef

Expand All @@ -36,6 +37,7 @@ private[spark] class ApplicationInfo(

@transient var state: ApplicationState.Value = _
@transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
@transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
Expand All @@ -51,6 +53,7 @@ private[spark] class ApplicationInfo(
endTime = -1L
appSource = new ApplicationSource(this)
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorInfo]
}

private def newExecutorId(useID: Option[Int] = None): Int = {
Expand All @@ -74,6 +77,7 @@ private[spark] class ApplicationInfo(

def removeExecutor(exec: ExecutorInfo) {
if (executors.contains(exec.id)) {
removedExecutors += executors(exec.id)
executors -= exec.id
coresGranted -= exec.cores
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,19 @@ private[spark] class ExecutorInfo(
}

def fullId: String = application.id + "/" + id

override def equals(other: Any): Boolean = {
other match {
case info: ExecutorInfo =>
fullId == info.fullId &&
worker.id == info.worker.id &&
cores == info.cores &&
memory == info.memory
case _ => false
}
}

override def toString: String = fullId

override def hashCode: Int = toString.hashCode()
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,11 @@ private[spark] class Master(
appInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)

val normalExit = exitStatus.exists(_ == 0)
// Only retry certain number of times so we don't go into an infinite loop.
if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
schedule()
} else {
} else if (!normalExit) {
logError("Application %s with ID %s failed %d times, removing it".format(
appInfo.desc.name, appInfo.id, appInfo.retryCount))
removeApplication(appInfo, ApplicationState.FAILED)
Expand Down
Loading

0 comments on commit 2e4fe00

Please sign in to comment.