From b20171267d610715d5b0a86b474c903e9bc3a1a3 Mon Sep 17 00:00:00 2001
From: Dan Osipov
Date: Tue, 16 Sep 2014 13:40:16 -0700
Subject: [PATCH 1/6] [SPARK-787] Add S3 configuration parameters to the EC2 deploy scripts

When deploying to AWS, additional configuration is required to read S3 files. EMR creates it automatically, so there is no reason the Spark EC2 script shouldn't do the same.

This PR requires a corresponding PR to mesos/spark-ec2 to be merged, since that repository is cloned while the machines are being set up: https://github.com/mesos/spark-ec2/pull/58

Author: Dan Osipov

Closes #1120 from danosipov/s3_credentials and squashes the following commits:

758da8b [Dan Osipov] Modify documentation to include the new parameter
71fab14 [Dan Osipov] Use a parameter --copy-aws-credentials to enable S3 credential deployment
7e0da26 [Dan Osipov] Get AWS credentials out of boto connection instance
39bdf30 [Dan Osipov] Add S3 configuration parameters to the EC2 deploy scripts
---
 docs/ec2-scripts.md                                |  2 +-
 ec2/deploy.generic/root/spark-ec2/ec2-variables.sh |  2 ++
 ec2/spark_ec2.py                                   | 10 ++++++++++
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
index f5ac6d894e1eb..b2ca6a9b48f32 100644
--- a/docs/ec2-scripts.md
+++ b/docs/ec2-scripts.md
@@ -156,6 +156,6 @@ If you have a patch or suggestion for one of these limitations, feel free to

 # Accessing Data in S3

-Spark's file interface allows it to process data in Amazon S3 using the same URI formats that are supported for Hadoop. You can specify a path in S3 as input through a URI of the form `s3n://<bucket>/path`. You will also need to set your Amazon security credentials, either by setting the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` before your program or through `SparkContext.hadoopConfiguration`. Full instructions on S3 access using the Hadoop input libraries can be found on the [Hadoop S3 page](http://wiki.apache.org/hadoop/AmazonS3).
+Spark's file interface allows it to process data in Amazon S3 using the same URI formats that are supported for Hadoop. You can specify a path in S3 as input through a URI of the form `s3n://<bucket>/path`. To provide AWS credentials for S3 access, launch the Spark cluster with the option `--copy-aws-credentials`. Full instructions on S3 access using the Hadoop input libraries can be found on the [Hadoop S3 page](http://wiki.apache.org/hadoop/AmazonS3).

 In addition to using a single input file, you can also use a directory of files as input by simply giving the path to the directory.
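As background to the documentation change above (this example is not part of the patch): per the option's help text, the credentials that `--copy-aws-credentials` ships to the cluster are consumed through the Hadoop configuration. A minimal sketch of the equivalent manual setup is shown below; the bucket name and input path are placeholders, and the snippet assumes `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are exported in the environment.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object S3ReadSketch {
  def main(args: Array[String]): Unit = {
    // Master URL is expected to be supplied by spark-submit.
    val sc = new SparkContext(new SparkConf().setAppName("S3ReadSketch"))

    // Standard Hadoop s3n credential properties; --copy-aws-credentials is meant to
    // make this manual step unnecessary on clusters launched by spark_ec2.py.
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

    // Read an S3 path using the s3n:// URI form described in the documentation above.
    val lines = sc.textFile("s3n://my-bucket/path/to/input")
    println(s"line count: ${lines.count()}")

    sc.stop()
  }
}
```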
diff --git a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh b/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
index 3570891be804e..740c267fd9866 100644
--- a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
+++ b/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
@@ -30,3 +30,5 @@ export HADOOP_MAJOR_VERSION="{{hadoop_major_version}}"
 export SWAP_MB="{{swap}}"
 export SPARK_WORKER_INSTANCES="{{spark_worker_instances}}"
 export SPARK_MASTER_OPTS="{{spark_master_opts}}"
+export AWS_ACCESS_KEY_ID="{{aws_access_key_id}}"
+export AWS_SECRET_ACCESS_KEY="{{aws_secret_access_key}}"
\ No newline at end of file

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 5682e96aa8770..abac71eaca595 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -158,6 +158,9 @@ def parse_args():
     parser.add_option(
         "--additional-security-group", type="string", default="",
         help="Additional security group to place the machines in")
+    parser.add_option(
+        "--copy-aws-credentials", action="store_true", default=False,
+        help="Add AWS credentials to hadoop configuration to allow Spark to access S3")

     (opts, args) = parser.parse_args()
     if len(args) != 2:
@@ -714,6 +717,13 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
         "spark_master_opts": opts.master_opts
     }

+    if opts.copy_aws_credentials:
+        template_vars["aws_access_key_id"] = conn.aws_access_key_id
+        template_vars["aws_secret_access_key"] = conn.aws_secret_access_key
+    else:
+        template_vars["aws_access_key_id"] = ""
+        template_vars["aws_secret_access_key"] = ""
+
     # Create a temp directory in which we will place all the files to be
     # deployed after we substitue template parameters in them
     tmp_dir = tempfile.mkdtemp()

From a6e1712f1e9c36deb24c5073aa8edcfc047d76eb Mon Sep 17 00:00:00 2001
From: Evan Chan
Date: Tue, 16 Sep 2014 13:46:06 -0700
Subject: [PATCH 2/6] Add a Community Projects page

This adds a new page to the docs listing community projects -- those created outside of Apache Spark that are of interest to the community of Spark users. Anybody can add to it just by submitting a PR.

There was a discussion thread about alternatives:
* Creating a GitHub organization for Spark projects - we could not find any sponsors for this, and it would be difficult to organize since many folks just create repos in their company organization or personal accounts.
* Apache has a place for storing community projects, but it was deemed difficult to work with, and again there would be some permissions issues -- not everyone could update it.

Author: Evan Chan

Closes #2219 from velvia/community-projects-page and squashes the following commits:

7316822 [Evan Chan] Point to Spark wiki: supplemental projects page
613b021 [Evan Chan] Add a few more projects
a85eaaf [Evan Chan] Add a Community Projects page
---
 docs/_layouts/global.html | 3 ++-
 docs/index.md             | 1 +
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index a53e8a775b71f..627ed37de4a9c 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -111,6 +111,7 @@
   • Building Spark
   • Contributing to Spark
+  • Supplemental Projects

@@ -151,7 +152,7 @@

    {{ page.title }}

     MathJax.Hub.Config({
       tex2jax: {
         inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
-        displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
+        displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
         processEscapes: true,
         skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
       }

diff --git a/docs/index.md b/docs/index.md
index e8ebadbd4e427..edd622ec90f64 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -107,6 +107,7 @@ options for deployment:
 * [OpenStack Swift](storage-openstack-swift.html)
 * [Building Spark](building-spark.html): build Spark using the Maven system
 * [Contributing to Spark](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark)
+* [Supplemental Projects](https://cwiki.apache.org/confluence/display/SPARK/Supplemental+Spark+Projects): related third party Spark projects

 **External Resources:**

From 0a7091e689a4c8b1e7b61e9f0873e6557f40d952 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 16 Sep 2014 16:03:20 -0700
Subject: [PATCH 3/6] [SPARK-3555] Fix UISuite race condition

The test "jetty selects different port under contention" is flaky. If another process binds to 4040 before the test starts, then the first server we start there will fail, and subsequent servers may still successfully bind to 4040 if it is released between server starts. Instead, we should just let Java find a random free port for us and hold onto it for the duration of the test.

Author: Andrew Or

Closes #2418 from andrewor14/fix-port-contention and squashes the following commits:

0cd4974 [Andrew Or] Stop them servers
a7071fe [Andrew Or] Pick random port instead of 4040
---
 .../test/scala/org/apache/spark/ui/UISuite.scala | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 48790b59e7fbd..92a21f82f3c21 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -23,7 +23,6 @@ import javax.servlet.http.HttpServletRequest
 import scala.io.Source
 import scala.util.{Failure, Success, Try}

-import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.servlet.ServletContextHandler
 import org.scalatest.FunSuite
 import org.scalatest.concurrent.Eventually._
@@ -108,14 +107,8 @@ class UISuite extends FunSuite {
   }

   test("jetty selects different port under contention") {
-    val startPort = 4040
-    val server = new Server(startPort)
-
-    Try { server.start() } match {
-      case Success(s) =>
-      case Failure(e) =>
-        // Either case server port is busy hence setup for test complete
-    }
+    val server = new ServerSocket(0)
+    val startPort = server.getLocalPort
     val serverInfo1 = JettyUtils.startJettyServer(
       "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf)
     val serverInfo2 = JettyUtils.startJettyServer(
@@ -126,6 +119,9 @@ class UISuite extends FunSuite {
     assert(boundPort1 != startPort)
     assert(boundPort2 != startPort)
     assert(boundPort1 != boundPort2)
+    serverInfo1.server.stop()
+    serverInfo2.server.stop()
+    server.close()
   }

   test("jetty binds to port 0 correctly") {

From 008a5ed4808d1467b47c1d6fa4d950cc6c4976b7 Mon Sep 17 00:00:00 2001
From: wangfei
Date: Tue, 16 Sep 2014 21:57:33 -0700
Subject: [PATCH 4/6] [Minor] Ignore all config files in conf

Some config files in ```conf``` should be ignored, such as:
conf/fairscheduler.xml
conf/hive-log4j.properties
conf/metrics.properties
...
So ignore all ```sh```/```properties```/```conf```/```xml``` files.

Author: wangfei

Closes #2395 from scwf/patch-2 and squashes the following commits:

3dc53f2 [wangfei] duplicate ```conf/*.conf```
3c2986f [wangfei] ignore all config files
---
 .gitignore | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/.gitignore b/.gitignore
index 7ec8d45e12c6b..a31bf7e0091f4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,11 +15,10 @@ out/
 third_party/libmesos.so
 third_party/libmesos.dylib
 conf/java-opts
-conf/spark-env.sh
-conf/streaming-env.sh
-conf/log4j.properties
-conf/spark-defaults.conf
-conf/hive-site.xml
+conf/*.sh
+conf/*.properties
+conf/*.conf
+conf/*.xml
 docs/_site
 docs/api
 target/
@@ -50,7 +49,6 @@ unit-tests.log
 /lib/
 rat-results.txt
 scalastyle.txt
-conf/*.conf
 scalastyle-output.xml

 # For Hive

From 983609a4dd83e25598455bfce93fa1c1fa9f2c51 Mon Sep 17 00:00:00 2001
From: viper-kun
Date: Wed, 17 Sep 2014 00:09:57 -0700
Subject: [PATCH 5/6] [Docs] Correct spark.files.fetchTimeout default value

Change the documented default value of spark.files.fetchTimeout from false to 60 (seconds).

Author: viper-kun

Closes #2406 from viper-kun/master and squashes the following commits:

ecb0d46 [viper-kun] [Docs] Correct spark.files.fetchTimeout default value
7cf4c7a [viper-kun] Update configuration.md
---
 docs/configuration.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index af16489a44281..99faf51c6f3db 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -520,10 +520,10 @@ Apart from these, the following properties are also available, and may be useful
   spark.files.fetchTimeout
-  false
+  60
   Communication timeout to use when fetching files added through SparkContext.addFile() from
-  the driver.
+  the driver, in seconds.

From 7d1a37239c50394025d9f16acf5dcd05cfbe7250 Mon Sep 17 00:00:00 2001
From: chesterxgchen
Date: Wed, 17 Sep 2014 10:25:52 -0500
Subject: [PATCH 6/6] SPARK-3177 (on Master Branch)

The JIRA and PR were originally created for branch-1.1, and have now been moved to the master branch.

Chester

The issue is that yarn-alpha and yarn have different APIs for certain class fields. In this particular case ClientBase uses reflection to deal with the difference, so we need a different way to test ClientBase's method. The original ClientBaseSuite used getFieldValue() for this, but that doesn't work for yarn-alpha, whose API returns an array of String instead of just a String (which is what the yarn-stable API returns). To fix the test, I add a new method

    def getFieldValue2[A: ClassTag, A1: ClassTag, B](clazz: Class[_], field: String, defaults: => B)
        (mapTo: A => B)(mapTo1: A1 => B): B =
      Try(clazz.getField(field)).map(_.get(null)).map {
        case v: A => mapTo(v)
        case v1: A1 => mapTo1(v1)
        case _ => defaults
      }.toOption.getOrElse(defaults)

to handle the cases where the field type can be either A or A1. In the new method the value's type (A or A1) is pattern matched and the corresponding mapTo function (mapTo or mapTo1) is applied.

Author: chesterxgchen

Closes #2204 from chesterxgchen/SPARK-3177-master and squashes the following commits:

e72a6ea [chesterxgchen] The issue is that yarn-alpha and yarn have different APIs for certain class fields. In this particular case ClientBase uses reflection to deal with the difference, so we need a different way to test ClientBase's method. The original ClientBaseSuite used getFieldValue() for this.
But it doesn't work for yarn-alpha, as the API returns an array of String instead of just a String (which is what the yarn-stable API returns).
---
 .../spark/deploy/yarn/ClientBaseSuite.scala | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
index 5480eca7c832c..c3b7a2c8f02e5 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -38,6 +38,7 @@ import org.scalatest.Matchers

 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ HashMap => MutableHashMap }
+import scala.reflect.ClassTag
 import scala.util.Try

 import org.apache.spark.{SparkException, SparkConf}
@@ -200,9 +201,10 @@ class ClientBaseSuite extends FunSuite with Matchers {

   val knownDefMRAppCP: Seq[String] =
-    getFieldValue[String, Seq[String]](classOf[MRJobConfig],
-      "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
-      Seq[String]())(a => a.split(","))
+    getFieldValue2[String, Array[String], Seq[String]](
+      classOf[MRJobConfig],
+      "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
+      Seq[String]())(a => a.split(","))(a => a.toSeq)

   val knownYARNAppCP = Some(Seq("/known/yarn/path"))

@@ -232,6 +234,17 @@ class ClientBaseSuite extends FunSuite with Matchers {
   def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B =
     Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults)

+  def getFieldValue2[A: ClassTag, A1: ClassTag, B](
+      clazz: Class[_],
+      field: String,
+      defaults: => B)(mapTo: A => B)(mapTo1: A1 => B) : B = {
+    Try(clazz.getField(field)).map(_.get(null)).map {
+      case v: A => mapTo(v)
+      case v1: A1 => mapTo1(v1)
+      case _ => defaults
+    }.toOption.getOrElse(defaults)
+  }
+
   private class DummyClient(
     val args: ClientArguments,
     val conf: Configuration,
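For readers unfamiliar with ClassTag-based pattern matching, here is a small, self-contained sketch of the dispatch idea used by getFieldValue2. It is not part of the patch: instead of Hadoop's MRJobConfig it reads the public static field java.io.File.separator (a plain String) as a stand-in for the classpath constant, and the second example uses a made-up field name to show the default path. The ClassTags are what let the erased type parameters A and A1 be checked at runtime inside the match.

```scala
import scala.reflect.ClassTag
import scala.util.Try

object FieldValueSketch {
  // Same shape as the helper added in the patch: try the field's value as A first,
  // then as A1, mapping whichever type matches into a common result type B.
  def getFieldValue2[A: ClassTag, A1: ClassTag, B](
      clazz: Class[_],
      field: String,
      defaults: => B)(mapTo: A => B)(mapTo1: A1 => B): B =
    Try(clazz.getField(field)).map(_.get(null)).map {
      case v: A => mapTo(v)      // ClassTag[A] makes this runtime type test possible
      case v1: A1 => mapTo1(v1)  // ClassTag[A1] likewise
      case _ => defaults
    }.toOption.getOrElse(defaults)

  def main(args: Array[String]): Unit = {
    // java.io.File.separator is a public static String field, so the String case fires.
    val asSeq = getFieldValue2[String, Array[String], Seq[String]](
      classOf[java.io.File], "separator", Seq.empty)(s => s.split(",").toSeq)(_.toSeq)
    println(asSeq) // List(/) on Unix-like systems, List(\) on Windows

    // A missing field (hypothetical name) falls back to the default instead of throwing.
    val missing = getFieldValue2[String, Array[String], Seq[String]](
      classOf[java.io.File], "NO_SUCH_FIELD", Seq("default"))(Seq(_))(_.toSeq)
    println(missing) // List(default)
  }
}
```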