[SPARK-16967] move mesos to module
## What changes were proposed in this pull request?

Move the Mesos code into its own Maven module.

## How was this patch tested?

- unit tests
- manually submitting a client-mode and a cluster-mode job (see the submission sketch below)
- the Spark/Mesos integration test suite

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #14637 from mgummelt/mesos-module.
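The manual test mentioned above, submitting a job in client and in cluster mode against a Mesos master, is normally done with `bin/spark-submit --master mesos://...`. The sketch below shows the programmatic equivalent via `SparkLauncher`; the jar path, main class, master URL, and memory setting are illustrative placeholders, not values taken from this patch.

```scala
import org.apache.spark.launcher.SparkLauncher

object SubmitToMesosExample {
  def main(args: Array[String]): Unit = {
    // All values below are placeholders for illustration only.
    val process = new SparkLauncher()
      .setAppResource("/path/to/spark-examples.jar")        // application jar (placeholder)
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setMaster("mesos://mesos-master.example.com:5050")   // or mesos://zk://... for a ZooKeeper-backed master
      .setDeployMode("client")                              // cluster mode goes through the MesosClusterDispatcher
      .setConf("spark.executor.memory", "2g")
      .launch()                                             // returns a java.lang.Process
    process.waitFor()
  }
}
```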
Michael Gummelt authored and Marcelo Vanzin committed Aug 26, 2016
1 parent c0949dc commit 8e5475b
Showing 43 changed files with 305 additions and 118 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -44,7 +44,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- build/mvn -T 4 -q -DskipTests -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -138,6 +138,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>mesos</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mesos_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
5 changes: 0 additions & 5 deletions core/pom.xml
@@ -215,11 +215,6 @@
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
<classifier>${mesos.classifier}</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
18 changes: 1 addition & 17 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,7 +42,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
@@ -56,7 +55,6 @@ import org.apache.spark.rdd._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
@@ -2512,18 +2510,6 @@ object SparkContext extends Logging {
}
(backend, scheduler)

case MESOS_REGEX(mesosUrl) =>
MesosNativeLibrary.load()
val scheduler = new TaskSchedulerImpl(sc)
val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
val backend = if (coarseGrained) {
new MesosCoarseGrainedSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
} else {
new MesosFineGrainedSchedulerBackend(scheduler, sc, mesosUrl)
}
scheduler.initialize(backend)
(backend, scheduler)

case masterUrl =>
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
@@ -2545,7 +2531,7 @@
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
val loader = Utils.getContextOrSparkClassLoader
val serviceLoaders =
ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
if (serviceLoaders.size > 1) {
throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
s"for the url $url:")
@@ -2566,8 +2552,6 @@ private object SparkMasterRegex {
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """spark://(.*)""".r
// Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url
val MESOS_REGEX = """mesos://(.*)""".r
}

/**
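With the hard-coded `MESOS_REGEX` branch and the `MesosNativeLibrary.load()` call removed from `SparkContext`, `mesos://` URLs now have to be resolved through the generic `getClusterManager` path shown above: `ServiceLoader` scans the classpath for `ExternalClusterManager` implementations and picks the one whose `canCreate` accepts the URL. A minimal sketch of how the relocated Mesos backends could plug back in through that interface follows; the class name `MesosClusterManager` is an assumption (the new module's sources are not part of this excerpt), while the backend constructors and the `spark.mesos.coarse` switch are taken from the code removed above.

```scala
package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/** Sketch of a pluggable cluster manager handling mesos:// master URLs. */
private[spark] class MesosClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("mesos://")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    MesosNativeLibrary.load()  // previously done in SparkContext's MESOS_REGEX branch
    val mesosUrl = masterURL.stripPrefix("mesos://")  // the part the removed MESOS_REGEX captured
    val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
    if (coarseGrained) {
      new MesosCoarseGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl], sc, mesosUrl, sc.env.securityManager)
    } else {
      new MesosFineGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl], sc, mesosUrl)
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
```

For `ServiceLoader` to discover such a class, the module would also list it in a `META-INF/services/org.apache.spark.scheduler.ExternalClusterManager` resource file.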
20 changes: 0 additions & 20 deletions core/src/main/scala/org/apache/spark/TaskState.scala
@@ -17,8 +17,6 @@

package org.apache.spark

import org.apache.mesos.Protos.{TaskState => MesosTaskState}

private[spark] object TaskState extends Enumeration {

val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
@@ -30,22 +28,4 @@ private[spark] object TaskState extends Enumeration {
def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state)

def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains(state)

def toMesos(state: TaskState): MesosTaskState = state match {
case LAUNCHING => MesosTaskState.TASK_STARTING
case RUNNING => MesosTaskState.TASK_RUNNING
case FINISHED => MesosTaskState.TASK_FINISHED
case FAILED => MesosTaskState.TASK_FAILED
case KILLED => MesosTaskState.TASK_KILLED
case LOST => MesosTaskState.TASK_LOST
}

def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
case MesosTaskState.TASK_FINISHED => FINISHED
case MesosTaskState.TASK_FAILED => FAILED
case MesosTaskState.TASK_KILLED => KILLED
case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
}
}
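The `toMesos`/`fromMesos` mappings deleted here have no remaining caller in core; they presumably move into the new mesos module together with the `org.apache.mesos.Protos` import. A sketch of such a helper, reusing exactly the mappings shown above (the object name, package, and visibility are assumptions, since the new module's files are not shown in this excerpt):

```scala
package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos.{TaskState => MesosTaskState}

import org.apache.spark.{TaskState => SparkTaskState}

/** Conversions between Spark's TaskState enumeration and the Mesos protobuf TaskState. */
private[mesos] object MesosTaskStateUtil {

  def toMesos(state: SparkTaskState.TaskState): MesosTaskState = state match {
    case SparkTaskState.LAUNCHING => MesosTaskState.TASK_STARTING
    case SparkTaskState.RUNNING   => MesosTaskState.TASK_RUNNING
    case SparkTaskState.FINISHED  => MesosTaskState.TASK_FINISHED
    case SparkTaskState.FAILED    => MesosTaskState.TASK_FAILED
    case SparkTaskState.KILLED    => MesosTaskState.TASK_KILLED
    case SparkTaskState.LOST      => MesosTaskState.TASK_LOST
  }

  def fromMesos(mesosState: MesosTaskState): SparkTaskState.TaskState = mesosState match {
    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => SparkTaskState.LAUNCHING
    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING  => SparkTaskState.RUNNING
    case MesosTaskState.TASK_FINISHED => SparkTaskState.FINISHED
    case MesosTaskState.TASK_FAILED   => SparkTaskState.FAILED
    case MesosTaskState.TASK_KILLED   => SparkTaskState.KILLED
    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => SparkTaskState.LOST
  }
}
```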
core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -22,7 +22,6 @@ import org.scalatest.PrivateMethodTester
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend


@@ -130,31 +129,4 @@ class SparkContextSchedulerCreationSuite
case _ => fail()
}
}

def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
try {
val sched = createTaskScheduler(master, "client", conf)
assert(sched.backend.getClass === expectedClass)
} catch {
case e: UnsatisfiedLinkError =>
assert(e.getMessage.contains("mesos"))
logWarning("Mesos not available, could not test actual Mesos scheduler creation")
case e: Throwable => fail(e)
}
}

test("mesos fine-grained") {
testMesos("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false)
}

test("mesos coarse-grained") {
testMesos("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true)
}

test("mesos with zookeeper") {
testMesos("mesos://zk://localhost:1234,localhost:2345",
classOf[MesosFineGrainedSchedulerBackend], coarse = false)
}

}
15 changes: 8 additions & 7 deletions dev/create-release/release-build.sh
@@ -80,7 +80,7 @@ NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads
BASE_DIR=$(pwd)

MVN="build/mvn --force"
PUBLISH_PROFILES="-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2"
PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2"
PUBLISH_PROFILES="$PUBLISH_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"

rm -rf spark
@@ -186,12 +186,13 @@ if [[ "$1" == "package" ]]; then

# We increment the Zinc port each time to avoid OOM's and other craziness if multiple builds
# share the same Zinc server.
make_binary_release "hadoop2.3" "-Psparkr -Phadoop-2.3 -Phive -Phive-thriftserver -Pyarn" "3033" &
make_binary_release "hadoop2.4" "-Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn" "3034" &
make_binary_release "hadoop2.6" "-Psparkr -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn" "3035" &
make_binary_release "hadoop2.7" "-Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn" "3036" &
make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn" "3037" &
make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn" "3038" &
FLAGS="-Psparkr -Phive -Phive-thriftserver -Pyarn -Pmesos"
make_binary_release "hadoop2.3" "-Phadoop2.3 $FLAGS" "3033" &
make_binary_release "hadoop2.4" "-Phadoop2.4 $FLAGS" "3034" &
make_binary_release "hadoop2.6" "-Phadoop2.6 $FLAGS" "3035" &
make_binary_release "hadoop2.7" "-Phadoop2.7 $FLAGS" "3036" &
make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn -Pmesos" "3037" &
make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn -Pmesos" "3038" &
wait
rm -rf spark-$SPARK_VERSION-bin-*/

2 changes: 1 addition & 1 deletion dev/lint-java
@@ -20,7 +20,7 @@
SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)"

ERRORS=$($SCRIPT_DIR/../build/mvn -Pkinesis-asl -Pyarn -Phive -Phive-thriftserver checkstyle:check | grep ERROR)
ERRORS=$($SCRIPT_DIR/../build/mvn -Pkinesis-asl -Pmesos -Pyarn -Phive -Phive-thriftserver checkstyle:check | grep ERROR)

if test ! -z "$ERRORS"; then
echo -e "Checkstyle checks failed at following occurrences:\n$ERRORS"
2 changes: 1 addition & 1 deletion dev/mima
@@ -24,7 +24,7 @@ set -e
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"

SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
SPARK_PROFILES="-Pmesos -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"

1 change: 1 addition & 0 deletions dev/scalastyle
@@ -22,6 +22,7 @@
ERRORS=$(echo -e "q\n" \
| build/sbt \
-Pkinesis-asl \
-Pmesos \
-Pyarn \
-Phive \
-Phive-thriftserver \
7 changes: 7 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -458,6 +458,13 @@ def __hash__(self):
]
)

mesos = Module(
name="mesos",
dependencies=[],
source_file_regexes=["mesos/"],
sbt_test_goals=["mesos/test"]
)

# The root module is a dummy module which is used to run all of the tests.
# No other modules should directly depend on this module.
root = Module(
2 changes: 1 addition & 1 deletion dev/test-dependencies.sh
@@ -29,7 +29,7 @@ export LC_ALL=C
# TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.

# NOTE: These should match those in the release publishing script
HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pyarn -Phive"
HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pyarn -Phive"
MVN="build/mvn"
HADOOP_PROFILES=(
hadoop-2.2
24 changes: 14 additions & 10 deletions docs/building-spark.md
@@ -50,7 +50,7 @@ To create a Spark distribution like those distributed by the
to be runnable, use `./dev/make-distribution.sh` in the project root directory. It can be configured
with Maven profile settings and so on like the direct Maven build. Example:

./dev/make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn
./dev/make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pmesos -Pyarn

For more information on usage, run `./dev/make-distribution.sh --help`

@@ -105,13 +105,17 @@ By default Spark will build with Hive 1.2.1 bindings.

## Packaging without Hadoop Dependencies for YARN

The assembly directory produced by `mvn package` will, by default, include all of Spark's
dependencies, including Hadoop and some of its ecosystem projects. On YARN deployments, this
causes multiple versions of these to appear on executor classpaths: the version packaged in
the Spark assembly and the version on each node, included with `yarn.application.classpath`.
The `hadoop-provided` profile builds the assembly without including Hadoop-ecosystem projects,
like ZooKeeper and Hadoop itself.

## Building with Mesos support

./build/mvn -Pmesos -DskipTests clean package

## Building for Scala 2.10
To produce a Spark package compiled with Scala 2.10, use the `-Dscala-2.10` property:

@@ -263,17 +267,17 @@ The run-tests script also can be limited to a specific Python version or a speci

## Running R Tests

To run the SparkR tests you will need to install the R package `testthat`
(run `install.packages(testthat)` from R shell). You can run just the SparkR tests using
the command:

./R/run-tests.sh

## Running Docker-based Integration Test Suites

In order to run Docker integration tests, you have to install the `docker` engine on your box.
The instructions for installation can be found at [the Docker site](https://docs.docker.com/engine/installation/).
Once installed, the `docker` service needs to be started, if not already running.
On Linux, this can be done by `sudo service docker start`.

./build/mvn install -DskipTests
