From ae3998d2192703fec1596d0eb92bb7a58950e474 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 5 Apr 2016 21:47:10 -0700 Subject: [PATCH] Revert "[SPARK-13579][BUILD] Stop building the main Spark assembly." to resupport spark-class and its users (like spark-shell). This reverts commit 24d7d2e453ab5eef6099a32fb9e8ed60f6ada93a. Conflicts: project/SparkBuild.scala --- assembly/pom.xml | 101 ++++++++++++------ bin/spark-class | 11 +- bin/spark-class2.cmd | 5 +- .../scala/org/apache/spark/util/Utils.scala | 4 +- .../apache/spark/util/FileAppenderSuite.scala | 72 ++++++------- dev/deps/spark-deps-hadoop-2.2 | 4 +- dev/deps/spark-deps-hadoop-2.3 | 4 +- dev/deps/spark-deps-hadoop-2.4 | 4 +- dev/deps/spark-deps-hadoop-2.6 | 4 +- dev/deps/spark-deps-hadoop-2.7 | 4 +- dev/make-distribution.sh | 25 ++--- dev/mima | 6 +- dev/run-tests.py | 11 +- docs/sql-programming-guide.md | 7 +- examples/pom.xml | 80 ++++++++++++-- .../launcher/AbstractCommandBuilder.java | 47 ++++---- .../spark/launcher/CommandBuilderUtils.java | 4 +- .../launcher/SparkSubmitCommandBuilder.java | 11 +- pom.xml | 44 +++----- project/SparkBuild.scala | 61 +++++------ python/pyspark/streaming/tests.py | 6 +- python/run-tests.py | 18 +--- .../HiveThriftServer2Suites.scala | 4 - sql/hive/pom.xml | 24 +++++ .../org/apache/spark/deploy/yarn/Client.scala | 3 + .../spark/deploy/yarn/ClientSuite.scala | 2 +- 26 files changed, 320 insertions(+), 246 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 22cbac06cad6..477d4931c3a8 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -33,8 +33,9 @@ assembly - none - package + scala-${scala.binary.version} + spark-assembly-${project.version}-hadoop${hadoop.version}.jar + ${project.build.directory}/${spark.jar.dir}/${spark.jar.basename} @@ -68,17 +69,6 @@ spark-repl_${scala.binary.version} ${project.version} - - - - com.google.guava - guava - ${hadoop.deps.scope} - @@ -97,26 +87,75 @@ true - + + + org.apache.maven.plugins + maven-antrun-plugin + + + package + + run + + + + + + + + + + + + + org.apache.maven.plugins - maven-antrun-plugin - - - package - - run - - - - - - - - - - - + maven-shade-plugin + + false + ${spark.jar} + + + *:* + + + + + *:* + + org/datanucleus/** + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + package + + shade + + + + + + META-INF/services/org.apache.hadoop.fs.FileSystem + + + reference.conf + + + log4j.properties + + + + + + + diff --git a/bin/spark-class b/bin/spark-class index b489591778cb..e710e388be1b 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -36,20 +36,21 @@ else fi # Find Spark jars. +# TODO: change the directory name when Spark jars move from "lib". if [ -f "${SPARK_HOME}/RELEASE" ]; then - SPARK_JARS_DIR="${SPARK_HOME}/jars" + SPARK_JARS_DIR="${SPARK_HOME}/lib" else - SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars" + SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION" fi -if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then +if [ ! -d "$SPARK_JARS_DIR" ]; then echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2 echo "You need to build Spark before running this program." 1>&2 exit 1 -else - LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*" fi +LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*" + # Add the launcher build dir to the classpath if requested. if [ -n "$SPARK_PREPEND_CLASSES" ]; then LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH" diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd index 579efff90953..565b87c102b1 100644 --- a/bin/spark-class2.cmd +++ b/bin/spark-class2.cmd @@ -29,10 +29,11 @@ if "x%1"=="x" ( ) rem Find Spark jars. +rem TODO: change the directory name when Spark jars move from "lib". if exist "%SPARK_HOME%\RELEASE" ( - set SPARK_JARS_DIR="%SPARK_HOME%\jars" + set SPARK_JARS_DIR="%SPARK_HOME%\lib" ) else ( - set SPARK_JARS_DIR="%SPARK_HOME%\assembly\target\scala-%SPARK_SCALA_VERSION%\jars" + set SPARK_JARS_DIR="%SPARK_HOME%\assembly\target\scala-%SPARK_SCALA_VERSION%" ) if not exist "%SPARK_JARS_DIR%"\ ( diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c304629bcdbe..50bcf8580597 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1121,9 +1121,9 @@ private[spark] object Utils extends Logging { extraEnvironment: Map[String, String] = Map.empty, redirectStderr: Boolean = true): String = { val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr) - val output = new StringBuilder + val output = new StringBuffer val threadName = "read stdout for " + command(0) - def appendToOutput(s: String): Unit = output.append(s).append("\n") + def appendToOutput(s: String): Unit = output.append(s) val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput) val exitCode = process.waitFor() stdoutThread.join() // Wait for it to finish reading output diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala index 4fa9f9a8f590..280e4964980d 100644 --- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -201,29 +201,24 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { // Make sure only logging errors val logger = Logger.getRootLogger - val oldLogLevel = logger.getLevel logger.setLevel(Level.ERROR) - try { - logger.addAppender(mockAppender) + logger.addAppender(mockAppender) - val testOutputStream = new PipedOutputStream() - val testInputStream = new PipedInputStream(testOutputStream) + val testOutputStream = new PipedOutputStream() + val testInputStream = new PipedInputStream(testOutputStream) - // Close the stream before appender tries to read will cause an IOException - testInputStream.close() - testOutputStream.close() - val appender = FileAppender(testInputStream, testFile, new SparkConf) + // Close the stream before appender tries to read will cause an IOException + testInputStream.close() + testOutputStream.close() + val appender = FileAppender(testInputStream, testFile, new SparkConf) - appender.awaitTermination() + appender.awaitTermination() - // If InputStream was closed without first stopping the appender, an exception will be logged - verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture) - val loggingEvent = loggingEventCaptor.getValue - assert(loggingEvent.getThrowableInformation !== null) - assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException]) - } finally { - logger.setLevel(oldLogLevel) - } + // If InputStream was closed without first stopping the appender, an exception will be logged + verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture) + val loggingEvent = loggingEventCaptor.getValue + assert(loggingEvent.getThrowableInformation !== null) + assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException]) } test("file appender async close stream gracefully") { @@ -233,35 +228,30 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { // Make sure only logging errors val logger = Logger.getRootLogger - val oldLogLevel = logger.getLevel logger.setLevel(Level.ERROR) - try { - logger.addAppender(mockAppender) + logger.addAppender(mockAppender) - val testOutputStream = new PipedOutputStream() - val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream + val testOutputStream = new PipedOutputStream() + val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream - // Close the stream before appender tries to read will cause an IOException - testInputStream.close() - testOutputStream.close() - val appender = FileAppender(testInputStream, testFile, new SparkConf) + // Close the stream before appender tries to read will cause an IOException + testInputStream.close() + testOutputStream.close() + val appender = FileAppender(testInputStream, testFile, new SparkConf) - // Stop the appender before an IOException is called during read - testInputStream.latchReadStarted.await() - appender.stop() - testInputStream.latchReadProceed.countDown() + // Stop the appender before an IOException is called during read + testInputStream.latchReadStarted.await() + appender.stop() + testInputStream.latchReadProceed.countDown() - appender.awaitTermination() + appender.awaitTermination() - // Make sure no IOException errors have been logged as a result of appender closing gracefully - verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture) - import scala.collection.JavaConverters._ - loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent => - assert(loggingEvent.getThrowableInformation === null - || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException]) - } - } finally { - logger.setLevel(oldLogLevel) + // Make sure no IOException errors have been logged as a result of appender closing gracefully + verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture) + import scala.collection.JavaConverters._ + loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent => + assert(loggingEvent.getThrowableInformation === null + || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException]) } } diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2 index 2c24366cc3a1..3865a9fb1612 100644 --- a/dev/deps/spark-deps-hadoop-2.2 +++ b/dev/deps/spark-deps-hadoop-2.2 @@ -12,6 +12,7 @@ asm-3.1.jar asm-commons-3.1.jar asm-tree-3.1.jar avro-1.7.7.jar +avro-ipc-1.7.7-tests.jar avro-ipc-1.7.7.jar avro-mapred-1.7.7-hadoop2.jar bonecp-0.8.0.RELEASE.jar @@ -60,7 +61,6 @@ grizzly-http-2.1.2.jar grizzly-http-server-2.1.2.jar grizzly-http-servlet-2.1.2.jar grizzly-rcm-2.1.2.jar -guava-14.0.1.jar guice-3.0.jar guice-servlet-3.0.jar hadoop-annotations-2.2.0.jar @@ -164,6 +164,7 @@ scala-parser-combinators_2.11-1.0.4.jar scala-reflect-2.11.8.jar scala-xml_2.11-1.0.2.jar scalap-2.11.8.jar +servlet-api-2.5.jar slf4j-api-1.7.16.jar slf4j-log4j12-1.7.16.jar snappy-0.2.jar @@ -176,6 +177,7 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-1.5.6.jar +unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xmlenc-0.52.jar xz-1.0.jar diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 index e9cb0d8f3eac..4313799da783 100644 --- a/dev/deps/spark-deps-hadoop-2.3 +++ b/dev/deps/spark-deps-hadoop-2.3 @@ -12,6 +12,7 @@ asm-3.1.jar asm-commons-3.1.jar asm-tree-3.1.jar avro-1.7.7.jar +avro-ipc-1.7.7-tests.jar avro-ipc-1.7.7.jar avro-mapred-1.7.7-hadoop2.jar base64-2.3.8.jar @@ -55,7 +56,6 @@ eigenbase-properties-1.1.5.jar geronimo-annotation_1.0_spec-1.1.1.jar geronimo-jaspic_1.0_spec-1.0.jar geronimo-jta_1.1_spec-1.1.1.jar -guava-14.0.1.jar guice-3.0.jar guice-servlet-3.0.jar hadoop-annotations-2.3.0.jar @@ -155,6 +155,7 @@ scala-parser-combinators_2.11-1.0.4.jar scala-reflect-2.11.8.jar scala-xml_2.11-1.0.2.jar scalap-2.11.8.jar +servlet-api-2.5.jar slf4j-api-1.7.16.jar slf4j-log4j12-1.7.16.jar snappy-0.2.jar @@ -167,6 +168,7 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-1.5.6.jar +unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xmlenc-0.52.jar xz-1.0.jar diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index d8d1840da553..910ea685f26f 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -12,6 +12,7 @@ asm-3.1.jar asm-commons-3.1.jar asm-tree-3.1.jar avro-1.7.7.jar +avro-ipc-1.7.7-tests.jar avro-ipc-1.7.7.jar avro-mapred-1.7.7-hadoop2.jar base64-2.3.8.jar @@ -55,7 +56,6 @@ eigenbase-properties-1.1.5.jar geronimo-annotation_1.0_spec-1.1.1.jar geronimo-jaspic_1.0_spec-1.0.jar geronimo-jta_1.1_spec-1.1.1.jar -guava-14.0.1.jar guice-3.0.jar guice-servlet-3.0.jar hadoop-annotations-2.4.0.jar @@ -156,6 +156,7 @@ scala-parser-combinators_2.11-1.0.4.jar scala-reflect-2.11.8.jar scala-xml_2.11-1.0.2.jar scalap-2.11.8.jar +servlet-api-2.5.jar slf4j-api-1.7.16.jar slf4j-log4j12-1.7.16.jar snappy-0.2.jar @@ -168,6 +169,7 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-1.5.6.jar +unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xmlenc-0.52.jar xz-1.0.jar diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 8beede1e38d2..0692f24e47db 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -16,6 +16,7 @@ asm-3.1.jar asm-commons-3.1.jar asm-tree-3.1.jar avro-1.7.7.jar +avro-ipc-1.7.7-tests.jar avro-ipc-1.7.7.jar avro-mapred-1.7.7-hadoop2.jar base64-2.3.8.jar @@ -60,7 +61,6 @@ geronimo-annotation_1.0_spec-1.1.1.jar geronimo-jaspic_1.0_spec-1.0.jar geronimo-jta_1.1_spec-1.1.1.jar gson-2.2.4.jar -guava-14.0.1.jar guice-3.0.jar guice-servlet-3.0.jar hadoop-annotations-2.6.0.jar @@ -162,6 +162,7 @@ scala-parser-combinators_2.11-1.0.4.jar scala-reflect-2.11.8.jar scala-xml_2.11-1.0.2.jar scalap-2.11.8.jar +servlet-api-2.5.jar slf4j-api-1.7.16.jar slf4j-log4j12-1.7.16.jar snappy-0.2.jar @@ -174,6 +175,7 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-1.5.6.jar +unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xercesImpl-2.9.1.jar xmlenc-0.52.jar diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index a9d814f94487..e397558e058d 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -16,6 +16,7 @@ asm-3.1.jar asm-commons-3.1.jar asm-tree-3.1.jar avro-1.7.7.jar +avro-ipc-1.7.7-tests.jar avro-ipc-1.7.7.jar avro-mapred-1.7.7-hadoop2.jar base64-2.3.8.jar @@ -60,7 +61,6 @@ geronimo-annotation_1.0_spec-1.1.1.jar geronimo-jaspic_1.0_spec-1.0.jar geronimo-jta_1.1_spec-1.1.1.jar gson-2.2.4.jar -guava-14.0.1.jar guice-3.0.jar guice-servlet-3.0.jar hadoop-annotations-2.7.0.jar @@ -163,6 +163,7 @@ scala-parser-combinators_2.11-1.0.4.jar scala-reflect-2.11.8.jar scala-xml_2.11-1.0.2.jar scalap-2.11.8.jar +servlet-api-2.5.jar slf4j-api-1.7.16.jar slf4j-log4j12-1.7.16.jar snappy-0.2.jar @@ -175,6 +176,7 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-1.5.6.jar +unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xercesImpl-2.9.1.jar xmlenc-0.52.jar diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh index 4f7544f6ea78..dbdd42ff9e08 100755 --- a/dev/make-distribution.sh +++ b/dev/make-distribution.sh @@ -160,35 +160,28 @@ echo -e "\$ ${BUILD_COMMAND[@]}\n" # Make directories rm -rf "$DISTDIR" -mkdir -p "$DISTDIR/jars" +mkdir -p "$DISTDIR/lib" echo "Spark $VERSION$GITREVSTRING built for Hadoop $SPARK_HADOOP_VERSION" > "$DISTDIR/RELEASE" echo "Build flags: $@" >> "$DISTDIR/RELEASE" # Copy jars -cp "$SPARK_HOME"/assembly/target/scala*/jars/* "$DISTDIR/jars/" - -# Only create the yarn directory if the yarn artifacts were build. -if [ -f "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar ]; then - mkdir "$DISTDIR"/yarn - cp "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/yarn" -fi +cp "$SPARK_HOME"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/" +# This will fail if the -Pyarn profile is not provided +# In this case, silence the error and ignore the return code of this command +cp "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/lib/" &> /dev/null || : # Copy examples and dependencies mkdir -p "$DISTDIR/examples/jars" cp "$SPARK_HOME"/examples/target/scala*/jars/* "$DISTDIR/examples/jars" -# Deduplicate jars that have already been packaged as part of the main Spark dependencies. -for f in "$DISTDIR/examples/jars/"*; do - name=$(basename "$f") - if [ -f "$DISTDIR/jars/$name" ]; then - rm "$DISTDIR/examples/jars/$name" - fi -done - # Copy example sources (needed for python and SQL) mkdir -p "$DISTDIR/examples/src/main" cp -r "$SPARK_HOME"/examples/src/main "$DISTDIR/examples/src/" +if [ "$SPARK_HIVE" == "1" ]; then + cp "$SPARK_HOME"/lib_managed/jars/datanucleus*.jar "$DISTDIR/lib/" +fi + # Copy license and ASF files cp "$SPARK_HOME/LICENSE" "$DISTDIR" cp -r "$SPARK_HOME/licenses" "$DISTDIR" diff --git a/dev/mima b/dev/mima index c3553490451c..ea746e6f01b4 100755 --- a/dev/mima +++ b/dev/mima @@ -25,8 +25,8 @@ FWDIR="$(cd "`dirname "$0"`"/..; pwd)" cd "$FWDIR" SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive" -TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)" -OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)" +TOOLS_CLASSPATH="$(build/sbt "export tools/fullClasspath" | tail -n1)" +OLD_DEPS_CLASSPATH="$(build/sbt $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)" rm -f .generated-mima* @@ -36,7 +36,7 @@ java \ -cp "$TOOLS_CLASSPATH:$OLD_DEPS_CLASSPATH" \ org.apache.spark.tools.GenerateMIMAIgnore -echo -e "q\n" | build/sbt -DcopyDependencies=false "$@" mimaReportBinaryIssues | grep -v -e "info.*Resolving" +echo -e "q\n" | build/sbt mimaReportBinaryIssues | grep -v -e "info.*Resolving" ret_val=$? if [ $ret_val != 0 ]; then diff --git a/dev/run-tests.py b/dev/run-tests.py index cbe347274e62..c2944747ee5c 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -350,7 +350,7 @@ def build_spark_sbt(hadoop_version): def build_spark_assembly_sbt(hadoop_version): # Enable all of the profiles for the build: build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags - sbt_goals = ["assembly/package"] + sbt_goals = ["assembly/assembly"] profiles_and_goals = build_profiles + sbt_goals print("[info] Building Spark assembly (w/Hive 1.2.1) using SBT with these arguments: ", " ".join(profiles_and_goals)) @@ -371,10 +371,9 @@ def build_apache_spark(build_tool, hadoop_version): build_spark_sbt(hadoop_version) -def detect_binary_inop_with_mima(hadoop_version): - build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags +def detect_binary_inop_with_mima(): set_title_and_block("Detecting binary incompatibilities with MiMa", "BLOCK_MIMA") - run_cmd([os.path.join(SPARK_HOME, "dev", "mima")] + build_profiles) + run_cmd([os.path.join(SPARK_HOME, "dev", "mima")]) def run_scala_tests_maven(test_profiles): @@ -572,8 +571,8 @@ def main(): # backwards compatibility checks if build_tool == "sbt": # Note: compatibility tests only supported in sbt for now - detect_binary_inop_with_mima(hadoop_version) - # Since we did not build assembly/package before running dev/mima, we need to + detect_binary_inop_with_mima() + # Since we did not build assembly/assembly before running dev/mima, we need to # do it here because the tests still rely on it; see SPARK-13294 for details. build_spark_assembly_sbt(hadoop_version) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 274a8edb0c77..2fdc97f8a02d 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1687,7 +1687,12 @@ on all of the worker nodes, as they will need access to the Hive serialization a (SerDes) in order to access data stored in Hive. Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` (for security configuration), -`hdfs-site.xml` (for HDFS configuration) file in `conf/`. + `hdfs-site.xml` (for HDFS configuration) file in `conf/`. Please note when running +the query on a YARN cluster (`cluster` mode), the `datanucleus` jars under the `lib` directory +and `hive-site.xml` under `conf/` directory need to be available on the driver and all executors launched by the +YARN cluster. The convenient way to do this is adding them through the `--jars` option and `--file` option of the +`spark-submit` command. +
diff --git a/examples/pom.xml b/examples/pom.xml index 4a20370f0668..b7f37978b945 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -27,15 +27,12 @@ org.apache.spark spark-examples_2.11 - jar - Spark Project Examples - http://spark.apache.org/ - examples - none - package + jar + Spark Project Examples + http://spark.apache.org/ @@ -78,6 +75,23 @@ spark-streaming-kafka_${scala.binary.version} ${project.version} + + org.apache.hbase + hbase-testing-util + ${hbase.version} + ${hbase.deps.scope} + + + + org.apache.hbase + hbase-annotations + + + org.jruby + jruby-complete + + + org.apache.hbase hbase-protocol @@ -125,10 +139,6 @@ org.apache.hbase hbase-annotations - - org.apache.hbase - hbase-common - org.apache.hadoop hadoop-core @@ -198,6 +208,13 @@ ${hbase.version} ${hbase.deps.scope} + + org.apache.hbase + hbase-hadoop-compat + ${hbase.version} + test-jar + test + org.apache.commons commons-math3 @@ -277,6 +294,17 @@ scopt_${scala.binary.version} 3.3.0 + + + + org.scala-lang + scala-library + provided + + @@ -297,6 +325,38 @@ true + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + none + + test-jar + + + + + ${jars.target.dir} + + + + org.apache.maven.plugins + maven-dependency-plugin + + + package + + copy-dependencies + + + runtime + ${jars.target.dir} + + + + diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 7a5e37c50163..d02b2a499455 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -144,26 +144,10 @@ List buildClassPath(String appClassPath) throws IOException { boolean isTesting = "1".equals(getenv("SPARK_TESTING")); if (prependClasses || isTesting) { String scala = getScalaVersion(); - List projects = Arrays.asList( - "common/network-common", - "common/network-shuffle", - "common/network-yarn", - "common/sketch", - "common/tags", - "common/unsafe", - "core", - "examples", - "graphx", - "launcher", - "mllib", - "repl", - "sql/catalyst", - "sql/core", - "sql/hive", - "sql/hive-thriftserver", - "streaming", - "yarn" - ); + List projects = Arrays.asList("core", "repl", "mllib", "graphx", + "streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver", + "yarn", "launcher", + "common/network-common", "common/network-shuffle", "common/network-yarn"); if (prependClasses) { if (!isTesting) { System.err.println( @@ -190,12 +174,31 @@ List buildClassPath(String appClassPath) throws IOException { // Add Spark jars to the classpath. For the testing case, we rely on the test code to set and // propagate the test classpath appropriately. For normal invocation, look for the jars // directory under SPARK_HOME. - boolean isTestingSql = "1".equals(getenv("SPARK_SQL_TESTING")); - String jarsDir = findJarsDir(getSparkHome(), getScalaVersion(), !isTesting && !isTestingSql); + String jarsDir = findJarsDir(getSparkHome(), getScalaVersion(), !isTesting); if (jarsDir != null) { addToClassPath(cp, join(File.separator, jarsDir, "*")); } + // Datanucleus jars must be included on the classpath. Datanucleus jars do not work if only + // included in the uber jar as plugin.xml metadata is lost. Both sbt and maven will populate + // "lib_managed/jars/" with the datanucleus jars when Spark is built with Hive + File libdir; + if (new File(sparkHome, "RELEASE").isFile()) { + libdir = new File(sparkHome, "lib"); + } else { + libdir = new File(sparkHome, "lib_managed/jars"); + } + + if (libdir.isDirectory()) { + for (File jar : libdir.listFiles()) { + if (jar.getName().startsWith("datanucleus-")) { + addToClassPath(cp, jar.getAbsolutePath()); + } + } + } else { + checkState(isTesting, "Library directory '%s' does not exist.", libdir.getAbsolutePath()); + } + addToClassPath(cp, getenv("HADOOP_CONF_DIR")); addToClassPath(cp, getenv("YARN_CONF_DIR")); addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH")); diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java index 91586aad7b70..a08c8dcba402 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java +++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java @@ -358,12 +358,12 @@ static String findJarsDir(String sparkHome, String scalaVersion, boolean failIfN // TODO: change to the correct directory once the assembly build is changed. File libdir; if (new File(sparkHome, "RELEASE").isFile()) { - libdir = new File(sparkHome, "jars"); + libdir = new File(sparkHome, "lib"); checkState(!failIfNotFound || libdir.isDirectory(), "Library directory '%s' does not exist.", libdir.getAbsolutePath()); } else { - libdir = new File(sparkHome, String.format("assembly/target/scala-%s/jars", scalaVersion)); + libdir = new File(sparkHome, String.format("assembly/target/scala-%s", scalaVersion)); if (!libdir.isDirectory()) { checkState(!failIfNotFound, "Library directory '%s' does not exist; make sure Spark is built.", diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java index c31c42cd3a41..56e4107c5a0c 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -336,7 +336,6 @@ private boolean isThriftServer(String mainClass) { } private List findExamplesJars() { - boolean isTesting = "1".equals(getenv("SPARK_TESTING")); List examplesJars = new ArrayList<>(); String sparkHome = getSparkHome(); @@ -347,15 +346,11 @@ private List findExamplesJars() { jarsDir = new File(sparkHome, String.format("examples/target/scala-%s/jars", getScalaVersion())); } - - boolean foundDir = jarsDir.isDirectory(); - checkState(isTesting || foundDir, "Examples jars directory '%s' does not exist.", + checkState(jarsDir.isDirectory(), "Examples jars directory '%s' does not exist.", jarsDir.getAbsolutePath()); - if (foundDir) { - for (File f: jarsDir.listFiles()) { - examplesJars.add(f.getAbsolutePath()); - } + for (File f: jarsDir.listFiles()) { + examplesJars.add(f.getAbsolutePath()); } return examplesJars; } diff --git a/pom.xml b/pom.xml index 984b2859efbe..e135c92c0786 100644 --- a/pom.xml +++ b/pom.xml @@ -185,10 +185,6 @@ ${project.build.directory}/scala-${scala.binary.version}/jars - - prepare-package - none - + + org.spark-project.spark + unused + 1.0.0 + - - org.apache.avro - avro-ipc - tests - ${avro.version} - test - org.apache.avro avro-mapred @@ -1524,10 +1521,6 @@ org.codehaus.groovy groovy-all - - javax.servlet - servlet-api - @@ -1923,7 +1916,6 @@ --> ${test_classpath} 1 - ${scala.binary.version} 1 ${test.java.home} @@ -1972,7 +1964,6 @@ --> ${test_classpath} 1 - ${scala.binary.version} 1 ${test.java.home} @@ -2155,7 +2146,6 @@ 2.10 - generate-test-classpath test-compile build-classpath @@ -2165,17 +2155,6 @@ test_classpath - - copy-module-dependencies - ${build.copyDependenciesPhase} - - copy-dependencies - - - runtime - ${jars.target.dir} - - @@ -2190,6 +2169,9 @@ false + + org.spark-project.spark:unused + org.eclipse.jetty:jetty-io org.eclipse.jetty:jetty-http org.eclipse.jetty:jetty-continuation @@ -2320,7 +2302,7 @@ prepare-test-jar - ${build.testJarPhase} + prepare-package test-jar diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 60124ef0a13b..5d62b688b9ea 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -20,7 +20,6 @@ import java.nio.file.Files import scala.util.Properties import scala.collection.JavaConverters._ -import scala.collection.mutable.Stack import sbt._ import sbt.Classpaths.publishTask @@ -58,12 +57,11 @@ object BuildCommons { Seq("yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl", "docker-integration-tests").map(ProjectRef(buildLocation, _)) - val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) = - Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly") + val assemblyProjects@Seq(assembly, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) = + Seq("assembly", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly") .map(ProjectRef(buildLocation, _)) - val copyJarsProjects@Seq(assembly, examples) = Seq("assembly", "examples") - .map(ProjectRef(buildLocation, _)) + val copyJarsProjects@Seq(examples) = Seq("examples").map(ProjectRef(buildLocation, _)) val tools = ProjectRef(buildLocation, "tools") // Root project. @@ -265,14 +263,8 @@ object SparkBuild extends PomBuild { /* Unsafe settings */ enable(Unsafe.settings)(unsafe) - /* - * Set up tasks to copy dependencies during packaging. This step can be disabled in the command - * line, so that dev/mima can run without trying to copy these files again and potentially - * causing issues. - */ - if (!"false".equals(System.getProperty("copyDependencies"))) { - copyJarsProjects.foreach(enable(CopyDependencies.settings)) - } + /* Set up tasks to copy dependencies during packaging. */ + copyJarsProjects.foreach(enable(CopyDependencies.settings)) /* Enable Assembly for all assembly projects */ assemblyProjects.foreach(enable(Assembly.settings)) @@ -485,6 +477,8 @@ object Assembly { val hadoopVersion = taskKey[String]("The version of hadoop that spark is compiled against.") + val deployDatanucleusJars = taskKey[Unit]("Deploy datanucleus jars to the spark/lib_managed/jars directory") + lazy val settings = assemblySettings ++ Seq( test in assembly := {}, hadoopVersion := { @@ -503,13 +497,27 @@ object Assembly { s"${mName}-test-${v}.jar" }, mergeStrategy in assembly := { + case PathList("org", "datanucleus", xs @ _*) => MergeStrategy.discard case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard case "log4j.properties" => MergeStrategy.discard case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first - } + }, + deployDatanucleusJars := { + val jars: Seq[File] = (fullClasspath in assembly).value.map(_.data) + .filter(_.getPath.contains("org.datanucleus")) + var libManagedJars = new File(BuildCommons.sparkHome, "lib_managed/jars") + libManagedJars.mkdirs() + jars.foreach { jar => + val dest = new File(libManagedJars, jar.getName) + if (!dest.exists()) { + Files.copy(jar.toPath, dest.toPath) + } + } + }, + assembly <<= assembly.dependsOn(deployDatanucleusJars) ) } @@ -690,13 +698,6 @@ object Java8TestSettings { object TestSettings { import BuildCommons._ - private val scalaBinaryVersion = - if (System.getProperty("scala-2.10") == "true") { - "2.10" - } else { - "2.11" - } - lazy val settings = Seq ( // Fork new JVMs for tests and set Java options for those fork := true, @@ -706,7 +707,6 @@ object TestSettings { "SPARK_DIST_CLASSPATH" -> (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"), "SPARK_PREPEND_CLASSES" -> "1", - "SPARK_SCALA_VERSION" -> scalaBinaryVersion, "SPARK_TESTING" -> "1", "JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))), javaOptions in Test += s"-Djava.io.tmpdir=$testTempDir", @@ -743,21 +743,8 @@ object TestSettings { parallelExecution in Test := false, // Make sure the test temp directory exists. resourceGenerators in Test <+= resourceManaged in Test map { outDir: File => - var dir = new File(testTempDir) - if (!dir.isDirectory()) { - // Because File.mkdirs() can fail if multiple callers are trying to create the same - // parent directory, this code tries to create parents one at a time, and avoids - // failures when the directories have been created by somebody else. - val stack = new Stack[File]() - while (!dir.isDirectory()) { - stack.push(dir) - dir = dir.getParentFile() - } - - while (stack.nonEmpty) { - val d = stack.pop() - require(d.mkdir() || d.isDirectory(), s"Failed to create directory $d") - } + if (!new File(testTempDir).isDirectory()) { + require(new File(testTempDir).mkdirs()) } Seq[File]() }, diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 148bf7e8ff5c..d010c0e0080c 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1482,7 +1482,7 @@ def search_kafka_assembly_jar(): raise Exception( ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) + "You need to build Spark with " - "'build/sbt assembly/package streaming-kafka-assembly/assembly' or " + "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or " "'build/mvn package' before running this test.") elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please " @@ -1548,7 +1548,7 @@ def search_kinesis_asl_assembly_jar(): elif are_kinesis_tests_enabled is False: sys.stderr.write("Skipping all Kinesis Python tests as the optional Kinesis project was " "not compiled into a JAR. To run these tests, " - "you need to build Spark with 'build/sbt -Pkinesis-asl assembly/package " + "you need to build Spark with 'build/sbt -Pkinesis-asl assembly/assembly " "streaming-kinesis-asl-assembly/assembly' or " "'build/mvn -Pkinesis-asl package' before running this test.") else: @@ -1556,7 +1556,7 @@ def search_kinesis_asl_assembly_jar(): ("Failed to find Spark Streaming Kinesis assembly jar in %s. " % kinesis_asl_assembly_dir) + "You need to build Spark with 'build/sbt -Pkinesis-asl " - "assembly/package streaming-kinesis-asl-assembly/assembly'" + "assembly/assembly streaming-kinesis-asl-assembly/assembly'" "or 'build/mvn -Pkinesis-asl package' before running this test.") sys.stderr.write("Running tests: %s \n" % (str(testcases))) diff --git a/python/run-tests.py b/python/run-tests.py index 38b3bb84c10b..a9f8854e6f66 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -53,25 +53,11 @@ def print_red(text): FAILURE_REPORTING_LOCK = Lock() LOGGER = logging.getLogger() -# Find out where the assembly jars are located. -for scala in ["2.11", "2.10"]: - build_dir = os.path.join(SPARK_HOME, "assembly", "target", "scala-" + scala) - if os.path.isdir(build_dir): - SPARK_DIST_CLASSPATH = os.path.join(build_dir, "jars", "*") - break -else: - raise Exception("Cannot find assembly build directory, please build Spark first.") - def run_individual_python_test(test_name, pyspark_python): env = dict(os.environ) - env.update({ - 'SPARK_DIST_CLASSPATH': SPARK_DIST_CLASSPATH, - 'SPARK_TESTING': '1', - 'SPARK_PREPEND_CLASSES': '1', - 'PYSPARK_PYTHON': which(pyspark_python), - 'PYSPARK_DRIVER_PYTHON': which(pyspark_python) - }) + env.update({'SPARK_TESTING': '1', 'PYSPARK_PYTHON': which(pyspark_python), + 'PYSPARK_DRIVER_PYTHON': which(pyspark_python)}) LOGGER.debug("Starting test(%s): %s", pyspark_python, test_name) start_time = time.time() try: diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index a1268b8e94f5..f15ad4ed720c 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -771,15 +771,11 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl extraEnvironment = Map( // Disables SPARK_TESTING to exclude log4j.properties in test directories. "SPARK_TESTING" -> "0", - // But set SPARK_SQL_TESTING to make spark-class happy. - "SPARK_SQL_TESTING" -> "1", // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift server instance can be // started at a time, which is not Jenkins friendly. "SPARK_PID_DIR" -> pidDir.getCanonicalPath), redirectStderr = true) - logInfo(s"COMMAND: $command") - logInfo(s"OUTPUT: $lines") lines.split("\n").collectFirst { case line if line.contains(LOG_FILE_MARK) => new File(line.drop(LOG_FILE_MARK.length)) }.getOrElse { diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 61504becf1f3..58efd80512a5 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -225,6 +225,30 @@ -da -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + + ${basedir}/../../lib_managed/jars + false + false + true + org.datanucleus + + + + + diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 5e7e3be08d0f..1c2f4294a60c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -447,6 +447,9 @@ private[spark] class Client( * * Note that the archive cannot be a "local" URI. If none of the above settings are found, * then upload all files found in $SPARK_HOME/jars. + * + * TODO: currently the code looks in $SPARK_HOME/lib while the work to replace assemblies + * with a directory full of jars is ongoing. */ val sparkArchive = sparkConf.get(SPARK_ARCHIVE) if (sparkArchive.isDefined) { diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 74e268dc4847..2eaafa072a3a 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -273,7 +273,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll test("distribute local spark jars") { val temp = Utils.createTempDir() - val jarsDir = new File(temp, "jars") + val jarsDir = new File(temp, "lib") assert(jarsDir.mkdir()) val jar = TestUtils.createJarWithFiles(Map(), jarsDir) new FileOutputStream(new File(temp, "RELEASE")).close()