
Commit

Merge branch 'master' of github.com:apache/spark into hive-distribution
andrewor14 committed May 5, 2014
2 parents 32f6826 + f2eb070 commit c16bbfd
Showing 143 changed files with 1,520 additions and 268 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -49,3 +49,9 @@ unit-tests.log
/lib/
rat-results.txt
scalastyle.txt

# For Hive
metastore_db/
metastore/
warehouse/
TempStatsStore/
15 changes: 15 additions & 0 deletions bin/compute-classpath.sh
@@ -32,6 +32,12 @@ CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"

ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

if [ -n "${JAVA_HOME}" ]; then
JAR_CMD="${JAVA_HOME}/bin/jar"
else
JAR_CMD="jar"
fi

# First check if we have a dependencies jar. If so, include binary classes with the deps jar
if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes"
@@ -44,6 +50,7 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"

DEPS_ASSEMBLY_JAR=$(ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar 2>/dev/null)
CLASSPATH="$CLASSPATH:$DEPS_ASSEMBLY_JAR"
@@ -54,6 +61,14 @@ else
else
ASSEMBLY_JAR=$(ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*.jar 2>/dev/null)
fi
jar_error_check=$($JAR_CMD -tf $ASSEMBLY_JAR org/apache/spark/SparkContext 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
echo "Loading Spark jar with '$JAR_CMD' failed. "
echo "This is likely because Spark was compiled with Java 7 and run "
echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark "
echo "or build Spark with Java 6."
exit 1
fi
CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
fi

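The check above runs jar -tf against the assembly and greps for the "invalid CEN header" message that Java 6's zip code emits when handed an archive written by Java 7 (SPARK-1703). A minimal Scala sketch of the same guard, probing the running JVM version up front instead of the jar; the object name and message wording are illustrative, not part of this commit:

// Hypothetical fail-fast guard, assuming only the standard library.
object JavaVersionGuard {
  def main(args: Array[String]): Unit = {
    // "1.6" on Java 6, "1.7" on Java 7, and so on.
    val spec = System.getProperty("java.specification.version")
    if (spec == "1.6") {
      System.err.println("Spark assemblies built with Java 7 may be unreadable on Java 6 " +
        "(SPARK-1703); run with Java 7 or rebuild with Java 6.")
      sys.exit(1)
    }
  }
}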
9 changes: 8 additions & 1 deletion bin/spark-class
@@ -138,7 +138,14 @@ if [ -e "$TOOLS_DIR"/target/spark-tools*[0-9Tg].jar ]; then
fi

# Compute classpath using external script
-CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
+classpath_output=$($FWDIR/bin/compute-classpath.sh)
+if [[ "$?" != "0" ]]; then
+  echo "$classpath_output"
+  exit 1
+else
+  CLASSPATH=$classpath_output
+fi

if [[ "$1" =~ org.apache.spark.tools.* ]]; then
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
fi
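With this change, spark-class aborts when compute-classpath.sh fails instead of treating the script's error text as a classpath. The same capture-then-check pattern in Scala, using scala.sys.process; the script path is illustrative:

import scala.collection.mutable.ArrayBuffer
import scala.sys.process.{Process, ProcessLogger}

// Sketch: keep a helper's output, but only trust it if the helper exited 0,
// mirroring the classpath_output check above.
object ClasspathRunner {
  def main(args: Array[String]): Unit = {
    val lines = ArrayBuffer.empty[String]
    val status = Process(Seq("./bin/compute-classpath.sh")) !
      ProcessLogger(lines += _, lines += _) // collect stdout and stderr
    if (status != 0) {
      lines.foreach(System.err.println)     // surface the real error message
      sys.exit(1)
    }
    println(lines.mkString("\n"))           // safe to use as the classpath
  }
}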
4 changes: 2 additions & 2 deletions bin/spark-submit
@@ -18,7 +18,7 @@
#

export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
-ORIG_ARGS=$@
+ORIG_ARGS=("$@")

while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
@@ -39,5 +39,5 @@ if [ ! -z $DRIVER_MEMORY ] && [ ! -z $DEPLOY_MODE ] && [ $DEPLOY_MODE = "client"
export SPARK_MEM=$DRIVER_MEMORY
fi

-$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit $ORIG_ARGS
+$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"

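The array form ORIG_ARGS=("$@") together with "${ORIG_ARGS[@]}" preserves each argument's boundaries; the old scalar ORIG_ARGS=$@ re-split any argument containing spaces. The same idea in Scala, where keeping a Seq intact plays the role of the bash array; the echo command is only for demonstration:

import scala.sys.process._

// Sketch: pass arguments as a sequence so a value like "My App" stays one
// argument instead of being re-tokenized into two.
object ArgPassing {
  def main(args: Array[String]): Unit = {
    val userArgs = Seq("--name", "My App")  // one value containing a space
    Process(Seq("/bin/echo") ++ userArgs).! // boundaries preserved downstream
  }
}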
4 changes: 4 additions & 0 deletions core/pom.xml
@@ -69,6 +69,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
val env = SparkEnv.get
-    val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+    val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+
+    // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
+    context.addOnCompleteCallback(() =>
+      try {
+        worker.close()
+      } catch {
+        case e: Exception => logWarning("Failed to close worker socket", e)
+      }
+    )

@volatile var readerException: Exception = null

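The callback above leans on close() being idempotent: if the task already closed its worker socket, the completion hook's second close() is a harmless no-op. A self-contained sketch of the pattern; MiniTaskContext is an illustrative stand-in, not Spark's TaskContext API:

import java.net.Socket
import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for a task context: cleanup callbacks run once on
// completion, and a failing callback is logged rather than fatal.
class MiniTaskContext {
  private val callbacks = ArrayBuffer.empty[() => Unit]
  def addOnCompleteCallback(f: () => Unit): Unit = callbacks += f
  def markComplete(): Unit = callbacks.foreach { f =>
    try f() catch { case e: Exception => System.err.println(s"cleanup failed: $e") }
  }
}

object IdempotentClose {
  def main(args: Array[String]): Unit = {
    val ctx = new MiniTaskContext
    val worker = new Socket()  // unconnected; close() still works
    ctx.addOnCompleteCallback(() => worker.close())
    worker.close()     // the task finished and closed the socket itself
    ctx.markComplete() // the second close() is a no-op, so this is safe
  }
}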
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,25 +24,36 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation

-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}

import scala.collection.JavaConversions._

/**
* Contains util methods to interact with Hadoop from Spark.
*/
-class SparkHadoopUtil {
+class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration()
UserGroupInformation.setConfiguration(conf)

-  def runAsUser(user: String)(func: () => Unit) {
+  /**
+   * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
+   * (distributed to child threads), used for authenticating HDFS and YARN calls.
+   *
+   * IMPORTANT NOTE: If this function is going to be called repeatedly in the same process
+   * you need to look at https://issues.apache.org/jira/browse/HDFS-3545 and possibly
+   * do a FileSystem.closeAllForUGI in order to avoid leaking FileSystems
+   */
+  def runAsSparkUser(func: () => Unit) {
+    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
+      logDebug("running as user: " + user)
       val ugi = UserGroupInformation.createRemoteUser(user)
       transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
         def run: Unit = func()
       })
     } else {
+      logDebug("running as SPARK_UNKNOWN_USER")
       func()
     }
   }
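The doc comment's caveat matters for long-lived processes: each UserGroupInformation keeps its own entries in Hadoop's FileSystem cache, so repeated impersonation leaks them unless they are closed (HDFS-3545). A sketch of how a caller might apply that advice; the helper and user name are illustrative, not part of this commit:

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation

// Sketch: impersonate a user for one unit of work, then drop the
// FileSystem instances cached for that UGI to avoid the HDFS-3545 leak.
object RepeatedImpersonation {
  def runAs(user: String)(func: () => Unit): Unit = {
    val ugi = UserGroupInformation.createRemoteUser(user)
    try {
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = func()
      })
    } finally {
      FileSystem.closeAllForUGI(ugi)
    }
  }
}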
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy

import java.io.{File, PrintStream}
import java.lang.reflect.InvocationTargetException
import java.net.{URI, URL}

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -137,7 +138,7 @@ object SparkSubmit {
throw new Exception(msg)
}
}

// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

@@ -253,7 +254,14 @@ object SparkSubmit {

val mainClass = Class.forName(childMainClass, true, loader)
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
-    mainMethod.invoke(null, childArgs.toArray)
+    try {
+      mainMethod.invoke(null, childArgs.toArray)
+    } catch {
+      case e: InvocationTargetException => e.getCause match {
+        case cause: Throwable => throw cause
+        case null => throw e
+      }
+    }
}

private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
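Method.invoke wraps anything the target main method throws in an InvocationTargetException, so without the unwrapping above a user application's own exception would surface buried in reflection noise. A minimal demonstration; Boom and UnwrapDemo are made up for illustration:

import java.lang.reflect.InvocationTargetException

// A made-up main class whose main() always fails.
object Boom {
  def main(args: Array[String]): Unit = throw new IllegalStateException("boom")
}

object UnwrapDemo {
  def main(args: Array[String]): Unit = {
    val mainMethod = Boom.getClass.getMethod("main", classOf[Array[String]])
    try {
      mainMethod.invoke(Boom, Array.empty[String])
    } catch {
      // Reflection wrapped the real failure; rethrow the cause to restore it.
      case e: InvocationTargetException if e.getCause != null => throw e.getCause
    }
  }
}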
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -284,8 +284,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
| --deploy-mode DEPLOY_MODE Mode to deploy the app in, either 'client' or 'cluster'.
| --class CLASS_NAME Name of your app's main class (required for Java apps).
-        | --arg ARG                 Argument to be passed to your application's main class. This
-        |                           option can be specified multiple times for multiple args.
| --name NAME The name of your application (Default: 'Spark').
| --jars JARS A comma-separated list of local jars to include on the
| driver classpath and that SparkContext.addJar will work
core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -92,8 +92,7 @@ private[spark] class AppClient(
if (registered) {
retryTimer.cancel()
} else if (retries >= REGISTRATION_RETRIES) {
logError("All masters are unresponsive! Giving up.")
markDead()
markDead("All masters are unresponsive! Giving up.")
} else {
tryRegisterAllMasters()
}
@@ -126,8 +125,7 @@ private[spark] class AppClient(
listener.connected(appId)

case ApplicationRemoved(message) =>
logError("Master removed our application: %s; stopping client".format(message))
markDisconnected()
markDead("Master removed our application: %s".format(message))
context.stop(self)

case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
@@ -158,7 +156,7 @@ private[spark] class AppClient(
logWarning(s"Could not connect to $address: $cause")

case StopAppClient =>
-        markDead()
+        markDead("Application has been stopped.")
sender ! true
context.stop(self)
}
@@ -173,9 +171,9 @@
}
}

-    def markDead() {
+    def markDead(reason: String) {
if (!alreadyDead) {
-        listener.dead()
+        listener.dead(reason)
alreadyDead = true
}
}
core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
@@ -30,8 +30,8 @@ private[spark] trait AppClientListener {
/** Disconnection may be a temporary state, as we fail over to a new Master. */
def disconnected(): Unit

-  /** Dead means that we couldn't find any Masters to connect to, and have given up. */
-  def dead(): Unit
+  /** An application death is an unrecoverable failure condition. */
+  def dead(reason: String): Unit

def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)

core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -33,8 +33,8 @@ private[spark] object TestClient {
System.exit(0)
}

-    def dead() {
-      logInfo("Could not connect to master")
+    def dead(reason: String) {
+      logInfo("Application died with error: " + reason)
System.exit(0)
}

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -71,8 +71,6 @@ private[spark] class Worker(
// TTL for app folders/data; after TTL expires it will be cleaned up
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

-  // Index into masterUrls that we're currently trying to register with.
-  var masterIndex = 0

val masterLock: Object = new Object()
var master: ActorSelection = null
@@ -244,7 +242,7 @@
}
} catch {
case e: Exception => {
logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._

-import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -94,25 +95,30 @@ private[spark] class CoarseGrainedExecutorBackend(

private[spark] object CoarseGrainedExecutorBackend {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
-      workerUrl: Option[String]) {
-    // Debug code
-    Utils.checkHost(hostname)
-
-    val conf = new SparkConf
-    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
-    // before getting started with all our system properties, etc
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = conf, new SecurityManager(conf))
-    // set it
-    val sparkHostPort = hostname + ":" + boundPort
-    actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
-        sparkHostPort, cores),
-      name = "Executor")
-    workerUrl.foreach{ url =>
-      actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+    workerUrl: Option[String]) {
+
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      // Debug code
+      Utils.checkHost(hostname)
+
+      val conf = new SparkConf
+      // Create a new ActorSystem to run the backend, because we can't create a
+      // SparkEnv / Executor before getting started with all our system properties, etc
+      val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+        indestructible = true, conf = conf, new SecurityManager(conf))
+      // set it
+      val sparkHostPort = hostname + ":" + boundPort
+      actorSystem.actorOf(
+        Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
+          sparkHostPort, cores),
+        name = "Executor")
+      workerUrl.foreach {
+        url =>
+          actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+      }
+      actorSystem.awaitTermination()
+
     }
-    actorSystem.awaitTermination()
   }

def main(args: Array[String]) {
4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -128,8 +128,6 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

-  val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
@@ -172,7 +170,7 @@
}
}

-    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    override def run() {
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -23,10 +23,10 @@ import com.google.protobuf.ByteString
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}

-import org.apache.spark.Logging
-import org.apache.spark.TaskState
+import org.apache.spark.{Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil

private[spark] class MesosExecutorBackend
extends MesosExecutor
@@ -95,9 +95,11 @@
*/
private[spark] object MesosExecutorBackend {
def main(args: Array[String]) {
-    MesosNativeLibrary.load()
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      MesosNativeLibrary.load()
+      // Create a new Executor and start it running
+      val runner = new MesosExecutorBackend()
+      new MesosExecutorDriver(runner).run()
+    }
}
}
