From d8a1ba439de0b9c4ff7a4eeac1b3517cc1c66cc0 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 1 Sep 2015 18:49:11 +0800
Subject: [PATCH 1/2] Simplifies CatalystReadSupport

---
 .../parquet/CatalystReadSupport.scala         | 89 +++++++++----------
 1 file changed, 42 insertions(+), 47 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 8c819f1a48cd6..2eddbf30a763c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
@@ -32,31 +32,56 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 
+/**
+ * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
+ * [[InternalRow]]s.
+ *
+ * The API of [[ReadSupport]] is a little over-complicated for historical reasons. In older
+ * versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] needed to be instantiated and
+ * initialized twice, on both the driver side and the executor side. The [[init()]] method is for
+ * driver side initialization, while [[prepareForRead()]] is for the executor side. Starting from
+ * parquet-mr 1.6.0, this is no longer the case: [[ReadSupport]] is only instantiated and
+ * initialized on the executor side, so in principle these two methods could now be combined into
+ * a single initialization method. The only reason to still have both is backwards compatibility
+ * with the parquet-mr API.
+ *
+ * For this reason, we no longer rely on [[ReadContext]] to pass the requested schema from
+ * [[init()]] to [[prepareForRead()]], but use a private `var` for simplicity.
+ */
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
-  // Called after `init()` when initializing Parquet record reader.
+  private var catalystRequestedSchema: StructType = _
+
+  /**
+   * Called on the executor side, before [[prepareForRead()]] and before actual Parquet record
+   * readers are instantiated. Figures out the Parquet requested schema used for column pruning.
+   */
+  override def init(context: InitContext): ReadContext = {
+    catalystRequestedSchema = {
+      val conf = context.getConfiguration
+      val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+      assert(schemaString != null, "Parquet requested schema not set.")
+      StructType.fromString(schemaString)
+    }
+
+    val parquetRequestedSchema =
+      CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on the executor side after [[init()]], before instantiating actual Parquet record
+   * readers. Responsible for instantiating [[RecordMaterializer]], which converts Parquet
+   * records into Catalyst [[InternalRow]]s.
+   */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
       readContext: ReadContext): RecordMaterializer[InternalRow] = {
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
-
-    val toCatalyst = new CatalystSchemaConverter(conf)
     val parquetRequestedSchema = readContext.getRequestedSchema
 
-    val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata =>
-        metadata
-          // First tries to read requested schema, which may result from projections
-          .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-          // If not available, tries to read Catalyst schema from file metadata. It's only
-          // available if the target file is written by Spark SQL.
-          .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
-      }.map(StructType.fromString).getOrElse {
-        logInfo("Catalyst schema not available, falling back to Parquet schema")
-        toCatalyst.convert(parquetRequestedSchema)
-      }
-
     logInfo {
       s"""Going to read the following fields from the Parquet file:
          |
@@ -69,36 +94,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
     new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
   }
-
-  // Called before `prepareForRead()` when initializing Parquet record reader.
-  override def init(context: InitContext): ReadContext = {
-    val conf = {
-      // scalastyle:off jobcontext
-      context.getConfiguration
-      // scalastyle:on jobcontext
-    }
-
-    // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
-    // schema of this file from its metadata.
-    val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
-
-    // Optional schema of requested columns, in the form of a string serialized from a Catalyst
-    // `StructType` containing all requested columns.
-    val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
-
-    val parquetRequestedSchema =
-      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val catalystRequestedSchema = StructType.fromString(schemaString)
-        CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
-      }
-
-    val metadata =
-      Map.empty[String, String] ++
-        maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
-        maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
-
-    new ReadContext(parquetRequestedSchema, metadata.asJava)
-  }
 }
 
 private[parquet] object CatalystReadSupport {

From 4dfab07d52db2001b41e11c1a2790769c8bcf8e9 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 16 Sep 2015 13:48:02 -0700
Subject: [PATCH 2/2] Workaround false negative result of the "jobcontext" ScalaStyle rule

---
 .../execution/datasources/parquet/CatalystReadSupport.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 2eddbf30a763c..9502b835a54d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema._
 
 import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 
@@ -57,7 +58,9 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
    */
   override def init(context: InitContext): ReadContext = {
     catalystRequestedSchema = {
+      // scalastyle:off jobcontext
       val conf = context.getConfiguration
+      // scalastyle:on jobcontext
       val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
       assert(schemaString != null, "Parquet requested schema not set.")
       StructType.fromString(schemaString)
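
For context on the new init() shown in the first patch: it expects the requested Catalyst schema to already be present in the Hadoop Configuration under CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, serialized as a JSON string. The following is a minimal sketch of how a caller might populate that key before the read side picks it up; the key value and the example schema are illustrative assumptions, not part of this patch.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object RequestedSchemaSketch {
  // Hypothetical stand-in for CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA; the real
  // constant is defined in the companion object, which this patch does not touch.
  val RequestedSchemaKey = "org.apache.spark.sql.parquet.row.requested_schema"

  def main(args: Array[String]): Unit = {
    // Only the columns the query actually needs; init() later clips the Parquet file schema
    // down to these via CatalystReadSupport.clipParquetSchema.
    val requestedSchema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType)))

    val conf = new Configuration()
    // Serialize the Catalyst schema as JSON; init() reads it back with StructType.fromString.
    conf.set(RequestedSchemaKey, requestedSchema.json)

    println(conf.get(RequestedSchemaKey))
  }
}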
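
Similarly, the clipParquetSchema call made from init() prunes the Parquet file schema down to the requested columns. The real implementation operates on Parquet MessageType and recurses into nested structs, arrays, and maps; the sketch below only illustrates the top-level idea on Catalyst StructTypes, with made-up field names.

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object ClipSchemaSketch {
  // Illustration only: keep just the requested top-level columns, in the requested order.
  // CatalystReadSupport.clipParquetSchema does the equivalent on Parquet MessageType.
  def clipTopLevel(fileSchema: StructType, requestedSchema: StructType): StructType = {
    val byName = fileSchema.fields.map(f => f.name -> f).toMap
    StructType(requestedSchema.fields.flatMap(f => byName.get(f.name)))
  }

  def main(args: Array[String]): Unit = {
    val fileSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType),
      StructField("address", StringType)))
    val requested = StructType(Seq(StructField("name", StringType)))

    // Prints a StructType containing only the `name` field.
    println(clipTopLevel(fileSchema, requested))
  }
}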