[SPARK-8139] [SQL] Updates docs and comments of data sources and Parquet output committer options

This PR only applies to master branch (1.5.0-SNAPSHOT) since it references `org.apache.parquet` classes which only appear in Parquet 1.7.0.

Author: Cheng Lian <lian@databricks.com>

Closes #6683 from liancheng/output-committer-docs and squashes the following commits:

b4648b8 [Cheng Lian] Removes spark.sql.sources.outputCommitterClass as it's not a public option
ee63923 [Cheng Lian] Updates docs and comments of data sources and Parquet output committer options
liancheng committed Jun 24, 2015
1 parent 7fb5ae5 commit 111d6b9
Showing 4 changed files with 78 additions and 20 deletions.
30 changes: 29 additions & 1 deletion docs/sql-programming-guide.md
@@ -1348,6 +1348,34 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
support.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.output.committer.class</code></td>
<td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
<td>
<p>
The output committer class used by Parquet. The specified class needs to be a subclass of
<code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a
subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
</p>
<p>
<b>Note:</b>
<ul>
<li>
This option must be set via Hadoop <code>Configuration</code> rather than Spark
<code>SQLConf</code>.
</li>
<li>
This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>.
</li>
</ul>
</p>
<p>
Spark SQL comes with a built-in
<code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more
efficient than the default Parquet output committer when writing data to S3.
</p>
</td>
</tr>
</table>
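A hedged illustration of the note in the table above: the committer class has to go into the Hadoop `Configuration` (here via `SparkContext#hadoopConfiguration`), not into `SQLConf`. The `sqlContext`, `df`, and S3 path names are assumptions for the sketch.

```scala
import org.apache.spark.sql.SaveMode

// Assumed: an existing SQLContext `sqlContext` and a DataFrame `df` to be written.
// The committer class must be set on the Hadoop Configuration, not via sqlContext.setConf.
sqlContext.sparkContext.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// Overwrite (rather than Append) is the safe mode when using the direct committer.
df.write.mode(SaveMode.Overwrite).parquet("s3n://my-bucket/path/to/table")
```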

## JSON Datasets
@@ -1876,7 +1904,7 @@ that these options will be deprecated in future release as more optimizations ar
Configures the number of partitions to use when shuffling data for joins or aggregations.
</td>
</tr>
<tr>
<tr>
<td><code>spark.sql.planner.externalSort</code></td>
<td>false</td>
<td>
30 changes: 23 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,6 +22,8 @@ import java.util.Properties
import scala.collection.immutable
import scala.collection.JavaConversions._

import org.apache.parquet.hadoop.ParquetOutputCommitter

import org.apache.spark.sql.catalyst.CatalystConf

private[spark] object SQLConf {
@@ -252,21 +254,31 @@ private[spark] object SQLConf {

val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
defaultValue = Some(false),
doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default" +
" because of a known bug in Paruet 1.6.0rc3 " +
"(<a href=\"https://issues.apache.org/jira/browse/PARQUET-136\">PARQUET-136</a>). However, " +
doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default " +
"because of a known bug in Parquet 1.6.0rc3 " +
"(PARQUET-136, https://issues.apache.org/jira/browse/PARQUET-136). However, " +
"if your table doesn't contain any nullable string or binary columns, it's still safe to " +
"turn this feature on.")

val PARQUET_USE_DATA_SOURCE_API = booleanConf("spark.sql.parquet.useDataSourceApi",
defaultValue = Some(true),
doc = "<TODO>")

val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
key = "spark.sql.parquet.output.committer.class",
defaultValue = Some(classOf[ParquetOutputCommitter].getName),
doc = "The output committer class used by Parquet. The specified class needs to be a " +
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
"option must be set in Hadoop Configuration. 2. This option overrides " +
"\"spark.sql.sources.outputCommitterClass\"."
)

val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
defaultValue = Some(false),
doc = "<TODO>")

val HIVE_VERIFY_PARTITIONPATH = booleanConf("spark.sql.hive.verifyPartitionPath",
val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
defaultValue = Some(true),
doc = "<TODO>")

@@ -325,9 +337,13 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "<TODO>")

// The output committer class used by FSBasedRelation. The specified class needs to be a
// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
// NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
//
// NOTE:
//
// 1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
// 2. This option can be overridden by "spark.sql.parquet.output.committer.class".
val OUTPUT_COMMITTER_CLASS =
stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)

@@ -415,7 +431,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

/** When true, uses verifyPartitionPath to prune paths that do not exist. */
private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITIONPATH)
private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

/** When true the planner will use the external sort, which may spill to disk. */
private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
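A small sketch of the precedence called out in the comments above, assuming a SparkContext named `sc`; `com.example.MyGenericCommitter` is a hypothetical class used only for illustration.

```scala
// Assumed: an existing SparkContext `sc`. Both options live in the Hadoop Configuration.
val hadoopConf = sc.hadoopConfiguration

// Generic committer for HadoopFsRelation-based writes (non-public option).
// com.example.MyGenericCommitter is hypothetical, standing in for any OutputCommitter subclass.
hadoopConf.set("spark.sql.sources.outputCommitterClass", "com.example.MyGenericCommitter")

// Parquet-specific committer; for Parquet writes this overrides the option above.
hadoopConf.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
```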
sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -17,19 +17,35 @@

package org.apache.spark.sql.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.Log
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}

/**
* An output committer for writing Parquet files. Instead of writing to the `_temporary` folder
* as [[ParquetOutputCommitter]] does, this output committer writes data directly to the
* destination folder. This can be useful for data stored in S3, where directory operations are
* relatively expensive.
*
* To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
* property via Hadoop [[Configuration]]. Note that this property overrides
* "spark.sql.sources.outputCommitterClass".
*
* *NOTE*
*
* NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
* no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
* left empty).
*/
private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
val LOG = Log.getLog(classOf[ParquetOutputCommitter])

override def getWorkPath(): Path = outputPath
override def getWorkPath: Path = outputPath
override def abortTask(taskContext: TaskAttemptContext): Unit = {}
override def commitTask(taskContext: TaskAttemptContext): Unit = {}
override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
@@ -46,13 +62,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
try {
ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
} catch {
case e: Exception => {
LOG.warn("could not write summary file for " + outputPath, e)
val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
if (fileSystem.exists(metadataPath)) {
fileSystem.delete(metadataPath, true)
}
} catch { case e: Exception =>
LOG.warn("could not write summary file for " + outputPath, e)
val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
if (fileSystem.exists(metadataPath)) {
fileSystem.delete(metadataPath, true)
}
}
} catch {
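A usage sketch of the append caveat in the Scaladoc above, assuming the direct committer has been configured as shown earlier and that `df` is an existing DataFrame; the S3 path is illustrative.

```scala
import org.apache.spark.sql.SaveMode

// Safe: a failed overwrite can simply be re-run from scratch.
df.write.mode(SaveMode.Overwrite).parquet("s3n://my-bucket/events")

// Risky with DirectParquetOutputCommitter: abortTask()/abortJob() are no-ops, so a failed
// append cannot be rolled back and may leave partial output behind.
// df.write.mode(SaveMode.Append).parquet("s3n://my-bucket/events")
```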
@@ -178,11 +178,11 @@ private[sql] class ParquetRelation2(

val committerClass =
conf.getClass(
"spark.sql.parquet.output.committer.class",
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
classOf[ParquetOutputCommitter],
classOf[ParquetOutputCommitter])

if (conf.get("spark.sql.parquet.output.committer.class") == null) {
if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
logInfo("Using default output committer for Parquet: " +
classOf[ParquetOutputCommitter].getCanonicalName)
} else {
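For reference, a minimal sketch of the Hadoop `Configuration#getClass` fallback used in the lookup above: when the key is absent, the default class passed as the second argument is returned.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetOutputCommitter

val conf = new Configuration()
// Key not set: getClass falls back to the default class given as the second argument.
val committerClass = conf.getClass(
  "spark.sql.parquet.output.committer.class",
  classOf[ParquetOutputCommitter],   // default when the key is unset
  classOf[ParquetOutputCommitter])   // the committer must be assignable to this type

assert(committerClass == classOf[ParquetOutputCommitter])
```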
