Merge remote-tracking branch 'upstream/master' into dt-python-consistency

Conflicts:
	python/pyspark/mllib/tree.py
(not a real conflict, merged)
jkbradley committed Aug 6, 2014
2 parents a0d7dbe + e537b33 commit 6f7edf8
Showing 80 changed files with 1,225 additions and 491 deletions.
29 changes: 7 additions & 22 deletions bin/beeline
@@ -17,29 +17,14 @@
# limitations under the License.
#

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"
#
# Shell script for starting BeeLine

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# Enter posix mode for bash
set -o posix

# Compute classpath using external script
classpath_output=$($FWDIR/bin/compute-classpath.sh)
if [[ "$?" != "0" ]]; then
echo "$classpath_output"
exit 1
else
CLASSPATH=$classpath_output
fi
# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

CLASS="org.apache.hive.beeline.BeeLine"
exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
exec "$FWDIR/bin/spark-class" $CLASS "$@"
66 changes: 62 additions & 4 deletions bin/spark-sql
@@ -23,14 +23,72 @@
# Enter posix mode for bash
set -o posix

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/spark-sql [options]"
function usage {
echo "Usage: ./sbin/spark-sql [options] [cli option]"
pattern="usage"
pattern+="\|Spark assembly has been built with Hive"
pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
pattern+="\|Spark Command: "
pattern+="\|--help"
pattern+="\|======="

$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
echo
echo "CLI options:"
$FWDIR/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
}

function ensure_arg_number {
arg_number=$1
at_least=$2

if [[ $arg_number -lt $at_least ]]; then
usage
exit 1
fi
}

if [[ "$@" = --help ]] || [[ "$@" = -h ]]; then
usage
exit 0
fi

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
CLI_ARGS=()
SUBMISSION_ARGS=()

while (($#)); do
case $1 in
-d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
ensure_arg_number $# 2
CLI_ARGS+=($1); shift
CLI_ARGS+=($1); shift
;;

-e)
ensure_arg_number $# 2
CLI_ARGS+=($1); shift
CLI_ARGS+=(\"$1\"); shift
;;

-s | --silent)
CLI_ARGS+=($1); shift
;;

-v | --verbose)
# Both SparkSubmit and SparkSQLCLIDriver recognize -v | --verbose
CLI_ARGS+=($1)
SUBMISSION_ARGS+=($1); shift
;;

*)
SUBMISSION_ARGS+=($1); shift
;;
esac
done

eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -23,7 +23,10 @@ import com.google.common.io.Files

import org.apache.spark.util.Utils

private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
private[spark] class HttpFileServer(
securityManager: SecurityManager,
requestedPort: Int = 0)
extends Logging {

var baseDir : File = null
var fileDir : File = null
@@ -38,7 +41,7 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
httpServer = new HttpServer(baseDir, securityManager)
httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
httpServer.start()
serverUri = httpServer.uri
logDebug("HTTP file server started at: " + serverUri)
88 changes: 54 additions & 34 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -21,7 +21,7 @@ import java.io.File

import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.bio.SocketConnector
@@ -41,48 +41,68 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
* around a Jetty server.
*/
private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
extends Logging {
private[spark] class HttpServer(
resourceBase: File,
securityManager: SecurityManager,
requestedPort: Int = 0,
serverName: String = "HTTP server")
extends Logging {

private var server: Server = null
private var port: Int = -1
private var port: Int = requestedPort

def start() {
if (server != null) {
throw new ServerStateException("Server is already started")
} else {
logInfo("Starting HTTP Server")
server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
connector.setSoLingerTime(-1)
connector.setPort(0)
server.addConnector(connector)

val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)

val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))

if (securityManager.isAuthenticationEnabled()) {
logDebug("HttpServer is using security")
val sh = setupSecurityHandler(securityManager)
// make sure we go through security handler to get resources
sh.setHandler(handlerList)
server.setHandler(sh)
} else {
logDebug("HttpServer is not using security")
server.setHandler(handlerList)
}

server.start()
port = server.getConnectors()(0).getLocalPort()
val (actualServer, actualPort) =
Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
server = actualServer
port = actualPort
}
}

/**
* Actually start the HTTP server on the given port.
*
* Note that this is only best effort in the sense that we may end up binding to a nearby port
* in the event of port collision. Return the bound server and the actual port used.
*/
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60 * 1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
server.addConnector(connector)

val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)

val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))

if (securityManager.isAuthenticationEnabled()) {
logDebug("HttpServer is using security")
val sh = setupSecurityHandler(securityManager)
// make sure we go through security handler to get resources
sh.setHandler(handlerList)
server.setHandler(sh)
} else {
logDebug("HttpServer is not using security")
server.setHandler(handlerList)
}

server.start()
val actualPort = server.getConnectors()(0).getLocalPort

(server, actualPort)
}

/**
* Setup Jetty to the HashLoginService using a single user with our
* shared secret. Configure it to use DIGEST-MD5 authentication so that the password
@@ -134,7 +154,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
if (server == null) {
throw new ServerStateException("Server is not started")
} else {
return "http://" + Utils.localIpAddress + ":" + port
"http://" + Utils.localIpAddress + ":" + port
}
}
}
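
Note: the refactored doStart above is handed to Utils.startServiceOnPort, which owns the "best effort" behavior described in the doc comment: if the requested port is taken, a nearby port is tried instead. Below is a minimal sketch of that idea, for illustration only; the helper name startOnPort, the maxRetries default, and the retry step are assumptions, not Spark's actual implementation.

    import java.net.BindException

    // Illustrative sketch only -- not the real Utils.startServiceOnPort.
    // Try the requested port first, then successive nearby ports on bind failure.
    def startOnPort[T](startPort: Int, start: Int => (T, Int), maxRetries: Int = 3): (T, Int) = {
      for (offset <- 0 to maxRetries) {
        // Port 0 means "any free port", so offsetting it makes no sense; just retry 0.
        val tryPort = if (startPort == 0) 0 else startPort + offset
        try {
          return start(tryPort)  // e.g. doStart, which returns (server, actualPort)
        } catch {
          case _: BindException if offset < maxRetries =>
            () // port collision: fall through and try the next port
        }
      }
      throw new BindException(s"Could not bind after $maxRetries retries starting at port $startPort")
    }
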
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -323,6 +323,14 @@ private[spark] object SparkConf {
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
*/
def isExecutorStartupConf(name: String): Boolean = {
isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
isAkkaConf(name) ||
name.startsWith("spark.akka") ||
name.startsWith("spark.auth") ||
isSparkPortConf(name)
}

/**
* Return whether the given config is a Spark port config.
*/
def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
}
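
For illustration, the new port predicate accepts any key of the form spark.*.port. A tiny stand-alone sketch follows; the example keys come from elsewhere in this commit, and spark.akka.frameSize is shown only as a non-match.

    // Same shape as the new SparkConf.isSparkPortConf (illustrative copy):
    def isSparkPortConf(name: String): Boolean =
      name.startsWith("spark.") && name.endsWith(".port")

    isSparkPortConf("spark.fileserver.port")  // true  -- now also treated as an executor startup conf
    isSparkPortConf("spark.broadcast.port")   // true
    isSparkPortConf("spark.akka.frameSize")   // false -- caught by the spark.akka prefix check instead
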
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -22,7 +22,6 @@ import java.net.Socket

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.concurrent.Await
import scala.util.Properties

import akka.actor._
@@ -151,10 +150,10 @@ object SparkEnv extends Logging {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
securityManager = securityManager)

// Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
// figure out which port number Akka actually bound to and set spark.driver.port to it.
if (isDriver && port == 0) {
conf.set("spark.driver.port", boundPort.toString)
// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
// This is so that we tell the executors the correct port to connect to.
if (isDriver) {
conf.set("spark.driver.port", boundPort.toString)
}

// Create an instance of the class named by the given Java system property, or by
@@ -222,7 +221,8 @@

val httpFileServer =
if (isDriver) {
val server = new HttpFileServer(securityManager)
val fileServerPort = conf.getInt("spark.fileserver.port", 0)
val server = new HttpFileServer(securityManager, fileServerPort)
server.initialize()
conf.set("spark.fileserver.uri", server.serverUri)
server
@@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {

private def createServer(conf: SparkConf) {
broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
server = new HttpServer(broadcastDir, securityManager)
val broadcastPort = conf.getInt("spark.broadcast.port", 0)
server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
server.start()
serverUri = server.uri
logInfo("Broadcast server started at " + serverUri)
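
With the driver-side HTTP file server and HTTP broadcast server now reading their ports from spark.fileserver.port and spark.broadcast.port (both defaulting to 0, i.e. any free port), these services can be pinned to fixed ports through SparkConf. A hypothetical configuration sketch; the port numbers are arbitrary examples, not values from this commit.

    import org.apache.spark.SparkConf

    // Hypothetical port assignments; 0 (the default) still means "pick any free port".
    val conf = new SparkConf()
      .set("spark.driver.port", "51000")      // port the executors connect back to
      .set("spark.fileserver.port", "51001")  // driver's HTTP file server
      .set("spark.broadcast.port", "51002")   // HTTP broadcast server
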
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -155,8 +155,6 @@ object Client {
conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)

// TODO: See if we can initialize akka so return messages are sent back using the same TCP
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

@@ -220,6 +220,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
/** Fill in values by parsing user options. */
private def parseOpts(opts: Seq[String]): Unit = {
var inSparkOpts = true
val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r

// Delineates parsing of Spark options from parsing of user options.
parse(opts)
@@ -322,33 +323,21 @@
verbose = true
parse(tail)

case EQ_SEPARATED_OPT(opt, value) :: tail =>
parse(opt :: value :: tail)

case value :: tail if value.startsWith("-") =>
SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")

case value :: tail =>
if (inSparkOpts) {
value match {
// convert --foo=bar to --foo bar
case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
val parts = v.split("=")
parse(Seq(parts(0), parts(1)) ++ tail)
case v if v.startsWith("-") =>
val errMessage = s"Unrecognized option '$value'."
SparkSubmit.printErrorAndExit(errMessage)
case v =>
primaryResource =
if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
Utils.resolveURI(v).toString
} else {
v
}
inSparkOpts = false
isPython = SparkSubmit.isPython(v)
parse(tail)
primaryResource =
if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
Utils.resolveURI(value).toString
} else {
value
}
} else {
if (!value.isEmpty) {
childArgs += value
}
parse(tail)
}
isPython = SparkSubmit.isPython(value)
childArgs ++= tail

case Nil =>
}
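
The new EQ_SEPARATED_OPT extractor replaces the old ad-hoc split("=") handling: any --key=value token is simply re-parsed as two separate tokens. A small stand-alone sketch of the regex; the --master argument is just an example, not taken from this diff.

    val EQ_SEPARATED_OPT = """(--[^=]+)=(.+)""".r

    "--master=local[4]" match {
      case EQ_SEPARATED_OPT(opt, value) => println(s"$opt $value")  // prints: --master local[4]
      case other => println(s"no '=' separator in: $other")
    }
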
@@ -28,7 +28,7 @@ import org.apache.spark.util.AkkaUtils
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int)
extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {

val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)