From 6bac1531929e914764d980e4eb4228a10436876b Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Mon, 11 Jun 2018 16:57:11 +0800
Subject: [PATCH 1/4] [SPARK][CORE] No need to warn when output commit coordination is enabled

---
 .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index e68c6b1366c7f..bbb0d90f3004b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1053,7 +1053,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // users that they may loss data if they are using a direct output committer.
     val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
     val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
-    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+    val outputCommitCoordinationEnabled = self.conf.getBoolean(
+      "spark.hadoop.outputCommitCoordination.enabled", true)
+    if (speculationEnabled && outputCommitterClass.contains("Direct")
+      && !outputCommitCoordinationEnabled) {
       val warningMessage =
         s"$outputCommitterClass may be an output committer that writes data directly to " +
           "the final location. Because speculation is enabled, this output committer may " +
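
Note: the hedged sketch below (not part of the patch series) shows the configuration
combination PATCH 1 targets. The keys are real Spark/Hadoop settings — the same ones
the test suites in PATCH 2 set — but com.example.DirectOutputCommitter is a
hypothetical stand-in for any committer whose class name contains "Direct".

    import org.apache.spark.{SparkConf, SparkContext}

    object DirectCommitterWarningScenario {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("direct-committer-warning")
          // Speculation launches concurrent attempts of the same task.
          .set("spark.speculation", "true")
          // Default is true: the driver's OutputCommitCoordinator already
          // guarantees at most one attempt commits a partition, which is why
          // the warning is redundant in this configuration.
          .set("spark.hadoop.outputCommitCoordination.enabled", "true")
          // spark.hadoop.* keys are copied into the Hadoop Configuration that
          // PairRDDFunctions consults for mapred.output.committer.class.
          .set("spark.hadoop.mapred.output.committer.class",
            "com.example.DirectOutputCommitter") // hypothetical class name
        val sc = new SparkContext(conf)
        // Before this patch, a saveAsHadoopFile/saveAsHadoopDataset call here
        // logged the data-loss warning; after it, the warning stays suppressed.
        sc.stop()
      }
    }
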
From ee450f517b3df5c61ed6cce5513ec07fc898590b Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 12 Jul 2018 17:07:28 +0800
Subject: [PATCH 2/4] Refine comment

---
 .../scala/org/apache/spark/internal/config/package.scala  | 5 +++++
 .../org/apache/spark/mapred/SparkHadoopMapRedUtil.scala   | 5 +++--
 .../scala/org/apache/spark/rdd/PairRDDFunctions.scala     | 4 ++--
 .../OutputCommitCoordinatorIntegrationSuite.scala         | 3 ++-
 .../spark/scheduler/OutputCommitCoordinatorSuite.scala    | 3 ++-
 .../apache/spark/sql/hive/execution/HiveFileFormat.scala  | 8 +++++---
 6 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a54b091a64d50..d9d52c48669af 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -357,6 +357,11 @@ package object config {
     .intConf
     .createWithDefault(256)
 
+  private[spark] val HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED =
+    ConfigBuilder("spark.hadoop.outputCommitCoordination.enabled")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val NETWORK_AUTH_ENABLED =
     ConfigBuilder("spark.authenticate")
       .booleanConf
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 764735dc4eae7..3936d4b8a68e4 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 
 object SparkHadoopMapRedUtil extends Logging {
   /**
@@ -33,7 +34,7 @@ object SparkHadoopMapRedUtil extends Logging {
    * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
    * details).
    *
-   * Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
+   * Output commit coordinator is only used when [[HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED]]
    * is set to true (which is the default).
    */
   def commitTask(
@@ -64,7 +65,7 @@ object SparkHadoopMapRedUtil extends Logging {
       // We only need to coordinate with the driver if there are concurrent task attempts.
       // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
       // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
-      sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
+      sparkConf.get(HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED)
     }
 
     if (shouldCoordinateWithDriver) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index bbb0d90f3004b..72c7371fa3a65 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -36,6 +36,7 @@ import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.io._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
@@ -1053,8 +1054,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // users that they may loss data if they are using a direct output committer.
     val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
     val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
-    val outputCommitCoordinationEnabled = self.conf.getBoolean(
-      "spark.hadoop.outputCommitCoordination.enabled", true)
+    val outputCommitCoordinationEnabled = self.conf.get(HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED)
     if (speculationEnabled && outputCommitterClass.contains("Direct")
       && !outputCommitCoordinationEnabled) {
       val warningMessage =
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index d6ff5bb33055c..fb65d8bf25310 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Seconds, Span}
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -40,7 +41,7 @@ class OutputCommitCoordinatorIntegrationSuite
   override def beforeAll(): Unit = {
     super.beforeAll()
     val conf = new SparkConf()
-      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
+      .set(HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED.key, "true")
       .set("spark.hadoop.mapred.output.committer.class",
         classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
     sc = new SparkContext("local[2, 4]", "test", conf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 03b1903902491..e0c2c47300b1c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -33,6 +33,7 @@ import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapRedCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.rdd.{FakeOutputCommitter, RDD}
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -79,7 +80,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
-      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
+      .set(HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED.key, "true")
     sc = new SparkContext(conf) {
       override private[spark] def createSparkEnv(
         conf: SparkConf,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 4a7cd6901923b..0a1060933a6ff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConverters._
-
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
@@ -28,8 +27,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
@@ -71,7 +70,10 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
     // users that they may loss data if they are using a direct output committer.
     val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false)
     val outputCommitterClass = conf.get("mapred.output.committer.class", "")
-    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+    val outputCommitCoordinationEnabled =
+      sparkSession.sparkContext.conf.get(HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED)
+    if (speculationEnabled && outputCommitterClass.contains("Direct")
+      && !outputCommitCoordinationEnabled) {
       val warningMessage =
         s"$outputCommitterClass may be an output committer that writes data directly to " +
           "the final location. Because speculation is enabled, this output committer may " +
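
Note: a minimal sketch (not part of the patch) of how the typed ConfigEntry added in
PATCH 2 behaves. Both the entry and SparkConf.get(entry) are private[spark], so this
only compiles inside the org.apache.spark namespace; the package name here is
hypothetical.

    package org.apache.spark.internal.example // hypothetical package in Spark's tree

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED

    object ConfigEntryUsage {
      def demo(): Unit = {
        val conf = new SparkConf()
        // `.key` exposes the raw string key, which is why the test suites can
        // keep calling .set(HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED.key, "true").
        conf.set(HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED.key, "false")
        // conf.get(entry) is typed: it returns a Boolean here and falls back to
        // the declared default (true) when the key is unset.
        val coordinationEnabled: Boolean = conf.get(HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED)
        assert(!coordinationEnabled)
      }
    }
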
From 63a62dbf984aa760e48a16c014cfcf4a91fcfd7e Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 12 Jul 2018 19:00:12 +0800
Subject: [PATCH 3/4] Fix code style

---
 .../org/apache/spark/sql/hive/execution/HiveFileFormat.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 0a1060933a6ff..bcf0908ed9406 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
@@ -27,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED
 import org.apache.spark.sql.SparkSession
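
Note: PATCH 3 restores the blank lines between import groups that PATCH 2 had
collapsed; Spark's Scalastyle import-order check expects java/scala, third-party,
and org.apache.spark imports to be separated. A schematic illustration of the
grouping (the particular imports are arbitrary examples):

    // 1. java and scala standard-library imports
    import scala.collection.JavaConverters._

    // 2. third-party imports (e.g. Hadoop)
    import org.apache.hadoop.conf.Configuration

    // 3. org.apache.spark imports
    import org.apache.spark.internal.Logging
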
From c233c725110d66fc712a96a253684cdb02b19e23 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 12 Jul 2018 19:48:37 +0800
Subject: [PATCH 4/4] Add doc for new conf

---
 .../main/scala/org/apache/spark/internal/config/package.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index d9d52c48669af..322baf8603045 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -359,6 +359,8 @@ package object config {
 
   private[spark] val HADOOP_OUTPUTCOMMITCOORDINATION_ENABLED =
     ConfigBuilder("spark.hadoop.outputCommitCoordination.enabled")
+      .doc("When enabled, tasks will coordinate with the driver to make sure that, " +
+        "for a certain partition, at most one task attempt can commit.")
       .booleanConf
       .createWithDefault(true)
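
Note: for completeness, a hedged sketch (not part of the series) of the escape hatch
that PATCH 4 documents. Disabling coordination brings the data-loss warning back,
since nothing then prevents concurrent attempts from committing the same partition.
Only public SparkSession API is used; master/appName values are placeholders.

    import org.apache.spark.sql.SparkSession

    object EscapeHatchScenario {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("commit-coordination-escape-hatch")
          .config("spark.speculation", "true")
          // The (previously undocumented) escape hatch; the default is true.
          .config("spark.hadoop.outputCommitCoordination.enabled", "false")
          .getOrCreate()
        // With a "Direct"-named committer configured, writes through
        // PairRDDFunctions or Hive tables would now log the warning again,
        // because the new condition !outputCommitCoordinationEnabled holds.
        spark.stop()
      }
    }
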