[SPARK-22217][SQL] ParquetFileFormat to support arbitrary OutputCommitters

## What changes were proposed in this pull request?

This change relaxes `ParquetFileFormat`'s requirement on the output committer class from `org.apache.parquet.hadoop.ParquetOutputCommitter` (or a subclass thereof, and so implicitly Hadoop's `FileOutputCommitter`) to any committer implementing `org.apache.hadoop.mapreduce.OutputCommitter`.

This enables output committers which don't write to the filesystem the way `FileOutputCommitter` does to save Parquet data from a DataFrame; at present that is not possible.
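
For illustration, plugging in an alternative committer would look something like the sketch below, assuming a running `SparkSession` named `spark`; `com.example.CloudOutputCommitter` is a hypothetical class implementing `org.apache.hadoop.mapreduce.OutputCommitter`:

```scala
// Any implementation of org.apache.hadoop.mapreduce.OutputCommitter is
// now accepted; it no longer has to extend ParquetOutputCommitter.
spark.conf.set(
  "spark.sql.parquet.output.committer.class",
  "com.example.CloudOutputCommitter") // hypothetical committer

spark.range(10).toDF("id")
  .write
  .parquet("/tmp/committer-demo") // committed via CloudOutputCommitter
```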

Before using a committer which isn't a subclass of `ParquetOutputCommitter`, the code checks whether the context has requested summary metadata by setting `parquet.enable.summary-metadata`. If it has, and the committer class isn't a Parquet committer, a `RuntimeException` is raised with an error message.

(It could downgrade gracefully, of course, but raising an exception makes it clear there won't be a summary. It also makes the behaviour testable.)
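
A job that wants such a committer therefore has to switch summaries off before writing. A minimal sketch, assuming a running `SparkSession` named `spark` (this mirrors what the test suite below does with `withSQLConf`; `ParquetOutputFormat.ENABLE_JOB_SUMMARY` is the key `parquet.enable.summary-metadata`):

```scala
// Summaries cannot be produced by a non-Parquet committer, so disable
// them explicitly before writing; the setting propagates into the
// Hadoop configuration used for the write.
spark.conf.set("parquet.enable.summary-metadata", "false")

spark.range(10).toDF("id")
  .write
  .parquet("/tmp/no-summary-demo")
```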

Note that `SQLConf` already states that any `OutputCommitter` can be used, but that typically it's a subclass of `ParquetOutputCommitter`. That isn't currently true. This patch makes the code consistent with the docs and adds tests to verify it.

## How was this patch tested?

The patch includes a test suite, `ParquetCommitterSuite`, with a new committer, `MarkingFileOutputCommitter`, which extends `FileOutputCommitter` and writes a marker file in the destination directory. The presence of the marker file can be used to verify that the new committer was used. The tests then try the combinations of Parquet committer summary/no-summary and marking committer summary/no-summary:

| committer | summary | outcome |
|-----------|---------|---------|
| parquet   | true    | success |
| parquet   | false   | success |
| marking   | false   | success with marker |
| marking   | true    | exception |

All tests are happy.

Author: Steve Loughran <stevel@hortonworks.com>

Closes apache#19448 from steveloughran/cloud/SPARK-22217-committer.
steveloughran authored and MatthewRBruce committed Jul 31, 2018
1 parent ccce328 commit 7854b0b
Showing 3 changed files with 165 additions and 4 deletions.
**SQLConf.scala**

```diff
@@ -268,8 +268,9 @@ object SQLConf {
 
   val PARQUET_OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.parquet.output.committer.class")
     .doc("The output committer class used by Parquet. The specified class needs to be a " +
-      "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
-      "of org.apache.parquet.hadoop.ParquetOutputCommitter.")
+      "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
+      "of org.apache.parquet.hadoop.ParquetOutputCommitter. If it is not, then metadata summaries " +
+      "will never be created, irrespective of the value of parquet.enable.summary-metadata")
     .internal()
     .stringConf
     .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")
```
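
The relaxation itself works because Hadoop's `Configuration.getClass(name, defaultValue, xface)` only requires the configured class to be assignable to `xface`; passing `classOf[OutputCommitter]` instead of `classOf[ParquetOutputCommitter]` (see the `ParquetFileFormat` diff below) is what admits arbitrary committers. A standalone sketch of that behaviour:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.OutputCommitter
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

val conf = new Configuration()
// Any OutputCommitter implementation now passes the xface check...
conf.setClass("spark.sql.parquet.output.committer.class",
  classOf[FileOutputCommitter], classOf[OutputCommitter])

// ...so getClass resolves it; a configured class that did not implement
// OutputCommitter would make getClass throw a RuntimeException instead.
val committerClass = conf.getClass(
  "spark.sql.parquet.output.committer.class",
  classOf[FileOutputCommitter],
  classOf[OutputCommitter])
```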
**ParquetFileFormat.scala**

```diff
@@ -85,7 +85,7 @@ class ParquetFileFormat
       conf.getClass(
         SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
         classOf[ParquetOutputCommitter],
-        classOf[ParquetOutputCommitter])
+        classOf[OutputCommitter])
 
     if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
       logInfo("Using default output committer for Parquet: " +
@@ -97,7 +97,7 @@ class ParquetFileFormat
     conf.setClass(
       SQLConf.OUTPUT_COMMITTER_CLASS.key,
       committerClass,
-      classOf[ParquetOutputCommitter])
+      classOf[OutputCommitter])
 
     // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
     // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
@@ -137,6 +137,14 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+      && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      logWarning(s"Committer $committerClass is not a ParquetOutputCommitter and cannot" +
+        s" create job summaries. " +
+        s"Set Parquet option ${ParquetOutputFormat.ENABLE_JOB_SUMMARY} to false.")
+    }
+
     new OutputWriterFactory {
       // This OutputWriterFactory instance is deserialized when writing Parquet files on the
       // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
```
**ParquetCommitterSuite.scala** (new file, 152 lines)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet

import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}

import org.apache.spark.{LocalSparkContext, SparkFunSuite}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils

/**
 * Test logic related to choice of output committers.
 */
class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
  with LocalSparkContext {

  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName

  protected var spark: SparkSession = _

  /**
   * Create a new [[SparkSession]] running in local-cluster mode.
   */
  override def beforeAll(): Unit = {
    super.beforeAll()
    spark = SparkSession.builder()
      .master("local-cluster[2,1,1024]")
      .appName("testing")
      .getOrCreate()
  }

  override def afterAll(): Unit = {
    try {
      if (spark != null) {
        spark.stop()
        spark = null
      }
    } finally {
      super.afterAll()
    }
  }

  test("alternative output committer, merge schema") {
    writeDataFrame(MarkingFileOutput.COMMITTER, summary = true, check = true)
  }

  test("alternative output committer, no merge schema") {
    writeDataFrame(MarkingFileOutput.COMMITTER, summary = false, check = true)
  }

  test("Parquet output committer, merge schema") {
    writeDataFrame(PARQUET_COMMITTER, summary = true, check = false)
  }

  test("Parquet output committer, no merge schema") {
    writeDataFrame(PARQUET_COMMITTER, summary = false, check = false)
  }

  /**
   * Write a trivial dataframe as Parquet, using the given committer
   * and job summary option.
   * @param committer committer to use
   * @param summary create a job summary
   * @param check look for a marker file
   * @return if a marker file was sought, its file status
   */
  private def writeDataFrame(
      committer: String,
      summary: Boolean,
      check: Boolean): Option[FileStatus] = {
    var result: Option[FileStatus] = None
    withSQLConf(
      SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> committer,
      ParquetOutputFormat.ENABLE_JOB_SUMMARY -> summary.toString) {
      withTempPath { dest =>
        val df = spark.createDataFrame(Seq((1, "4"), (2, "2")))
        val destPath = new Path(dest.toURI)
        df.write.format("parquet").save(destPath.toString)
        if (check) {
          result = Some(MarkingFileOutput.checkMarker(
            destPath,
            spark.sparkContext.hadoopConfiguration))
        }
      }
    }
    result
  }
}

/**
 * A file output committer which explicitly touches a file "marker"; this
 * is how tests can verify that this committer was used.
 * @param outputPath output path
 * @param context task context
 */
private class MarkingFileOutputCommitter(
    outputPath: Path,
    context: TaskAttemptContext) extends FileOutputCommitter(outputPath, context) {

  override def commitJob(context: JobContext): Unit = {
    super.commitJob(context)
    MarkingFileOutput.touch(outputPath, context.getConfiguration)
  }
}

private object MarkingFileOutput {

  val COMMITTER = classOf[MarkingFileOutputCommitter].getCanonicalName

  /**
   * Touch the marker.
   * @param outputPath destination directory
   * @param conf configuration to create the FS with
   */
  def touch(outputPath: Path, conf: Configuration): Unit = {
    outputPath.getFileSystem(conf).create(new Path(outputPath, "marker")).close()
  }

  /**
   * Get the file status of the marker.
   *
   * @param outputPath destination directory
   * @param conf configuration to create the FS with
   * @return the status of the marker
   * @throws FileNotFoundException if the marker is absent
   */
  def checkMarker(outputPath: Path, conf: Configuration): FileStatus = {
    outputPath.getFileSystem(conf).getFileStatus(new Path(outputPath, "marker"))
  }
}
```
