Fixes bugs related to schema merging and empty requested columns
liancheng committed Jul 6, 2015
1 parent 61cce6c commit 3581497
Showing 1 changed file with 87 additions and 25 deletions.
@@ -62,46 +62,108 @@ private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logging
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")

     val toCatalyst = new CatalystSchemaConverter(conf)
-    val parquetSchema = readContext.getRequestedSchema
-    val catalystSchema =
-      Option(readContext.getReadSupportMetadata)
-        .map(_.toMap)
-        .flatMap { metadata =>
-          metadata
-            // First tries to read requested schema, which may result from projections
-            .get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-            // If not available, tries to read Catalyst schema from file metadata. It's only
-            // available if the target file is written by Spark SQL.
-            .orElse(metadata.get(RowReadSupport.SPARK_METADATA_KEY))
-        }
-        .map(StructType.fromString)
-        .getOrElse {
-          logDebug("Catalyst schema not available, falling back to Parquet message type")
-          toCatalyst.convert(parquetSchema)
-        }
+    val parquetRequestedSchema = readContext.getRequestedSchema

+    val catalystRequestedSchema =
+      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
+        metadata
+          // First tries to read requested schema, which may result from projections
+          .get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+          // If not available, tries to read Catalyst schema from file metadata. It's only
+          // available if the target file is written by Spark SQL.
+          .orElse(metadata.get(RowReadSupport.SPARK_METADATA_KEY))
+      }.map(StructType.fromString).getOrElse {
+        logDebug("Catalyst schema not available, falling back to Parquet schema")
+        toCatalyst.convert(parquetRequestedSchema)
+      }

-    logDebug(s"Catalyst schema used to read Parquet files: $catalystSchema")
-    new RowRecordMaterializer(parquetSchema, catalystSchema)
+    logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
+    new RowRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
   }

   override def init(context: InitContext): ReadContext = {
     val conf = context.getConfiguration

+    // If the target file was written by Spark SQL, we should be able to find a serialized
+    // Catalyst schema of this file from its metadata.
     val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))

+    // Optional schema of requested columns, in the form of a string serialized from a Catalyst
+    // `StructType` containing all requested columns.
     val maybeRequestedSchema = Option(conf.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA))

+    // Below we construct a Parquet schema containing all requested columns. This schema tells
+    // Parquet which columns to read.
+    //
+    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
+    // we have to fall back to the full file schema, which contains all columns of the file.
+    // Obviously this may waste IO bandwidth since it may read more columns than requested.
+    //
+    // Two things to note:
+    //
+    // 1. It's possible that some requested columns don't exist in the target Parquet file. For
+    //    example, in the case of schema merging, the globally merged schema may contain extra
+    //    columns gathered from other Parquet files. These columns will simply be filled with
+    //    nulls when actually reading the target Parquet file.
+    //
+    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema
+    //    to a Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique
+    //    due to non-standard behaviors of some Parquet libraries/tools. For example, a Parquet
+    //    file containing a single integer array field `f1` may have the following legacy
+    //    2-level structure:
+    //
+    //      message root {
+    //        optional group f1 (LIST) {
+    //          required INT32 element;
+    //        }
+    //      }
+    //
+    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
+    //
+    //      message root {
+    //        optional group f1 (LIST) {
+    //          repeated group list {
+    //            required INT32 element;
+    //          }
+    //        }
+    //      }
+    //
+    //    Clearly, we can't use the 2nd schema to read the target Parquet file, as the two have
+    //    different physical structures.
     val parquetRequestedSchema =
-      maybeRequestedSchema.map { schemaString =>
-        StructType.fromString(schemaString).map { field =>
-          val fieldType = context.getFileSchema.asGroupType().getType(field.name)
-          new MessageType("root", fieldType)
-        }.reduce(_ union _)
-      }.getOrElse(context.getFileSchema)
+      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
+        val toParquet = new CatalystSchemaConverter(conf)
+        val fileSchema = context.getFileSchema.asGroupType()
+        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet

+        StructType
+          // Deserializes the Catalyst schema of requested columns
+          .fromString(schemaString)
+          .map { field =>
+            if (fileFieldNames.contains(field.name)) {
+              // If the field exists in the target Parquet file, extracts the field type from
+              // the full file schema and makes a single-field Parquet schema
+              new MessageType("root", fileSchema.getType(field.name))
+            } else {
+              // Otherwise, just resorts to `CatalystSchemaConverter`
+              toParquet.convert(StructType(Array(field)))
+            }
+          }
+          // Merges all single-field Parquet schemas to form a complete schema for all requested
+          // columns. Note that it's possible that no columns are requested at all (e.g., when
+          // counting some partition column of a partitioned Parquet table). That's why `fold`
+          // is used here, so that we always fall back to an empty Parquet schema.
+          .fold(new MessageType("root")) {
+            _ union _
+          }
+      }

     val metadata =
       Map.empty[String, String] ++
         maybeRequestedSchema.map(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
         maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)

+    logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
     new ReadContext(parquetRequestedSchema, metadata)
   }
 }
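
Note 2 in the comment above (legacy 2-level vs. standard 3-level LIST layout) is why the new code takes a requested field's type from the file schema whenever the field exists there, rather than converting the Catalyst type. A minimal sketch of that incompatibility, assuming Parquet 1.7+ (`org.apache.parquet`) and using only `MessageTypeParser` from parquet-column; the object name and the literal schemas are illustrative, not part of the commit:

import org.apache.parquet.schema.MessageTypeParser

object ListLayoutSketch {
  def main(args: Array[String]): Unit = {
    // Legacy 2-level layout, as written by some older Parquet writers/tools.
    val legacyTwoLevel = MessageTypeParser.parseMessageType(
      """message root {
        |  optional group f1 (LIST) {
        |    required int32 element;
        |  }
        |}""".stripMargin)

    // Standard 3-level layout, as a Catalyst-to-Parquet conversion would produce.
    val standardThreeLevel = MessageTypeParser.parseMessageType(
      """message root {
        |  optional group f1 (LIST) {
        |    repeated group list {
        |      required int32 element;
        |    }
        |  }
        |}""".stripMargin)

    // The two message types have different physical structures, so a schema converted from
    // the Catalyst type cannot be used to read a file written with the legacy layout.
    println(legacyTwoLevel.equals(standardThreeLevel)) // prints: false
  }
}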

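The "empty requested columns" part of the fix boils down to replacing `reduce(_ union _)` with `fold(new MessageType("root"))(_ union _)` when merging the single-field schemas. A minimal sketch of why that matters, again assuming Parquet 1.7+; `EmptyProjectionSketch` and `perFieldSchemas` are made-up names for illustration:

import org.apache.parquet.schema.MessageType

object EmptyProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Single-field schemas assembled from the requested columns; empty when the query
    // requests no columns at all (e.g., a count over a partition column).
    val perFieldSchemas: Seq[MessageType] = Seq.empty

    // The old code used `reduce`, which throws UnsupportedOperationException on an empty
    // collection:
    //   perFieldSchemas.reduce(_ union _)

    // `fold` starts from an empty "root" message type instead, which is a valid Parquet
    // schema requesting zero columns.
    val requested = perFieldSchemas.fold(new MessageType("root"))(_ union _)
    println(requested) // prints an empty "message root" schema
  }
}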