[SPARK-10005] [SQL] Fixes schema merging for nested structs #8228

Closed
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
+  // Called after `init()` when initializing Parquet record reader.
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
@@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
           // available if the target file is written by Spark SQL.
           .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
       }.map(StructType.fromString).getOrElse {
-        logDebug("Catalyst schema not available, falling back to Parquet schema")
+        logInfo("Catalyst schema not available, falling back to Parquet schema")
         toCatalyst.convert(parquetRequestedSchema)
       }
 
-    logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
+    logInfo {
+      s"""Going to read the following fields from the Parquet file:
+         |
+         |Parquet form:
+         |$parquetRequestedSchema
+         |
+         |Catalyst form:
+         |$catalystRequestedSchema
+       """.stripMargin
+    }
+
     new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
   }
 
+  // Called before `prepareForRead()` when initializing Parquet record reader.
   override def init(context: InitContext): ReadContext = {
     val conf = context.getConfiguration
 
     // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
-    // schema of this file from its the metadata.
+    // schema of this file from its metadata.
     val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
 
     // Optional schema of requested columns, in the form of a string serialized from a Catalyst
@@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
       maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
         maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
 
-    logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
     new ReadContext(parquetRequestedSchema, metadata)
   }
 }
@@ -25,9 +25,10 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.OriginalType.LIST
+import org.apache.parquet.schema.OriginalType.{LIST, INT_32, UTF8}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
 import org.apache.parquet.schema.Type.Repetition
-import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -88,12 +89,54 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
 }
 
 /**
- * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
- * Since any Parquet record is also a struct, this converter can also be used as root converter.
+ * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
+ * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root
+ * converter. Take the following Parquet type as an example:
+ * {{{
+ *   message root {
+ *     required int32 f1;
+ *     optional group f2 {
+ *       required double f21;
+ *       optional binary f22 (utf8);
+ *     }
+ *   }
+ * }}}
+ * 5 converters will be created:
+ *
+ * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which contains:
+ *   - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and
+ *   - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, which contains:
+ *     - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and
+ *     - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22`
  *
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
+ * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
+ *       Parquet file being read, while constructor argument [[catalystType]] refers to requested
+ *       fields of the global schema. The key difference is that, in case of schema merging,
+ *       [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have
+ *       the following [[catalystType]]:
+ *       {{{
+ *         new StructType()
+ *           .add("f1", IntegerType, nullable = false)
+ *           .add("f2", StringType, nullable = true)
+ *           .add("f3", new StructType()
+ *             .add("f31", DoubleType, nullable = false)
+ *             .add("f32", IntegerType, nullable = true)
+ *             .add("f33", StringType, nullable = true), nullable = false)
+ *       }}}
+ *       and the following [[parquetType]] (`f2` and `f32` are missing):
+ *       {{{
+ *         message root {
+ *           required int32 f1;
+ *           required group f3 {
+ *             required double f31;
+ *             optional binary f33 (utf8);
+ *           }
+ *         }
+ *       }}}
+ *
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type
  * @param updater An updater which propagates converted field values to the parent container
@@ -126,7 +169,24 @@ private[parquet] class CatalystRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    parquetType.getFields.zip(catalystType).zipWithIndex.map {
+    // In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad
+    // those missing fields and create converters for them, although values of these fields are
+    // always null.
Contributor:

Let's also investigate if there is any issue when catalystType is a subset of parquetType.

Contributor (Author):

This is a good question. Tested the following snippet against 1.4.1, and it doesn't work as expected:

import sqlContext.implicits._
import org.apache.spark.sql.types._

sqlContext
  .range(1).select('id as 'a, 'id as 'b)
  .write.mode("overwrite").parquet("file:///tmp/schema")

sqlContext
  .read
  .schema(StructType(StructField("a", LongType, false) :: Nil))
  .parquet("file:///tmp/schema").show()

So at least this won't be a regression. It's worth further investigation, though.

+    val paddedParquetFields = {
+      val parquetFields = parquetType.getFields
+      val parquetFieldNames = parquetFields.map(_.getName).toSet
+      val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
Contributor:

Will the case-sensitivity of field names introduce any issue here?

Contributor (Author):

No. Currently, reading Parquet metastore tables may introduce case-sensitivity issues, but those are already reconciled within ParquetRelation before building the converters.
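A rough sketch of what such a reconciliation can look like (an illustration only, not the actual ParquetRelation code — the helper name and its placement are assumptions): Parquet field names are matched against the Catalyst/metastore schema case-insensitively, and the metastore casing wins, so the converters built afterwards only ever see consistent names.

import org.apache.spark.sql.types.StructType

// Hypothetical helper: align Parquet field-name casing with the Catalyst (metastore) schema.
def reconcileFieldNames(metastoreSchema: StructType, parquetSchema: StructType): StructType = {
  // Index the metastore fields' original casing by lower-cased name.
  val metastoreNames = metastoreSchema.map(f => f.name.toLowerCase -> f.name).toMap
  StructType(parquetSchema.map { f =>
    // Prefer the metastore casing when the names differ only by case.
    f.copy(name = metastoreNames.getOrElse(f.name.toLowerCase, f.name))
  })
}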


+
+      // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
+      // creating the schema converter here, since values of missing fields are always null.
+      val toParquet = new CatalystSchemaConverter()
+
+      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
+        catalystType.indexWhere(_.name == f.getName)
+      }
+    }
+
+    paddedParquetFields.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
         newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
@@ -72,18 +72,9 @@ private[parquet] class CatalystSchemaConverter(
     followParquetFormatSpec = conf.followParquetFormatSpec)
 
   def this(conf: Configuration) = this(
-    assumeBinaryIsString =
-      conf.getBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
-    assumeInt96IsTimestamp =
-      conf.getBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
-    followParquetFormatSpec =
-      conf.getBoolean(
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
+    assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
+    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+    followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
Contributor (Author):

These three configurations should always be set. Removed the default values to ensure that they are actually set in ParquetRelation. I'd rather it crash than give a wrong schema (and wrong data).
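A minimal sketch of the expectation this change encodes (assuming code living inside Spark's own parquet package, where SQLConf and CatalystSchemaConverter are visible): the caller has to copy all three settings into the Hadoop Configuration before constructing the converter; if a key is missing, conf.get(...) returns null and .toBoolean throws, instead of silently falling back to a default.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLConf

val hadoopConf = new Configuration()
// The real values come from the active SQLConf in ParquetRelation; the literals below are
// placeholders for illustration only.
hadoopConf.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
hadoopConf.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "true")
hadoopConf.set(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, "false")

// With all three keys present the secondary constructor reads well-defined values; leaving one
// out now fails fast instead of producing a schema based on a silent default.
val converter = new CatalystSchemaConverter(hadoopConf)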


   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -201,4 +201,32 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       assert(Decimal("67123.45") === Decimal(decimal))
     }
   }
+
+  test("SPARK-10005 Schema merging for nested struct") {
+    val sqlContext = _sqlContext
+    import sqlContext.implicits._
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      def append(df: DataFrame): Unit = {
+        df.write.mode(SaveMode.Append).parquet(path)
+      }
+
+      // Note that both the following two DataFrames contain a single struct column with multiple
+      // nested fields.
+      append((1 to 2).map(i => Tuple1((i, i))).toDF())
+      append((1 to 2).map(i => Tuple1((i, i, i))).toDF())
+
+      withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
+        checkAnswer(
+          sqlContext.read.option("mergeSchema", "true").parquet(path),
+          Seq(
+            Row(Row(1, 1, null)),
+            Row(Row(2, 2, null)),
+            Row(Row(1, 1, 1)),
+            Row(Row(2, 2, 2))))
+      }
+    }
+  }
 }