From 60a2c8ee7c610a783e65b78ac21e25661b84f49d Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Fri, 3 Jun 2016 22:31:56 +0800
Subject: [PATCH] Make continuous writing consistent with non-continuous writing

---
 .../parquet/ParquetFileFormat.scala | 183 +++++++++---------
 1 file changed, 91 insertions(+), 92 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index ada9cd4b8e812..f424cc6162596 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -70,64 +70,33 @@ private[sql] class ParquetFileFormat
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    // Set up configurations for writing out
+    ParquetFileFormat.prepareConfForWriting(
+      job,
+      sparkSession.sessionState.conf,
+      options,
+      dataSchema,
+      // Set up the committer class under the non-continuous-queries scenario
+      (conf: Configuration) => {
+        val committerClass =
+          conf.getClass(
+            SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
+            classOf[ParquetOutputCommitter],
+            classOf[ParquetOutputCommitter])
+
+        if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
+          logInfo("Using default output committer for Parquet: " +
+            classOf[ParquetOutputCommitter].getCanonicalName)
+        } else {
+          logInfo(
+            "Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
+        }
-
-    val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-
-    val conf = ContextUtil.getConfiguration(job)
-
-    val committerClass =
-      conf.getClass(
-        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
-        classOf[ParquetOutputCommitter],
-        classOf[ParquetOutputCommitter])
-
-    if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
-      logInfo("Using default output committer for Parquet: " +
-        classOf[ParquetOutputCommitter].getCanonicalName)
-    } else {
-      logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
-    }
-
-    conf.setClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key,
-      committerClass,
-      classOf[ParquetOutputCommitter])
-
-    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
-    // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
-    // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
-    // bundled with `ParquetOutputFormat[Row]`.
-    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
-
-    // We want to clear this temporary metadata from saving into Parquet file.
-    // This metadata is only useful for detecting optional columns when pushdowning filters.
-    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
-      dataSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
-
-    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
-    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
-    conf.set(
-      SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sparkSession.sessionState.conf.isParquetBinaryAsString.toString)
-
-    conf.set(
-      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp.toString)
-
-    conf.set(
-      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
-      sparkSession.sessionState.conf.writeLegacyParquetFormat.toString)
-
-    // Sets compression scheme
-    conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
-
-    // SPARK-15719: Disables writing Parquet summary files by default.
-    if (conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
-      conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
-    }
+
+        conf.setClass(
+          SQLConf.OUTPUT_COMMITTER_CLASS.key,
+          committerClass,
+          classOf[ParquetOutputCommitter])
+      })

     new OutputWriterFactory {
       override def newInstance(
@@ -418,41 +387,14 @@ private[sql] class ParquetOutputWriterFactory(
     options: Map[String, String]) extends OutputWriterFactory {

   private val serializableConf: SerializableConfiguration = {
-    val job = Job.getInstance(hadoopConf)
-    val conf = ContextUtil.getConfiguration(job)
-    val parquetOptions = new ParquetOptions(options, sqlConf)
-
-    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
-    // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
-    // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
-    // bundled with `ParquetOutputFormat[Row]`.
-    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
-
-    // We want to clear this temporary metadata from saving into Parquet file.
-    // This metadata is only useful for detecting optional columns when pushdowning filters.
-    val dataSchemaToWrite = StructType.removeMetadata(
-      StructType.metadataKeyForOptionalField,
-      dataSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
-
-    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
-    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
-    conf.set(
-      SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sqlConf.isParquetBinaryAsString.toString)
-
-    conf.set(
-      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sqlConf.isParquetINT96AsTimestamp.toString)
-
-    conf.set(
-      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
-      sqlConf.writeLegacyParquetFormat.toString)
-
-    // Sets compression scheme
-    conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
+    // Set up configurations for writing out
+    val conf = ParquetFileFormat.prepareConfForWriting(
+      Job.getInstance(hadoopConf),
+      sqlConf,
+      options,
+      dataSchema,
+      // We don't set up the committer class under the continuous-queries scenario
+      (_: Configuration) => {})

     new SerializableConfiguration(conf)
   }

@@ -641,6 +583,63 @@ private[sql] object ParquetFileFormat extends Logging {
     overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
   }

+  /**
+   * Gets a [[Configuration]] from the given job and sets up writing parameters in it.
+   * Both the continuous-queries writing path and the non-continuous-queries writing path
+   * call this function.
+ */ + private[parquet] def prepareConfForWriting( + job: Job, + sqlConf: SQLConf, + options: Map[String, String], + dataSchema: StructType, + setupCommitterClassFunc: (Configuration) => Unit): Configuration = { + val parquetOptions = new ParquetOptions(options, sqlConf) + val conf = ContextUtil.getConfiguration(job) + + // We may need to setup committer class if under non-continuous-queries scenario + setupCommitterClassFunc(conf) + + // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override + // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why + // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is + // bundled with `ParquetOutputFormat[Row]`. + job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) + + ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport]) + + // We want to clear this temporary metadata from saving into Parquet file. + // This metadata is only useful for detecting optional columns when pushdowning filters. + val dataSchemaToWrite = StructType.removeMetadata( + StructType.metadataKeyForOptionalField, + dataSchema).asInstanceOf[StructType] + CatalystWriteSupport.setSchema(dataSchemaToWrite, conf) + + // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema) + // and `CatalystWriteSupport` (writing actual rows to Parquet files). + conf.set( + SQLConf.PARQUET_BINARY_AS_STRING.key, + sqlConf.isParquetBinaryAsString.toString) + + conf.set( + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, + sqlConf.isParquetINT96AsTimestamp.toString) + + conf.set( + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, + sqlConf.writeLegacyParquetFormat.toString) + + // Sets compression scheme + conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec) + + // SPARK-15719: Disables writing Parquet summary files by default. + if (conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) { + conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false) + } + + conf + } + private[parquet] def readSchema( footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {