@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.util.{Map => JMap}

import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
@@ -29,34 +29,62 @@ import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema._

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

/**
* A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
* [[InternalRow]]s.
*
* The API of [[ReadSupport]] is a little bit over-complicated for historical reasons. In older
* versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] needed to be instantiated and
* initialized twice, on both the driver side and the executor side. The [[init()]] method is for
* driver-side initialization, while [[prepareForRead()]] is for the executor side. However,
* starting from parquet-mr 1.6.0, this is no longer the case, and [[ReadSupport]] is only
* instantiated and initialized on the executor side. So, theoretically, it's now totally fine to
* combine these two methods into a single initialization method. The only reason (I can think of)
* to still keep them separate is parquet-mr API backwards-compatibility.
*
* For this reason, we no longer rely on [[ReadContext]] to pass the requested schema from
* [[init()]] to [[prepareForRead()]], but use a private `var` for simplicity.
*/
private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
// Called after `init()` when initializing Parquet record reader.
private var catalystRequestedSchema: StructType = _

/**
* Called on the executor side before [[prepareForRead()]] and instantiating actual Parquet record
* readers. Responsible for figuring out the Parquet requested schema used for column pruning.
*/
Contributor Author:
This "fallback" logic is removed because now we always set requested schema properly along the read path. This piece of code was inherited from the old Parquet support, which has already been removed.

override def init(context: InitContext): ReadContext = {
Contributor Author:
Moved this method in front of prepareForRead() for better readability, since this method is called right before prepareForRead().

catalystRequestedSchema = {
// scalastyle:off jobcontext
val conf = context.getConfiguration
// scalastyle:on jobcontext
val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
assert(schemaString != null, "Parquet requested schema not set.")
StructType.fromString(schemaString)
}

val parquetRequestedSchema =
CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)

new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
}
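A hedged sketch of what the clipping above is expected to do; the file schema and requested columns here are invented, and clipParquetSchema itself lives in the companion object further down:

import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}

// A hypothetical Parquet file schema with three columns.
val fileSchema = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  optional int64 id;
    |  optional binary name (UTF8);
    |  optional double score;
    |}""".stripMargin)

// The query only requests `id` and `score`.
val catalystRequestedSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("score", DoubleType)))

// Expected to yield a Parquet schema containing only `id` and `score`, so that `name` is
// pruned before any records are materialized.
val clipped = CatalystReadSupport.clipParquetSchema(fileSchema, catalystRequestedSchema)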

/**
* Called on executor side after [[init()]], before instantiating actual Parquet record readers.
* Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
* records to Catalyst [[InternalRow]]s.
*/
override def prepareForRead(
conf: Configuration,
keyValueMetaData: JMap[String, String],
fileSchema: MessageType,
readContext: ReadContext): RecordMaterializer[InternalRow] = {
log.debug(s"Preparing for read Parquet file with message type: $fileSchema")

val toCatalyst = new CatalystSchemaConverter(conf)
val parquetRequestedSchema = readContext.getRequestedSchema

val catalystRequestedSchema =
Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata =>
metadata
// First tries to read requested schema, which may result from projections
.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
// If not available, tries to read Catalyst schema from file metadata. It's only
// available if the target file is written by Spark SQL.
.orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
}.map(StructType.fromString).getOrElse {
logInfo("Catalyst schema not available, falling back to Parquet schema")
toCatalyst.convert(parquetRequestedSchema)
}

logInfo {
s"""Going to read the following fields from the Parquet file:
|
@@ -69,36 +97,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with

new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
Contributor:
Why did we pass in the maybeRowSchema before? Seems it was not used by prepareForRead.

Contributor Author:
IIRC it was used by the old Parquet support code, which has already been removed.

}

// Called before `prepareForRead()` when initializing Parquet record reader.
override def init(context: InitContext): ReadContext = {
val conf = {
// scalastyle:off jobcontext
context.getConfiguration
// scalastyle:on jobcontext
}

// If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
// schema of this file from its metadata.
val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))

// Optional schema of requested columns, in the form of a string serialized from a Catalyst
// `StructType` containing all requested columns.
val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))

val parquetRequestedSchema =
maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
val catalystRequestedSchema = StructType.fromString(schemaString)
CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
}

val metadata =
Map.empty[String, String] ++
maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)

new ReadContext(parquetRequestedSchema, metadata.asJava)
}
}
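To tie the two methods together, here is a rough, hypothetical sketch of the order in which parquet-mr drives this class on the executor side. The file schema is invented, the key-value metadata is left empty, and in Spark this wiring is actually performed by the Parquet record reader rather than user code:

import java.util.Collections
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val conf = new Configuration()
// The requested schema must already be in the configuration (see the earlier sketch).
conf.set(
  CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
  StructType(Seq(StructField("id", LongType))).json)

val fileSchema = MessageTypeParser.parseMessageType(
  "message spark_schema { optional int64 id; optional binary name (UTF8); }")

val readSupport = new CatalystReadSupport
// 1. init() clips the file schema to the requested columns and returns a ReadContext.
val readContext = readSupport.init(
  new InitContext(conf, Collections.emptyMap[String, java.util.Set[String]](), fileSchema))
// 2. prepareForRead() builds the RecordMaterializer that turns Parquet records into InternalRows.
val materializer = readSupport.prepareForRead(
  conf, Collections.emptyMap[String, String](), fileSchema, readContext)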

private[parquet] object CatalystReadSupport {