From 8ec13c95d3277c3464c97bd0cdefda10b578eac9 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 16 Aug 2016 14:42:29 +0900
Subject: [PATCH] Fetch Parquet schema on the driver side without another Spark
 job when only a few files need to be touched

---
 .../apache/spark/sql/internal/SQLConf.scala |  10 ++
 .../parquet/ParquetFileFormat.scala         | 109 +++++++++++++-----
 2 files changed, 91 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5566b06aa3553..c23f462043290 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -265,6 +265,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_READ_SCHEMA_LOCAL_THRESHOLD =
+    buildConf("spark.sql.parquet.readSchemaLocalThreshold")
+      .doc("Configures the maximum number of files whose Parquet schemas are read on the " +
+        "driver side. If the number of files exceeds this value, schemas are read and merged " +
+        "in parallel via another Spark distributed job.")
+      .intConf
+      .createWithDefault(32)
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
@@ -848,6 +856,8 @@ class SQLConf extends Serializable with Logging {
 
   def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
 
+  def parquetReadSchemaLocalThreshold: Int = getConf(PARQUET_READ_SCHEMA_LOCAL_THRESHOLD)
+
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 062aa5c8ea624..a3ce01a779e20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -164,6 +164,8 @@ class ParquetFileFormat
 
     val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries
 
+    val readSchemaLocalThreshold = sparkSession.sessionState.conf.parquetReadSchemaLocalThreshold
+
     val filesByType = splitFiles(files)
 
     // Sees which file(s) we need to touch in order to figure out the schema.
@@ -234,7 +236,15 @@ class ParquetFileFormat
           .orElse(filesByType.data.headOption)
           .toSeq
       }
-    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+
+    if (filesToTouch.length > readSchemaLocalThreshold) {
+      ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+    } else {
+      // When the number of files to touch is less than or equal to `readSchemaLocalThreshold`,
+      // read the footers and fetch the schema on the driver side rather than launching
+      // another Spark distributed job to figure this out.
+      ParquetFileFormat.mergeSchemas(filesToTouch, sparkSession)
+    }
   }
 
   case class FileTypes(
@@ -544,33 +554,16 @@ object ParquetFileFormat extends Logging {
           new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
         }.toSeq
 
-      // Reads footers in multi-threaded manner within each task
-      val footers =
-        ParquetFileFormat.readParquetFootersInParallel(
-          serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
+      // Merges the schemas from footers locally on the executor side.
+      val maybeSchema = mergeSchemas(
+        fakeFileStatuses,
+        assumeBinaryIsString,
+        assumeInt96IsTimestamp,
+        writeLegacyParquetFormat,
+        ignoreCorruptFiles,
+        serializedConf.value)
 
-      // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
-      val converter =
-        new ParquetSchemaConverter(
-          assumeBinaryIsString = assumeBinaryIsString,
-          assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-          writeLegacyParquetFormat = writeLegacyParquetFormat)
-
-      if (footers.isEmpty) {
-        Iterator.empty
-      } else {
-        var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
-        footers.tail.foreach { footer =>
-          val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
-          try {
-            mergedSchema = mergedSchema.merge(schema)
-          } catch { case cause: SparkException =>
-            throw new SparkException(
-              s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
-          }
-        }
-        Iterator.single(mergedSchema)
-      }
+      maybeSchema.map(Iterator.single).getOrElse(Iterator.empty)
     }.collect()
 
     if (partiallyMergedSchemas.isEmpty) {
@@ -589,8 +582,68 @@ object ParquetFileFormat extends Logging {
     }
   }
 
+  def mergeSchemas(
+      fileStatuses: Seq[FileStatus],
+      sparkSession: SparkSession): Option[StructType] = {
+    val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
+    val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
+    val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+    mergeSchemas(
+      fileStatuses,
+      assumeBinaryIsString,
+      assumeInt96IsTimestamp,
+      writeLegacyParquetFormat,
+      ignoreCorruptFiles,
+      hadoopConf)
+  }
+
+  /**
+   * Merges schemas from the footers of the given files. This is used on both the driver side and
+   * the executor side. See the `spark.sql.parquet.readSchemaLocalThreshold` option in `SQLConf`.
+ */ + private def mergeSchemas( + fileStatuses: Seq[FileStatus], + assumeBinaryIsString: Boolean, + assumeInt96IsTimestamp: Boolean, + writeLegacyParquetFormat: Boolean, + ignoreCorruptFiles: Boolean, + hadoopConf: Configuration): Option[StructType] = { + + // Skips row group information since we only need the schema + val skipRowGroups = true + + // Reads footers in multi-threaded manner within each task + val footers = + ParquetFileFormat.readParquetFootersInParallel(hadoopConf, fileStatuses, ignoreCorruptFiles) + + // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` + val converter = + new ParquetSchemaConverter( + assumeBinaryIsString = assumeBinaryIsString, + assumeInt96IsTimestamp = assumeInt96IsTimestamp, + writeLegacyParquetFormat = writeLegacyParquetFormat) + + if (footers.isEmpty) { + None + } else { + var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter) + footers.tail.foreach { footer => + val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter) + try { + mergedSchema = mergedSchema.merge(schema) + } catch { case cause: SparkException => + throw new SparkException( + s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause) + } + } + Some(mergedSchema) + } + } + /** - * Reads Spark SQL schema from a Parquet footer. If a valid serialized Spark SQL schema string + * Reads Spark SQL schema from a Parquet footer. If a valid serialized Spark SQL schema string * can be found in the file metadata, returns the deserialized [[StructType]], otherwise, returns * a [[StructType]] converted from the [[MessageType]] stored in this footer. */