From b942dcaca9cfa52f57e8f5034ca83f74c33da14f Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Sun, 10 Jul 2016 16:50:02 +0800
Subject: [PATCH] Fixes SPARK-16344

---
 .../parquet/CatalystReadSupport.scala         |  3 +-
 .../parquet/CatalystRecordMaterializer.scala  |  7 ++-
 .../parquet/CatalystRowConverter.scala        | 50 ++++++-------------
 .../parquet/CatalystSchemaConverter.scala     | 11 ++--
 .../parquet/ParquetQuerySuite.scala           | 16 +++++-
 .../hive/ParquetHiveCompatibilitySuite.scala  |  6 +++
 6 files changed, 49 insertions(+), 44 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index a958373eb769d..1a8cd0ed5e059 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -97,7 +97,8 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
     new CatalystRecordMaterializer(
       parquetRequestedSchema,
-      CatalystReadSupport.expandUDT(catalystRequestedSchema))
+      CatalystReadSupport.expandUDT(catalystRequestedSchema),
+      new CatalystSchemaConverter(conf))
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
index eeead9f5d88a2..573fd1d66fa6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
@@ -30,10 +30,13 @@ import org.apache.spark.sql.types.StructType
  * @param catalystSchema Catalyst schema of the rows to be constructed
  */
 private[parquet] class CatalystRecordMaterializer(
-    parquetSchema: MessageType, catalystSchema: StructType)
+    parquetSchema: MessageType,
+    catalystSchema: StructType,
+    schemaConverter: CatalystSchemaConverter)
   extends RecordMaterializer[InternalRow] {
 
-  private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
+  private val rootConverter =
+    new CatalystRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)
 
   override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 94298fae2d69b..31a253769eb64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -118,6 +118,7 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
  * @param updater An updater which propagates converted field values to the parent container
 */
 private[parquet] class CatalystRowConverter(
+    schemaConverter: CatalystSchemaConverter,
     parquetType: GroupType,
     catalystType: StructType,
     updater: ParentContainerUpdater)
@@ -291,9 +292,10 @@ private[parquet] class CatalystRowConverter(
         new CatalystMapConverter(parquetType.asGroupType(), t, updater)
 
       case t: StructType =>
-        new CatalystRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
-          override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
-        })
+        new CatalystRowConverter(
+          schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
+            override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
+          })
 
       case t =>
         throw new RuntimeException(
@@ -443,11 +445,20 @@
     val elementType = catalystSchema.elementType
     val parentName = parquetSchema.getName
 
-    if (isElementType(repeatedType, elementType, parentName)) {
+    // At this stage, we're not sure whether the repeated field maps to the element type or is
+    // just the syntactic repeated group of the standard 3-level LIST layout. Here we try to
+    // convert the repeated field into a Catalyst type to see whether the converted type matches
+    // the Catalyst array element type.
+    val guessedElementType = schemaConverter.convertField(repeatedType)
+
+    if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)) {
       newConverter(repeatedType, elementType, new ParentContainerUpdater {
         override def set(value: Any): Unit = currentArray += value
       })
     } else {
+      // If the repeated field corresponds to the syntactic group in the standard 3-level Parquet
+      // LIST layout, create a new converter using the only child field of the repeated field.
+      assert(!repeatedType.isPrimitive && repeatedType.asGroupType().getFieldCount == 1)
       new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
     }
   }
@@ -461,37 +472,6 @@
     // in row cells.
     override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
 
-    // scalastyle:off
-    /**
-     * Returns whether the given type is the element type of a list or is a syntactic group with
-     * one field that is the element type. This is determined by checking whether the type can be
-     * a syntactic group and by checking whether a potential syntactic group matches the expected
-     * schema.
-     * {{{
-     *   <list-repetition> group <name> (LIST) {
-     *     repeated group list {                          <-- repeatedType points here
-     *       <element-repetition> <element-type> element;
-     *     }
-     *   }
-     * }}}
-     * In short, here we handle Parquet list backwards-compatibility rules on the read path. This
-     * method is based on `AvroIndexedRecordConverter.isElementType`.
-     *
-     * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
-     */
-    // scalastyle:on
-    private def isElementType(
-        parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = {
-      (parquetRepeatedType, catalystElementType) match {
-        case (t: PrimitiveType, _) => true
-        case (t: GroupType, _) if t.getFieldCount > 1 => true
-        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true
-        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true
-        case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
-        case _ => false
-      }
-    }
-
     /** Array element converter */
     private final class ElementConverter(parquetType: Type, catalystType: DataType)
       extends GroupConverter {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 5f9f9083098a7..9e1eb52f3103d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -65,7 +65,8 @@ private[parquet] class CatalystSchemaConverter(
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
-    writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean)
+    writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean)
 
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -259,7 +260,7 @@ private[parquet] class CatalystSchemaConverter(
     {
       // For legacy 2-level list types with primitive element type, e.g.:
       //
-      //    // List<Integer> (nullable list, non-null elements)
+      //    // ARRAY<INT> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated int32 element;
       //    }
@@ -269,7 +270,7 @@ private[parquet] class CatalystSchemaConverter(
       // For legacy 2-level list types whose element type is a group type with 2 or more fields,
       // e.g.:
       //
-      //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
+      //    // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated group element {
       //        required binary str (UTF8);
@@ -281,7 +282,7 @@ private[parquet] class CatalystSchemaConverter(
     } || {
       // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
       //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated group array {
       //        required binary str (UTF8);
@@ -292,7 +293,7 @@ private[parquet] class CatalystSchemaConverter(
     } || {
       // For Parquet data generated by parquet-thrift, e.g.:
       //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated group my_list_tuple {
       //        required binary str (UTF8);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index f777e973052d3..e378cf620ea7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
+import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -574,9 +574,23 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     assert(CatalystReadSupport.expandUDT(schema) === expected)
   }
+
+  test("SPARK-16344: array of struct with a single field named 'element'") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      Seq(Tuple1(Array(SingleElement(42)))).toDF("f").write.parquet(path)
+
+      checkAnswer(
+        sqlContext.read.parquet(path),
+        Row(Array(Row(42)))
+      )
+    }
+  }
 }
 
 object TestingUDT {
+  case class SingleElement(element: Long)
+
   @SQLUserDefinedType(udt = classOf[NestedStructUDT])
   case class NestedStruct(a: Integer, b: Long, c: Double)
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 49aab85cf1aaf..cb4d9673b030d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -136,4 +136,10 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
       Row(Row(1, Seq("foo", "bar", null))),
"bar", null))), "STRUCT>") } + + test("SPARK-16344: array of struct with a single field named 'array_element'") { + testParquetHiveCompatibility( + Row(Seq(Row(1))), + "ARRAY>") + } }