From 3ce4a5dba74399991d5163ee7f4f7e86cf272c77 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 2 Nov 2016 00:41:30 -0700 Subject: [PATCH 1/5] [SPARK-18219] Move commit protocol API from sql/core to core module --- .../internal/io/FileCommitProtocol.scala | 128 +++++++++ .../io/HadoopMapReduceCommitProtocol.scala | 116 ++++++++ .../datasources/FileCommitProtocol.scala | 257 ------------------ .../datasources/FileFormatWriter.scala | 3 +- .../InsertIntoHadoopFsRelationCommand.scala | 1 + .../SQLHadoopMapReduceCommitProtocol.scala | 72 +++++ .../execution/streaming/FileStreamSink.scala | 3 +- .../ManifestFileCommitProtocol.scala | 4 +- .../apache/spark/sql/internal/SQLConf.scala | 4 +- 9 files changed, 325 insertions(+), 263 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala create mode 100644 core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala new file mode 100644 index 0000000000000..257154bbb847a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapreduce._ + +import org.apache.spark.util.Utils + + +/** + * An interface to define how a single Spark job commits its outputs. Two notes: + * + * 1. Implementations must be serializable, as the committer instance instantiated on the driver + * will be used for tasks on executors. + * 2. A committer should not be reused across multiple Spark jobs. + * + * The proper call sequence is: + * + * 1. Driver calls setupJob. + * 2. As part of each task's execution, executor calls setupTask and then commitTask + * (or abortTask if task failed). + * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job + * failed to execute (e.g. too many failed tasks), the job should call abortJob. + */ +abstract class FileCommitProtocol { + import FileCommitProtocol._ + + /** + * Setups up a job. Must be called on the driver before any other methods can be invoked. + */ + def setupJob(jobContext: JobContext): Unit + + /** + * Commits a job after the writes succeed. Must be called on the driver. + */ + def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit + + /** + * Aborts a job after the writes fail. 
Must be called on the driver. + * + * Calling this function is a best-effort attempt, because it is possible that the driver + * just crashes (or killed) before it can call abort. + */ + def abortJob(jobContext: JobContext): Unit + + /** + * Sets up a task within a job. + * Must be called before any other task related methods can be invoked. + */ + def setupTask(taskContext: TaskAttemptContext): Unit + + /** + * Notifies the commit protocol to add a new file, and gets back the full path that should be + * used. Must be called on the executors when running tasks. + * + * Note that the returned temp file may have an arbitrary path. The commit protocol only + * promises that the file will be at the location specified by the arguments after job commit. + * + * A full file path consists of the following parts: + * 1. the base path + * 2. some sub-directory within the base path, used to specify partitioning + * 3. file prefix, usually some unique job id with the task id + * 4. bucket id + * 5. source specific file extension, e.g. ".snappy.parquet" + * + * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest + * are left to the commit protocol implementation to decide. + */ + def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String + + /** + * Commits a task after the writes succeed. Must be called on the executors when running tasks. + */ + def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage + + /** + * Aborts a task after the writes have failed. Must be called on the executors when running tasks. + * + * Calling this function is a best-effort attempt, because it is possible that the executor + * just crashes (or killed) before it can call abort. + */ + def abortTask(taskContext: TaskAttemptContext): Unit +} + + +object FileCommitProtocol { + class TaskCommitMessage(val obj: Any) extends Serializable + + object EmptyTaskCommitMessage extends TaskCommitMessage(null) + + /** + * Instantiates a FileCommitProtocol using the given className. + */ + def instantiate(className: String, outputPath: String, isAppend: Boolean): FileCommitProtocol = { + try { + val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]] + + // First try the one with argument (outputPath: String, isAppend: Boolean). + // If that doesn't exist, try the one with (outputPath: String). + try { + val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[Boolean]) + ctor.newInstance(outputPath, isAppend.asInstanceOf[java.lang.Boolean]) + } catch { + case _: NoSuchMethodException => + val ctor = clazz.getDeclaredConstructor(classOf[String]) + ctor.newInstance(outputPath) + } + } catch { + case e: ClassNotFoundException => + throw e + } + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala new file mode 100644 index 0000000000000..4cb889f8be493 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import java.util.{Date, UUID} + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl + +import org.apache.spark.SparkHadoopWriter +import org.apache.spark.internal.Logging +import org.apache.spark.mapred.SparkHadoopMapRedUtil + +/** + * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter + * (from the newer mapreduce API, not the old mapred API). + * + * Unlike Hadoop's OutputCommitter, this implementation is serializable. + */ +class HadoopMapReduceCommitProtocol(path: String) + extends FileCommitProtocol with Serializable with Logging { + + import FileCommitProtocol._ + + /** OutputCommitter from Hadoop is not serializable so marking it transient. */ + @transient private var committer: OutputCommitter = _ + + /** UUID used to identify the job in file name. */ + private val uuid: String = UUID.randomUUID().toString + + protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { + context.getOutputFormatClass.newInstance().getOutputCommitter(context) + } + + override def newTaskTempFile( + taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { + // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + val filename = f"part-$split%05d-$uuid$ext" + + taskContext.getJobID + + val stagingDir: String = committer match { + // For FileOutputCommitter it has its own staging path called "work path". 
+ case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path) + case _ => path + } + + dir.map { d => + new Path(new Path(stagingDir, d), filename).toString + }.getOrElse { + new Path(stagingDir, filename).toString + } + } + + override def setupJob(jobContext: JobContext): Unit = { + // Setup IDs + val jobId = SparkHadoopWriter.createJobID(new Date, 0) + val taskId = new TaskID(jobId, TaskType.MAP, 0) + val taskAttemptId = new TaskAttemptID(taskId, 0) + + // Set up the configuration object + jobContext.getConfiguration.set("mapred.job.id", jobId.toString) + jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString) + jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString) + jobContext.getConfiguration.setBoolean("mapred.task.is.map", true) + jobContext.getConfiguration.setInt("mapred.task.partition", 0) + + val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId) + committer = setupCommitter(taskAttemptContext) + committer.setupJob(jobContext) + } + + override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { + committer.commitJob(jobContext) + } + + override def abortJob(jobContext: JobContext): Unit = { + committer.abortJob(jobContext, JobStatus.State.FAILED) + } + + override def setupTask(taskContext: TaskAttemptContext): Unit = { + setupCommitter(taskContext) + committer.setupTask(taskContext) + } + + override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { + val attemptId = taskContext.getTaskAttemptID + SparkHadoopMapRedUtil.commitTask( + committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) + EmptyTaskCommitMessage + } + + override def abortTask(taskContext: TaskAttemptContext): Unit = { + committer.abortTask(taskContext) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala deleted file mode 100644 index f5dd5ce22919d..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.datasources - -import java.util.{Date, UUID} - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl - -import org.apache.spark.SparkHadoopWriter -import org.apache.spark.internal.Logging -import org.apache.spark.mapred.SparkHadoopMapRedUtil -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.Utils - - -object FileCommitProtocol { - class TaskCommitMessage(val obj: Any) extends Serializable - - object EmptyTaskCommitMessage extends TaskCommitMessage(null) - - /** - * Instantiates a FileCommitProtocol using the given className. - */ - def instantiate(className: String, outputPath: String, isAppend: Boolean): FileCommitProtocol = { - try { - val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]] - - // First try the one with argument (outputPath: String, isAppend: Boolean). - // If that doesn't exist, try the one with (outputPath: String). - try { - val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[Boolean]) - ctor.newInstance(outputPath, isAppend.asInstanceOf[java.lang.Boolean]) - } catch { - case _: NoSuchMethodException => - val ctor = clazz.getDeclaredConstructor(classOf[String]) - ctor.newInstance(outputPath) - } - } catch { - case e: ClassNotFoundException => - throw e - } - } -} - - -/** - * An interface to define how a single Spark job commits its outputs. Two notes: - * - * 1. Implementations must be serializable, as the committer instance instantiated on the driver - * will be used for tasks on executors. - * 2. A committer should not be reused across multiple Spark jobs. - * - * The proper call sequence is: - * - * 1. Driver calls setupJob. - * 2. As part of each task's execution, executor calls setupTask and then commitTask - * (or abortTask if task failed). - * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job - * failed to execute (e.g. too many failed tasks), the job should call abortJob. - */ -abstract class FileCommitProtocol { - import FileCommitProtocol._ - - /** - * Setups up a job. Must be called on the driver before any other methods can be invoked. - */ - def setupJob(jobContext: JobContext): Unit - - /** - * Commits a job after the writes succeed. Must be called on the driver. - */ - def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit - - /** - * Aborts a job after the writes fail. Must be called on the driver. - * - * Calling this function is a best-effort attempt, because it is possible that the driver - * just crashes (or killed) before it can call abort. - */ - def abortJob(jobContext: JobContext): Unit - - /** - * Sets up a task within a job. - * Must be called before any other task related methods can be invoked. - */ - def setupTask(taskContext: TaskAttemptContext): Unit - - /** - * Notifies the commit protocol to add a new file, and gets back the full path that should be - * used. Must be called on the executors when running tasks. - * - * Note that the returned temp file may have an arbitrary path. The commit protocol only - * promises that the file will be at the location specified by the arguments after job commit. - * - * A full file path consists of the following parts: - * 1. the base path - * 2. some sub-directory within the base path, used to specify partitioning - * 3. 
file prefix, usually some unique job id with the task id - * 4. bucket id - * 5. source specific file extension, e.g. ".snappy.parquet" - * - * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest - * are left to the commit protocol implementation to decide. - */ - def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String - - /** - * Commits a task after the writes succeed. Must be called on the executors when running tasks. - */ - def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage - - /** - * Aborts a task after the writes have failed. Must be called on the executors when running tasks. - * - * Calling this function is a best-effort attempt, because it is possible that the executor - * just crashes (or killed) before it can call abort. - */ - def abortTask(taskContext: TaskAttemptContext): Unit -} - - -/** - * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter - * (from the newer mapreduce API, not the old mapred API). - * - * Unlike Hadoop's OutputCommitter, this implementation is serializable. - */ -class HadoopCommitProtocolWrapper(path: String, isAppend: Boolean) - extends FileCommitProtocol with Serializable with Logging { - - import FileCommitProtocol._ - - /** OutputCommitter from Hadoop is not serializable so marking it transient. */ - @transient private var committer: OutputCommitter = _ - - /** UUID used to identify the job in file name. */ - private val uuid: String = UUID.randomUUID().toString - - private def setupCommitter(context: TaskAttemptContext): Unit = { - committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context) - - if (!isAppend) { - // If we are appending data to an existing dir, we will only use the output committer - // associated with the file output format since it is not safe to use a custom - // committer for appending. For example, in S3, direct parquet output committer may - // leave partial data in the destination dir when the appending job fails. - // See SPARK-8578 for more details. - val configuration = context.getConfiguration - val clazz = - configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) - - if (clazz != null) { - logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") - - // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat - // has an associated output committer. To override this output committer, - // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. - // If a data source needs to override the output committer, it needs to set the - // output committer in prepareForWrite method. - if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) { - // The specified output committer is a FileOutputCommitter. - // So, we will use the FileOutputCommitter-specified constructor. - val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) - committer = ctor.newInstance(new Path(path), context) - } else { - // The specified output committer is just an OutputCommitter. - // So, we will use the no-argument constructor. 
- val ctor = clazz.getDeclaredConstructor() - committer = ctor.newInstance() - } - } - } - logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}") - } - - override def newTaskTempFile( - taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { - // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet - // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, - // the file name is fine and won't overflow. - val split = taskContext.getTaskAttemptID.getTaskID.getId - val filename = f"part-$split%05d-$uuid$ext" - - val stagingDir: String = committer match { - // For FileOutputCommitter it has its own staging path called "work path". - case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path) - case _ => path - } - - dir.map { d => - new Path(new Path(stagingDir, d), filename).toString - }.getOrElse { - new Path(stagingDir, filename).toString - } - } - - override def setupJob(jobContext: JobContext): Unit = { - // Setup IDs - val jobId = SparkHadoopWriter.createJobID(new Date, 0) - val taskId = new TaskID(jobId, TaskType.MAP, 0) - val taskAttemptId = new TaskAttemptID(taskId, 0) - - // Set up the configuration object - jobContext.getConfiguration.set("mapred.job.id", jobId.toString) - jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString) - jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString) - jobContext.getConfiguration.setBoolean("mapred.task.is.map", true) - jobContext.getConfiguration.setInt("mapred.task.partition", 0) - - val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId) - setupCommitter(taskAttemptContext) - - committer.setupJob(jobContext) - } - - override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { - committer.commitJob(jobContext) - } - - override def abortJob(jobContext: JobContext): Unit = { - committer.abortJob(jobContext, JobStatus.State.FAILED) - } - - override def setupTask(taskContext: TaskAttemptContext): Unit = { - setupCommitter(taskContext) - committer.setupTask(taskContext) - } - - override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { - val attemptId = taskContext.getTaskAttemptID - SparkHadoopMapRedUtil.commitTask( - committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) - EmptyTaskCommitMessage - } - - override def abortTask(taskContext: TaskAttemptContext): Unit = { - committer.abortTask(taskContext) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index bc00a0a749c09..e404dcd5452b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -29,6 +29,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -37,7 +39,6 @@ import 
org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter} -import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 230c74a47ba2a..881131d11a44d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -21,6 +21,7 @@ import java.io.IOException import org.apache.hadoop.fs.Path +import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala new file mode 100644 index 0000000000000..03e925425e429 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol +import org.apache.spark.sql.internal.SQLConf + +/** + * A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual + * Hadoop output committer using an option specified in SQLConf. 
+ */ +class SQLHadoopMapReduceCommitProtocol(path: String, isAppend: Boolean) + extends HadoopMapReduceCommitProtocol(path) with Serializable with Logging { + + override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { + var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context) + + if (!isAppend) { + // If we are appending data to an existing dir, we will only use the output committer + // associated with the file output format since it is not safe to use a custom + // committer for appending. For example, in S3, direct parquet output committer may + // leave partial data in the destination dir when the appending job fails. + // See SPARK-8578 for more details. + val configuration = context.getConfiguration + val clazz = + configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) + + if (clazz != null) { + logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") + + // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat + // has an associated output committer. To override this output committer, + // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. + // If a data source needs to override the output committer, it needs to set the + // output committer in prepareForWrite method. + if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) { + // The specified output committer is a FileOutputCommitter. + // So, we will use the FileOutputCommitter-specified constructor. + val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) + committer = ctor.newInstance(new Path(path), context) + } else { + // The specified output committer is just an OutputCommitter. + // So, we will use the no-argument constructor. + val ctor = clazz.getDeclaredConstructor() + committer = ctor.newInstance() + } + } + } + logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}") + committer + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index daec2b5450971..1271ab7b8b1cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.streaming import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.datasources.{FileCommitProtocol, FileFormat, FileFormatWriter} +import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter} object FileStreamSink { // The name of the subdirectory that is used to store metadata about which files are valid. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index 510312267a98d..df48c722fa7c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -25,8 +25,8 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.datasources.FileCommitProtocol -import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage /** * A [[FileCommitProtocol]] that tracks the list of valid files in a manifest file, used in diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7bb3ac02fa5d0..7b8ed65054c3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.execution.datasources.HadoopCommitProtocolWrapper +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol import org.apache.spark.util.Utils @@ -385,7 +385,7 @@ object SQLConf { SQLConfigBuilder("spark.sql.sources.commitProtocolClass") .internal() .stringConf - .createWithDefault(classOf[HadoopCommitProtocolWrapper].getName) + .createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName) val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold") From 1866ce5569413b2c26691a49627a8b17f30a7bdc Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 2 Nov 2016 00:50:36 -0700 Subject: [PATCH 2/5] Set Job ID. --- .../spark/internal/io/FileCommitProtocol.scala | 13 ++++++++----- .../internal/io/HadoopMapReduceCommitProtocol.scala | 11 ++++------- .../InsertIntoHadoopFsRelationCommand.scala | 5 +++-- .../SQLHadoopMapReduceCommitProtocol.scala | 4 ++-- .../sql/execution/streaming/FileStreamSink.scala | 5 ++++- .../streaming/ManifestFileCommitProtocol.scala | 2 +- 6 files changed, 22 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 257154bbb847a..bcb8dc6cbb597 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -27,7 +27,9 @@ import org.apache.spark.util.Utils * * 1. Implementations must be serializable, as the committer instance instantiated on the driver * will be used for tasks on executors. - * 2. A committer should not be reused across multiple Spark jobs. + * 2. Implementations should have a constructor with either 2 or 3 arguments: + * (jobId: String, path: String) or (jobId: String, path: String, isAppend: Boolean). 
+ * 3. A committer should not be reused across multiple Spark jobs. * * The proper call sequence is: * @@ -106,19 +108,20 @@ object FileCommitProtocol { /** * Instantiates a FileCommitProtocol using the given className. */ - def instantiate(className: String, outputPath: String, isAppend: Boolean): FileCommitProtocol = { + def instantiate(className: String, jobId: String, outputPath: String, isAppend: Boolean) + : FileCommitProtocol = { try { val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]] // First try the one with argument (outputPath: String, isAppend: Boolean). // If that doesn't exist, try the one with (outputPath: String). try { - val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[Boolean]) - ctor.newInstance(outputPath, isAppend.asInstanceOf[java.lang.Boolean]) + val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean]) + ctor.newInstance(jobId, outputPath, isAppend.asInstanceOf[java.lang.Boolean]) } catch { case _: NoSuchMethodException => val ctor = clazz.getDeclaredConstructor(classOf[String]) - ctor.newInstance(outputPath) + ctor.newInstance(jobId, outputPath) } } catch { case e: ClassNotFoundException => diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 4cb889f8be493..12d78947705bb 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -17,7 +17,7 @@ package org.apache.spark.internal.io -import java.util.{Date, UUID} +import java.util.Date import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ @@ -34,7 +34,7 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * * Unlike Hadoop's OutputCommitter, this implementation is serializable. */ -class HadoopMapReduceCommitProtocol(path: String) +class HadoopMapReduceCommitProtocol(jobId: String, path: String) extends FileCommitProtocol with Serializable with Logging { import FileCommitProtocol._ @@ -42,20 +42,17 @@ class HadoopMapReduceCommitProtocol(path: String) /** OutputCommitter from Hadoop is not serializable so marking it transient. */ @transient private var committer: OutputCommitter = _ - /** UUID used to identify the job in file name. */ - private val uuid: String = UUID.randomUUID().toString - protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { context.getOutputFormatClass.newInstance().getOutputCommitter(context) } override def newTaskTempFile( - taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { + taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, // the file name is fine and won't overflow. 
val split = taskContext.getTaskAttemptID.getTaskID.getId - val filename = f"part-$split%05d-$uuid$ext" + val filename = f"part-$split%05d-$jobId$ext" taskContext.getJobID diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 881131d11a44d..927c0c5b95a17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -87,8 +87,9 @@ case class InsertIntoHadoopFsRelationCommand( if (doInsertion) { val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, - outputPath.toString, - isAppend) + jobId = java.util.UUID.randomUUID().toString, + outputPath = outputPath.toString, + isAppend = isAppend) FileFormatWriter.write( sparkSession = sparkSession, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala index 03e925425e429..9b9ed28412cac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala @@ -29,8 +29,8 @@ import org.apache.spark.sql.internal.SQLConf * A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual * Hadoop output committer using an option specified in SQLConf. */ -class SQLHadoopMapReduceCommitProtocol(path: String, isAppend: Boolean) - extends HadoopMapReduceCommitProtocol(path) with Serializable with Logging { +class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean) + extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging { override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 1271ab7b8b1cf..e9af745a05662 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -55,7 +55,10 @@ class FileStreamSink( logInfo(s"Skipping already committed batch $batchId") } else { val committer = FileCommitProtocol.instantiate( - sparkSession.sessionState.conf.streamingFileCommitProtocolClass, path, isAppend = false) + sparkSession.sessionState.conf.streamingFileCommitProtocolClass, + jobId = batchId.toString, + outputPath = path, + isAppend = false) committer match { case manifestCommitter: ManifestFileCommitProtocol => manifestCommitter.setupManifestOptions(fileLog, batchId) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index df48c722fa7c9..1fe13fa1623fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -34,7 +34,7 @@ import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage * * @param path path to write the final output to. */ -class ManifestFileCommitProtocol(path: String) +class ManifestFileCommitProtocol(jobId: String, path: String) extends FileCommitProtocol with Serializable with Logging { // Track the list of files added by a task, only used on the executors. From 3e12ed34bb8be6a020cfac661c65b6d31cc70dfa Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 2 Nov 2016 00:51:51 -0700 Subject: [PATCH 3/5] Remove extra code --- .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 12d78947705bb..c3a448f8b3c1f 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -54,8 +54,6 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val split = taskContext.getTaskAttemptID.getTaskID.getId val filename = f"part-$split%05d-$jobId$ext" - taskContext.getJobID - val stagingDir: String = committer match { // For FileOutputCommitter it has its own staging path called "work path". case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path) From 34629f90f5cc438226dd3e0360cf517e2b592dd4 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 2 Nov 2016 11:30:43 -0700 Subject: [PATCH 4/5] Get rid of NPE --- .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index c3a448f8b3c1f..66ccb6d437708 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -94,7 +94,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) } override def setupTask(taskContext: TaskAttemptContext): Unit = { - setupCommitter(taskContext) + committer = setupCommitter(taskContext) committer.setupTask(taskContext) } From 1dffb5905981ab4c2ef1cd45178b70638fef344c Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 2 Nov 2016 23:48:01 -0700 Subject: [PATCH 5/5] Fix FileStreamSink hanging. 
--- .../internal/io/FileCommitProtocol.scala | 23 ++++++++----------- .../execution/streaming/FileStreamSink.scala | 3 ++- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index bcb8dc6cbb597..fb8020585cf89 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -110,22 +110,17 @@ object FileCommitProtocol { */ def instantiate(className: String, jobId: String, outputPath: String, isAppend: Boolean) : FileCommitProtocol = { + val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]] + + // First try the one with argument (jobId: String, outputPath: String, isAppend: Boolean). + // If that doesn't exist, try the one with (jobId: string, outputPath: String). try { - val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]] - - // First try the one with argument (outputPath: String, isAppend: Boolean). - // If that doesn't exist, try the one with (outputPath: String). - try { - val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean]) - ctor.newInstance(jobId, outputPath, isAppend.asInstanceOf[java.lang.Boolean]) - } catch { - case _: NoSuchMethodException => - val ctor = clazz.getDeclaredConstructor(classOf[String]) - ctor.newInstance(jobId, outputPath) - } + val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean]) + ctor.newInstance(jobId, outputPath, isAppend.asInstanceOf[java.lang.Boolean]) } catch { - case e: ClassNotFoundException => - throw e + case _: NoSuchMethodException => + val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String]) + ctor.newInstance(jobId, outputPath) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index e9af745a05662..e849cafef4184 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -55,10 +55,11 @@ class FileStreamSink( logInfo(s"Skipping already committed batch $batchId") } else { val committer = FileCommitProtocol.instantiate( - sparkSession.sessionState.conf.streamingFileCommitProtocolClass, + className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass, jobId = batchId.toString, outputPath = path, isAppend = false) + committer match { case manifestCommitter: ManifestFileCommitProtocol => manifestCommitter.setupManifestOptions(fileLog, batchId)
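
As a minimal sketch of the contract these patches establish, a hypothetical committer built against the final (jobId: String, path: String) constructor convention could look like the following; the class name NoOpCommitProtocol and the paths in the usage comment are assumptions for illustration, not part of the Spark codebase or of this patch series.

// A minimal, hypothetical FileCommitProtocol implementation, assuming the post-patch API
// in org.apache.spark.internal.io. It does no staging: files are written directly under
// `path`, and commit/abort are no-ops, so it is suitable only for illustration.
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.{EmptyTaskCommitMessage, TaskCommitMessage}

class NoOpCommitProtocol(jobId: String, path: String)
  extends FileCommitProtocol with Serializable {

  override def setupJob(jobContext: JobContext): Unit = ()

  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = ()

  override def abortJob(jobContext: JobContext): Unit = ()

  override def setupTask(taskContext: TaskAttemptContext): Unit = ()

  override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    // Mirror the naming scheme used by HadoopMapReduceCommitProtocol: the task's split id
    // plus the job id, written under the optional partition sub-directory.
    val split = taskContext.getTaskAttemptID.getTaskID.getId
    val filename = f"part-$split%05d-$jobId$ext"
    dir.map(d => s"$path/$d/$filename").getOrElse(s"$path/$filename")
  }

  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage =
    EmptyTaskCommitMessage

  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}

// FileCommitProtocol.instantiate first tries a (jobId, path, isAppend) constructor and then
// falls back to (jobId, path), so the class above resolves through the fallback branch:
//
//   val committer = FileCommitProtocol.instantiate(
//     className = classOf[NoOpCommitProtocol].getName,
//     jobId = java.util.UUID.randomUUID().toString,
//     outputPath = "/tmp/example-output",   // hypothetical path
//     isAppend = false)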