diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 9107c9b67681f..2786e3d2cd6bf 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1348,6 +1348,34 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext` support. + + spark.sql.parquet.output.committer.class + org.apache.parquet.hadoop.
ParquetOutputCommitter
+ +

+ The output committer class used by Parquet. The specified class needs to be a subclass of + org.apache.hadoop.
mapreduce.OutputCommitter
. Typically, it's also a + subclass of org.apache.parquet.hadoop.ParquetOutputCommitter. +

+

+ Note: +

+

+

+ Spark SQL comes with a builtin + org.apache.spark.sql.
parquet.DirectParquetOutputCommitter
, which can be more + efficient then the default Parquet output committer when writing data to S3. +

+ + ## JSON Datasets @@ -1876,7 +1904,7 @@ that these options will be deprecated in future release as more optimizations ar Configures the number of partitions to use when shuffling data for joins or aggregations. - + spark.sql.planner.externalSort false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 16493c3d7c19c..265352647fa9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -22,6 +22,8 @@ import java.util.Properties import scala.collection.immutable import scala.collection.JavaConversions._ +import org.apache.parquet.hadoop.ParquetOutputCommitter + import org.apache.spark.sql.catalyst.CatalystConf private[spark] object SQLConf { @@ -252,9 +254,9 @@ private[spark] object SQLConf { val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown", defaultValue = Some(false), - doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default" + - " because of a known bug in Paruet 1.6.0rc3 " + - "(PARQUET-136). However, " + + doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default " + + "because of a known bug in Parquet 1.6.0rc3 " + + "(PARQUET-136, https://issues.apache.org/jira/browse/PARQUET-136). However, " + "if your table doesn't contain any nullable string or binary columns, it's still safe to " + "turn this feature on.") @@ -262,11 +264,21 @@ private[spark] object SQLConf { defaultValue = Some(true), doc = "") + val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf( + key = "spark.sql.parquet.output.committer.class", + defaultValue = Some(classOf[ParquetOutputCommitter].getName), + doc = "The output committer class used by Parquet. The specified class needs to be a " + + "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " + + "of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " + + "option must be set in Hadoop Configuration. 2. This option overrides " + + "\"spark.sql.sources.outputCommitterClass\"." + ) + val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown", defaultValue = Some(false), doc = "") - val HIVE_VERIFY_PARTITIONPATH = booleanConf("spark.sql.hive.verifyPartitionPath", + val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath", defaultValue = Some(true), doc = "") @@ -325,9 +337,13 @@ private[spark] object SQLConf { defaultValue = Some(true), doc = "") - // The output committer class used by FSBasedRelation. The specified class needs to be a + // The output committer class used by HadoopFsRelation. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. - // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf` + // + // NOTE: + // + // 1. Instead of SQLConf, this option *must be set in Hadoop Configuration*. + // 2. This option can be overriden by "spark.sql.parquet.output.committer.class". val OUTPUT_COMMITTER_CLASS = stringConf("spark.sql.sources.outputCommitterClass", isPublic = false) @@ -415,7 +431,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED) /** When true uses verifyPartitionPath to prune the path which is not exists. */ - private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITIONPATH) + private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH) /** When true the planner will use the external sort, which may spill to disk. */ private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala index 62c4e92ebec68..1551afd7b7bf2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala @@ -17,19 +17,35 @@ package org.apache.spark.sql.parquet +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.parquet.Log import org.apache.parquet.hadoop.util.ContextUtil import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat} +/** + * An output committer for writing Parquet files. In stead of writing to the `_temporary` folder + * like what [[ParquetOutputCommitter]] does, this output committer writes data directly to the + * destination folder. This can be useful for data stored in S3, where directory operations are + * relatively expensive. + * + * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class" + * property via Hadoop [[Configuration]]. Not that this property overrides + * "spark.sql.sources.outputCommitterClass". + * + * *NOTE* + * + * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's + * no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are + * left * empty). + */ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) extends ParquetOutputCommitter(outputPath, context) { val LOG = Log.getLog(classOf[ParquetOutputCommitter]) - override def getWorkPath(): Path = outputPath + override def getWorkPath: Path = outputPath override def abortTask(taskContext: TaskAttemptContext): Unit = {} override def commitTask(taskContext: TaskAttemptContext): Unit = {} override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true @@ -46,13 +62,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus) try { ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers) - } catch { - case e: Exception => { - LOG.warn("could not write summary file for " + outputPath, e) - val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE) - if (fileSystem.exists(metadataPath)) { - fileSystem.delete(metadataPath, true) - } + } catch { case e: Exception => + LOG.warn("could not write summary file for " + outputPath, e) + val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE) + if (fileSystem.exists(metadataPath)) { + fileSystem.delete(metadataPath, true) } } } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index e049d54bf55dc..1d353bd8e1114 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -178,11 +178,11 @@ private[sql] class ParquetRelation2( val committerClass = conf.getClass( - "spark.sql.parquet.output.committer.class", + SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter]) - if (conf.get("spark.sql.parquet.output.committer.class") == null) { + if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) { logInfo("Using default output committer for Parquet: " + classOf[ParquetOutputCommitter].getCanonicalName) } else {