From 8b6be94801ca33bca32aa574b1a8f6a76760869d Mon Sep 17 00:00:00 2001
From: Ian Hummel
Date: Mon, 15 Sep 2014 11:38:59 -0400
Subject: [PATCH 1/4] Add ability to specify OutputCommitter, especially useful
 when writing to an S3 bucket from an EMR cluster

---
 .../apache/spark/rdd/PairRDDFunctions.scala   |  7 +-
 .../spark/rdd/PairRDDFunctionsSuite.scala     | 98 ++++++++++++++-----
 .../org/apache/spark/examples/AwsTest.scala   | 52 ++++++++++
 3 files changed, 133 insertions(+), 24 deletions(-)
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/AwsTest.scala

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f6d9d12fe9006..5ba8a47c7b785 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -872,7 +872,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
       hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+
+    // Useful on EMR where direct output committer is set by default
+    if (conf.getOutputCommitter == null) {
+      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    }
+
     FileOutputFormat.setOutputPath(hadoopConf,
       SparkHadoopWriter.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 63d3ddb4af98a..3cc60b41016e8 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -17,17 +17,21 @@
 
 package org.apache.spark.rdd
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.util.Progressable
+
+import scala.collection.mutable.{ArrayBuffer, HashSet}
 import scala.util.Random
 
-import org.scalatest.FunSuite
 import com.google.common.io.Files
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.conf.{Configuration, Configurable}
-
-import org.apache.spark.SparkContext._
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
+OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
+TaskAttemptContext => NewTaskAttempContext}
 import org.apache.spark.{Partitioner, SharedSparkContext}
+import org.apache.spark.SparkContext._
+import org.scalatest.FunSuite
 
 class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   test("aggregateByKey") {
@@ -467,7 +471,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
 
     // No error, non-configurable formats still work
-    pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored")
+    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
 
     /*
       Check that configurable formats get configured:
@@ -478,6 +482,15 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("saveAsHadoopFile should respect configured output committers") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+    val 
conf = new JobConf(sc.hadoopConfiguration) + conf.setOutputCommitter(classOf[FakeOutputCommitter]) + pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf) + val ran = sys.props.remove("mapred.committer.ran") + assert(ran.isDefined, "OutputCommitter was never called") + } + test("lookup") { val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7))) @@ -621,40 +634,79 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile tries to instantiate them with Class.newInstance. */ + +/* + * Original Hadoop API + */ class FakeWriter extends RecordWriter[Integer, Integer] { + override def write(key: Integer, value: Integer): Unit = () + + override def close(reporter: Reporter): Unit = () +} + +class FakeOutputCommitter() extends OutputCommitter() { + override def setupJob(jobContext: JobContext): Unit = () + + override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true - def close(p1: TaskAttemptContext) = () + override def setupTask(taskContext: TaskAttemptContext): Unit = () + + override def commitTask(taskContext: TaskAttemptContext): Unit = { + sys.props("mapred.committer.ran") = "true" + () + } + + override def abortTask(taskContext: TaskAttemptContext): Unit = () +} + +class FakeOutputFormat() extends OutputFormat[Integer, Integer]() { + override def getRecordWriter( + ignored: FileSystem, + job: JobConf, name: String, + progress: Progressable): RecordWriter[Integer, Integer] = { + new FakeWriter() + } + + override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = () +} + +/* + * New-style Hadoop API + */ +class NewFakeWriter extends NewRecordWriter[Integer, Integer] { + + def close(p1: NewTaskAttempContext) = () def write(p1: Integer, p2: Integer) = () } -class FakeCommitter extends OutputCommitter { - def setupJob(p1: JobContext) = () +class NewFakeCommitter extends NewOutputCommitter { + def setupJob(p1: NewJobContext) = () - def needsTaskCommit(p1: TaskAttemptContext): Boolean = false + def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false - def setupTask(p1: TaskAttemptContext) = () + def setupTask(p1: NewTaskAttempContext) = () - def commitTask(p1: TaskAttemptContext) = () + def commitTask(p1: NewTaskAttempContext) = () - def abortTask(p1: TaskAttemptContext) = () + def abortTask(p1: NewTaskAttempContext) = () } -class FakeFormat() extends OutputFormat[Integer, Integer]() { +class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() { - def checkOutputSpecs(p1: JobContext) = () + def checkOutputSpecs(p1: NewJobContext) = () - def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = { - new FakeWriter() + def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = { + new NewFakeWriter() } - def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = { - new FakeCommitter() + def getOutputCommitter(p1: NewTaskAttempContext): NewOutputCommitter = { + new NewFakeCommitter() } } -class ConfigTestFormat() extends FakeFormat() with Configurable { +class ConfigTestFormat() extends NewFakeFormat() with Configurable { var setConfCalled = false def setConf(p1: Configuration) = { @@ -664,7 +716,7 @@ class ConfigTestFormat() extends FakeFormat() with Configurable { def getConf: Configuration = null - override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = { + override def getRecordWriter(p1: NewTaskAttempContext): 
NewRecordWriter[Integer, Integer] = { assert(setConfCalled, "setConf was never called") super.getRecordWriter(p1) } diff --git a/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala b/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala new file mode 100644 index 0000000000000..361f5b5ccaafa --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkContext._ + +object AwsTest { + def main(args: Array[String]) { + val (ak, sk, in, out) = args match { + case Array(ak, sk, in, out) ⇒ (ak, sk, in, out) + case _ ⇒ sys.error("Usage: AwsTest AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY s3://INPUT s3://OUTPUT") + } + + val sparkConf = new SparkConf().setAppName("AwsTest") + val sc = new SparkContext(sparkConf) + + /* + * Example setup that closely resembles Elastic MapReduce configuration + */ + sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", ak) + sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", sk) + //sc.hadoopConfiguration.set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter") + sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") + + val file = sc.textFile(in) + val counts = file.flatMap { line => + line.split("\\s") + }.map { + word => (word, 1) + }.reduceByKey(_ + _) + + counts.saveAsTextFile(out) + + sc.stop() + } +} From 4359664b1d557d55b0579023df809542386d5b8c Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Thu, 18 Sep 2014 16:18:57 -0400 Subject: [PATCH 2/4] Add an example showing usage --- .../org/apache/spark/SparkHadoopWriter.scala | 2 +- .../org/apache/spark/examples/AwsTest.scala | 46 +++++++++++++++++-- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index f6703986bdf11..376e69cd997d5 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) } } } else { - logWarning ("No need to commit output of task: " + taID.value) + logInfo ("No need to commit output of task: " + taID.value) } } diff --git a/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala b/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala index 361f5b5ccaafa..6dbd7791bf1c4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala @@ -17,14 +17,49 @@ package 
org.apache.spark.examples +import org.apache.commons.logging.LogFactory +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.mapred._ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +/** + * An OutputCommitter similar to the one used by default for s3:// URLs in EMR. + */ +class DirectOutputCommitter extends OutputCommitter { + private final val LOG = LogFactory.getLog("org.apache.spark.examples.DirectOutputCommitter") + + override def setupJob(jobContext: JobContext): Unit = { + LOG.info("Nothing to do in setupJob") + } + + override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = { + LOG.info("Nothing to do in needsTaskCommit"); false + } + + override def setupTask(taskContext: TaskAttemptContext): Unit = { + LOG.info("Nothing to do in setupTask") + } + + override def commitTask(taskContext: TaskAttemptContext): Unit = { + LOG.info("Nothing to do in commitTask") + } + + override def abortTask(taskContext: TaskAttemptContext): Unit = { + LOG.info("Nothing to do in abortTask") + } +} + +/** + * Run word count on text files stored in S3. + */ object AwsTest { def main(args: Array[String]) { val (ak, sk, in, out) = args match { case Array(ak, sk, in, out) ⇒ (ak, sk, in, out) - case _ ⇒ sys.error("Usage: AwsTest AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY s3://INPUT s3://OUTPUT") + case _ ⇒ { + sys.error("Usage: AwsTest AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY s3://INPUT s3://OUTPUT") + } } val sparkConf = new SparkConf().setAppName("AwsTest") @@ -35,15 +70,16 @@ object AwsTest { */ sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", ak) sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", sk) - //sc.hadoopConfiguration.set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter") + sc.hadoopConfiguration.set("mapred.output.committer.class", + "org.apache.spark.examples.DirectOutputCommitter") sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") val file = sc.textFile(in) val counts = file.flatMap { line => line.split("\\s") - }.map { - word => (word, 1) - }.reduceByKey(_ + _) + }.map { word => + (word, 1) + }.reduceByKey(_ + _, 10) counts.saveAsTextFile(out) From a11d9f3806e6a8d06d13417af9f27bfd3795334b Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Thu, 18 Sep 2014 16:52:17 -0400 Subject: [PATCH 3/4] Fix formatting --- examples/src/main/scala/org/apache/spark/examples/AwsTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala b/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala index 6dbd7791bf1c4..0b429dd73fd2c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala @@ -58,7 +58,7 @@ object AwsTest { val (ak, sk, in, out) = args match { case Array(ak, sk, in, out) ⇒ (ak, sk, in, out) case _ ⇒ { - sys.error("Usage: AwsTest AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY s3://INPUT s3://OUTPUT") + sys.error("Usage: AwsTest AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY s3://INPUT s3://OUTPUT") } } From f37a0e5755ba37042489a36f150ffa4ecf7e4629 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Fri, 19 Sep 2014 17:33:33 -0400 Subject: [PATCH 4/4] Update based on comments from pwendell --- .../apache/spark/rdd/PairRDDFunctions.scala | 2 +- .../spark/rdd/PairRDDFunctionsSuite.scala | 17 +++- .../org/apache/spark/examples/AwsTest.scala | 88 ------------------- 3 files changed, 14 
insertions(+), 93 deletions(-) delete mode 100644 examples/src/main/scala/org/apache/spark/examples/AwsTest.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 5ba8a47c7b785..51ba8c2d17834 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -873,7 +873,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) } - // Useful on EMR where direct output committer is set by default + // Use configured output committer if already set if (conf.getOutputCommitter == null) { hadoopConf.setOutputCommitter(classOf[FileOutputCommitter]) } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 3cc60b41016e8..e84cc69592339 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -484,11 +484,13 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { test("saveAsHadoopFile should respect configured output committers") { val pairs = sc.parallelize(Array((new Integer(1), new Integer(1)))) - val conf = new JobConf(sc.hadoopConfiguration) + val conf = new JobConf() conf.setOutputCommitter(classOf[FakeOutputCommitter]) + + FakeOutputCommitter.ran = false pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf) - val ran = sys.props.remove("mapred.committer.ran") - assert(ran.isDefined, "OutputCommitter was never called") + + assert(FakeOutputCommitter.ran, "OutputCommitter was never called") } test("lookup") { @@ -652,13 +654,20 @@ class FakeOutputCommitter() extends OutputCommitter() { override def setupTask(taskContext: TaskAttemptContext): Unit = () override def commitTask(taskContext: TaskAttemptContext): Unit = { - sys.props("mapred.committer.ran") = "true" + FakeOutputCommitter.ran = true () } override def abortTask(taskContext: TaskAttemptContext): Unit = () } +/* + * Used to communicate state between the test harness and the OutputCommitter. + */ +object FakeOutputCommitter { + var ran = false +} + class FakeOutputFormat() extends OutputFormat[Integer, Integer]() { override def getRecordWriter( ignored: FileSystem, diff --git a/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala b/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala deleted file mode 100644 index 0b429dd73fd2c..0000000000000 --- a/examples/src/main/scala/org/apache/spark/examples/AwsTest.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples - -import org.apache.commons.logging.LogFactory -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.mapred._ -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.SparkContext._ - -/** - * An OutputCommitter similar to the one used by default for s3:// URLs in EMR. - */ -class DirectOutputCommitter extends OutputCommitter { - private final val LOG = LogFactory.getLog("org.apache.spark.examples.DirectOutputCommitter") - - override def setupJob(jobContext: JobContext): Unit = { - LOG.info("Nothing to do in setupJob") - } - - override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = { - LOG.info("Nothing to do in needsTaskCommit"); false - } - - override def setupTask(taskContext: TaskAttemptContext): Unit = { - LOG.info("Nothing to do in setupTask") - } - - override def commitTask(taskContext: TaskAttemptContext): Unit = { - LOG.info("Nothing to do in commitTask") - } - - override def abortTask(taskContext: TaskAttemptContext): Unit = { - LOG.info("Nothing to do in abortTask") - } -} - -/** - * Run word count on text files stored in S3. - */ -object AwsTest { - def main(args: Array[String]) { - val (ak, sk, in, out) = args match { - case Array(ak, sk, in, out) ⇒ (ak, sk, in, out) - case _ ⇒ { - sys.error("Usage: AwsTest AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY s3://INPUT s3://OUTPUT") - } - } - - val sparkConf = new SparkConf().setAppName("AwsTest") - val sc = new SparkContext(sparkConf) - - /* - * Example setup that closely resembles Elastic MapReduce configuration - */ - sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", ak) - sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", sk) - sc.hadoopConfiguration.set("mapred.output.committer.class", - "org.apache.spark.examples.DirectOutputCommitter") - sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") - - val file = sc.textFile(in) - val counts = file.flatMap { line => - line.split("\\s") - }.map { word => - (word, 1) - }.reduceByKey(_ + _, 10) - - counts.saveAsTextFile(out) - - sc.stop() - } -}
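
For reference, end-to-end usage of this change looks roughly like the sketch below: a job builds a JobConf that carries its own committer (here a trivial no-op committer modeled on the DirectOutputCommitter example from patch 2, logging omitted) and passes it to saveAsHadoopFile, which with this patch applied respects it instead of forcing FileOutputCommitter. The object name, bucket paths, and committer class are illustrative placeholders, not part of the patch series.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{JobConf, JobContext, OutputCommitter, TaskAttemptContext, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// No-op committer, modeled on the DirectOutputCommitter from patch 2:
// every callback does nothing and no task output is ever renamed on commit.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}

object DirectCommitterWordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DirectCommitterWordCount"))

    // Carry the committer in the JobConf; with this patch applied,
    // saveAsHadoopFile keeps it rather than resetting it to FileOutputCommitter.
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.setOutputCommitter(classOf[DirectOutputCommitter])

    // Plain word count over text input.
    val counts = sc.textFile("s3n://example-bucket/input")
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Write with the configured committer instead of the default one.
    counts.saveAsHadoopFile(
      "s3n://example-bucket/output",
      classOf[Text],
      classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]],
      jobConf)

    sc.stop()
  }
}

Because the committer travels with the JobConf rather than a global setting, different jobs in the same application can commit output differently; this is the behavior the new PairRDDFunctionsSuite test exercises with FakeOutputCommitter.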