From f75e261c5652e4d6fa69e6f790f5b4a9238ad29e Mon Sep 17 00:00:00 2001
From: Pei-Lun Lee
Date: Mon, 16 Mar 2015 14:44:00 +0800
Subject: [PATCH 01/10] DirectParquetOutputCommitter

---
 .../DirectParquetOutputCommitter.scala | 49 +++++++++++++++++++
 1 file changed, 49 insertions(+)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
new file mode 100644
index 0000000000000..da4802d40f0b1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -0,0 +1,49 @@
+package org.apache.spark.sql.parquet
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+
+import parquet.Log
+import parquet.hadoop.util.ContextUtil
+import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter}
+
+class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+  val LOG = Log.getLog(classOf[ParquetOutputCommitter])
+
+  override def getWorkPath(): Path = outputPath
+  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
+  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
+  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
+  override def setupJob(jobContext: JobContext): Unit = {}
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
+
+  override def commitJob(jobContext: JobContext) {
+    try {
+      val configuration = ContextUtil.getConfiguration(jobContext)
+      val fileSystem = outputPath.getFileSystem(configuration)
+      val outputStatus = fileSystem.getFileStatus(outputPath)
+      val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
+      try {
+        ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
+        if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
+          val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
+          fileSystem.create(successPath).close()
+        }
+      } catch {
+        case e: Exception => {
+          LOG.warn("could not write summary file for " + outputPath, e)
+          val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
+          if (fileSystem.exists(metadataPath)) {
+            fileSystem.delete(metadataPath, true)
+          }
+        }
+      }
+    } catch {
+      case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
+    }
+  }
+
+}
+

From 769bd6737acc3f13a8678688e842e902fc38e802 Mon Sep 17 00:00:00 2001
From: Pei-Lun Lee
Date: Fri, 13 Mar 2015 11:48:03 +0800
Subject: [PATCH 02/10] DirectParquetOutputCommitter

---
 .../sql/parquet/ParquetTableOperations.scala | 23 +++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 62813a981e685..38abd5faa24d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -379,6 +379,8 @@ private[sql] case class InsertIntoParquetTable(
  */
 private[parquet] class AppendingParquetOutputFormat(offset: Int)
   extends parquet.hadoop.ParquetOutputFormat[Row] {
+  var committer: OutputCommitter = null
+
   // override to accept existing directories as valid output directory
   override def checkOutputSpecs(job: JobContext): Unit = {}
@@ -403,6 +405,27 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
   private def getTaskAttemptID(context: TaskAttemptContext): TaskAttemptID = {
     context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
   }
+
+  // override to choose between ParquetOutputCommitter and DirectParquetOutputCommitter
+  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
+    if (committer == null) {
+      val output = getOutputPath(context)
+      val marker = "spark.sql.parquet.useDirectParquetOutputCommitter"
+      committer = context.getConfiguration.getBoolean(marker, false) match {
+        case true => new DirectParquetOutputCommitter(output, context)
+        case false => new ParquetOutputCommitter(output, context)
+      }
+    }
+    committer
+  }
+
+  // FileOutputFormat.getOutputPath takes JobConf in hadoop-1 but JobContext in hadoop-2
+  private def getOutputPath(context: TaskAttemptContext): Path = {
+    context.getConfiguration().get("mapred.output.dir") match {
+      case null => null
+      case name => new Path(name)
+    }
+  }
 }
 
 /**

From c42468c9b207edd995524afc2ebb1f723e375d20 Mon Sep 17 00:00:00 2001
From: Pei-Lun Lee
Date: Tue, 17 Mar 2015 15:27:44 +0800
Subject: [PATCH 03/10] [SPARK-6352] [SQL] hide class DirectParquetOutputCommitter

---
 .../apache/spark/sql/parquet/DirectParquetOutputCommitter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
index da4802d40f0b1..890765267c747 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -8,7 +8,7 @@ import parquet.Log
 import parquet.hadoop.util.ContextUtil
 import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter}
 
-class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
   val LOG = Log.getLog(classOf[ParquetOutputCommitter])

From 9ae7545701f522702f2d0240367fc6fba06b7c26 Mon Sep 17 00:00:00 2001
From: Pei-Lun Lee
Date: Tue, 17 Mar 2015 15:28:28 +0800
Subject: [PATCH 04/10] [SPARK-6352] [SQL] add test case

---
 .../spark/sql/parquet/ParquetIOSuite.scala | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 36f3406a7825f..a94dddaa3f89c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -329,6 +329,24 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
       checkAnswer(parquetFile(file), (data ++ newData).map(Row.fromTuple))
     }
   }
+
+  test("SPARK-6352 DirectParquetOutputCommitter") {
+    try {
+      configuration.set("spark.sql.parquet.useDirectParquetOutputCommitter", "true")
+      sqlContext.udf.register("div0", (x: Int) => x / 0)
+      withTempPath { dir =>
+        intercept[org.apache.spark.SparkException] {
+          sqlContext.sql("select div0(1)").saveAsParquetFile(dir.getCanonicalPath)
+        }
+        val path = new Path(dir.getCanonicalPath, "_temporary")
+        val fs = path.getFileSystem(configuration)
+        assert(!fs.exists(path))
+      }
+    }
+    finally {
+      configuration.set("spark.sql.parquet.useDirectParquetOutputCommitter", "false")
+    }
+  }
 }
 
 class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {

From 0d540b9cfc03fc71d228616123f8cad4602e8f14 Mon Sep 17 00:00:00 2001
From: Pei-Lun Lee
Date: Tue, 17 Mar 2015 15:56:02 +0800
Subject: [PATCH 05/10] [SPARK-6352] [SQL] add license

---
 .../parquet/DirectParquetOutputCommitter.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
index 890765267c747..25a66cb488103 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.sql.parquet
 
 import org.apache.hadoop.fs.Path

From 9ece5c5cb366ba34fd542fe207dcdd6564385448 Mon Sep 17 00:00:00 2001
From: Pei-Lun Lee
Date: Mon, 23 Mar 2015 18:32:15 +0800
Subject: [PATCH 06/10] [SPARK-6352] [SQL] Change to allow custom parquet output committer.

Add a new configuration key: spark.sql.parquet.output.committer.class which should be a sub-class of ParquetOutputCommitter
---
 .../spark/sql/parquet/ParquetTableOperations.scala | 11 +++++------
 .../org/apache/spark/sql/parquet/ParquetIOSuite.scala | 5 +++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 38abd5faa24d6..5e0be7a98cc17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -406,15 +406,14 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
     context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
   }
 
-  // override to choose between ParquetOutputCommitter and DirectParquetOutputCommitter
+  // override to create output committer from configuration
   override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     if (committer == null) {
       val output = getOutputPath(context)
-      val marker = "spark.sql.parquet.useDirectParquetOutputCommitter"
-      committer = context.getConfiguration.getBoolean(marker, false) match {
-        case true => new DirectParquetOutputCommitter(output, context)
-        case false => new ParquetOutputCommitter(output, context)
-      }
+      val cls = context.getConfiguration.getClass("spark.sql.parquet.output.committer.class",
+        classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter])
+      val ctor = cls.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+      committer = ctor.newInstance(output, context).asInstanceOf[ParquetOutputCommitter]
     }
     committer
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index a94dddaa3f89c..5065ee842d97a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -332,7 +332,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
 
   test("SPARK-6352 DirectParquetOutputCommitter") {
     try {
-      configuration.set("spark.sql.parquet.useDirectParquetOutputCommitter", "true")
+      configuration.set("spark.sql.parquet.output.committer.class",
+        "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
       sqlContext.udf.register("div0", (x: Int) => x / 0)
       withTempPath { dir =>
         intercept[org.apache.spark.SparkException] {
@@ -344,7 +345,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
       }
     }
     finally {
-      configuration.set("spark.sql.parquet.useDirectParquetOutputCommitter", "false")
+      configuration.unset("spark.sql.parquet.output.committer.class")
     }
   }
 }

From fe659151c7f8e2547404fe8a93c6010ceebb865a Mon Sep 17 00:00:00 2001
From: Pei-Lun Lee
Date: Tue, 14 Apr 2015 16:22:07 +0800
Subject: [PATCH 07/10] add support for parquet config parquet.enable.summary-metadata

---
 .../DirectParquetOutputCommitter.scala | 32 ++++++++++---------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
index 25a66cb488103..0ebb0988c0ddc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 
 import parquet.Log
 import parquet.hadoop.util.ContextUtil
-import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter}
+import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
 
 private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
@@ -40,23 +40,25 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
     try {
       val configuration = ContextUtil.getConfiguration(jobContext)
       val fileSystem = outputPath.getFileSystem(configuration)
-      val outputStatus = fileSystem.getFileStatus(outputPath)
-      val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
-      try {
-        ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
-        if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
-          val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
-          fileSystem.create(successPath).close()
-        }
-      } catch {
-        case e: Exception => {
-          LOG.warn("could not write summary file for " + outputPath, e)
-          val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
-          if (fileSystem.exists(metadataPath)) {
-            fileSystem.delete(metadataPath, true)
+      if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
+        val outputStatus = fileSystem.getFileStatus(outputPath)
+        val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
+        try {
+          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
+        } catch {
+          case e: Exception => {
+            LOG.warn("could not write summary file for " + outputPath, e)
+            val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
+            if (fileSystem.exists(metadataPath)) {
+              fileSystem.delete(metadataPath, true)
+            }
           }
         }
       }
+      if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
+        val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
+        fileSystem.create(successPath).close()
+      }
     } catch {
       case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
     }

From 9ece5c5cb366ba34fd542fe207dcdd6564385448 Mon Sep 17 00:00:00 2001
From: Pei-Lun Lee
Date: Wed, 15 Apr 2015 14:10:28 +0800
Subject: [PATCH 08/10] compatibility with hadoop 1.x

---
 .../scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 4d0bf7cf99cdf..b504842053690 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -399,7 +399,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
       }
     }
     finally {
-      configuration.unset("spark.sql.parquet.output.committer.class")
+      configuration.set("spark.sql.parquet.output.committer.class",
+        "parquet.hadoop.ParquetOutputCommitter")
     }
   }
 }

From 472870e290ffb9e264889d72d6275e7abc6c231f Mon Sep 17 00:00:00 2001
From: Pei-Lun Lee
Date: Wed, 15 Apr 2015 18:19:02 +0800
Subject: [PATCH 09/10] add back custom parquet output committer

---
 .../sql/parquet/ParquetTableOperations.scala | 21 +++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 1c868da23e060..532dca2dbfae6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -381,6 +381,7 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
   extends parquet.hadoop.ParquetOutputFormat[Row] {
   // override to accept existing directories as valid output directory
   override def checkOutputSpecs(job: JobContext): Unit = {}
+  var committer: OutputCommitter = null
 
   // override to choose output filename so not overwrite existing ones
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
@@ -403,6 +404,26 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
   private def getTaskAttemptID(context: TaskAttemptContext): TaskAttemptID = {
     context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
   }
+
+  // override to create output committer from configuration
+  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
+    if (committer == null) {
+      val output = getOutputPath(context)
+      val cls = context.getConfiguration.getClass("spark.sql.parquet.output.committer.class",
+        classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter])
+      val ctor = cls.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+      committer = ctor.newInstance(output, context).asInstanceOf[ParquetOutputCommitter]
+    }
+    committer
+  }
+
+  // FileOutputFormat.getOutputPath takes JobConf in hadoop-1 but JobContext in hadoop-2
+  private def getOutputPath(context: TaskAttemptContext): Path = {
+    context.getConfiguration().get("mapred.output.dir") match {
+      case null => null
+      case name => new Path(name)
+    }
+  }
 }
 
 /**

From 54c6b157547ea16cc5482e9dfd396179022d5948 Mon Sep 17 00:00:00 2001
From: Pei-Lun Lee
Date: Mon, 27 Apr 2015 18:28:07 +0800
Subject: [PATCH 10/10] error handling

---
 .../DirectParquetOutputCommitter.scala | 21 ++++++++++++-------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
index 0ebb0988c0ddc..f5ce2718bec4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -37,10 +37,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
   override def setupTask(taskContext: TaskAttemptContext): Unit = {}
 
   override def commitJob(jobContext: JobContext) {
-    try {
-      val configuration = ContextUtil.getConfiguration(jobContext)
-      val fileSystem = outputPath.getFileSystem(configuration)
-      if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
+    val configuration = ContextUtil.getConfiguration(jobContext)
+    val fileSystem = outputPath.getFileSystem(configuration)
+
+    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
+      try {
         val outputStatus = fileSystem.getFileStatus(outputPath)
         val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
         try {
@@ -54,15 +55,19 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
             }
           }
         }
+      } catch {
+        case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
       }
-      if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
+    }
+
+    if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
+      try {
         val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
         fileSystem.create(successPath).close()
+      } catch {
+        case e: Exception => LOG.warn("could not write success file for " + outputPath, e)
       }
-    } catch {
-      case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
     }
   }
-
 }
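
Illustrative usage (a sketch, not part of the patch series above): with a Spark build containing PATCH 06/10, an application can opt into the committer added by these patches through the new configuration key. The SparkContext `sc`, SQLContext `sqlContext`, and output path below are assumed placeholders.

// Point Spark SQL's Parquet writer at DirectParquetOutputCommitter; any other
// sub-class of parquet.hadoop.ParquetOutputCommitter could be named here instead.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// Subsequent Parquet saves then commit task output directly to the destination
// directory instead of promoting files from a _temporary directory.
sqlContext.sql("SELECT 1 AS id").saveAsParquetFile("/tmp/direct-committer-example")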