From 3bfe45fe8b81f44141b737df6b292f12cd37d06a Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Fri, 1 Jul 2016 19:32:52 +0800
Subject: [PATCH 1/6] Fixes SPARK-16344

---
 .../parquet/ParquetRowConverter.scala         | 111 ++++++++++++++++--
 .../parquet/ParquetQuerySuite.scala           |  16 ++-
 2 files changed, 117 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 9dad59647e0db..ac0f982136980 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -442,9 +442,9 @@ private[parquet] class ParquetRowConverter(
     private val elementConverter: Converter = {
       val repeatedType = parquetSchema.getType(0)
       val elementType = catalystSchema.elementType
-      val parentName = parquetSchema.getName
+      val parent = parquetSchema
 
-      if (isElementType(repeatedType, elementType, parentName)) {
+      if (isElementType(repeatedType, elementType, parent)) {
         newConverter(repeatedType, elementType, new ParentContainerUpdater {
           override def set(value: Any): Unit = currentArray += value
         })
@@ -469,7 +469,7 @@ private[parquet] class ParquetRowConverter(
      * a syntactic group and by checking whether a potential syntactic group matches the expected
      * schema.
      * {{{
-     *   <list-repetition> group <name> (LIST) {
+     *   <list-repetition> group <name> (LIST) {   <-- parent
      *     repeated group list {                    <-- repeatedType points here
      *       <element-repetition> <element-type> element;
      *     }
@@ -482,13 +482,106 @@ private[parquet] class ParquetRowConverter(
      */
     // scalastyle:on
     private def isElementType(
-        parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = {
+        parquetRepeatedType: Type, catalystElementType: DataType, parent: GroupType): Boolean = {
+
+      def isStandardListLayout(t: GroupType): Boolean =
+        Option(parent.getOriginalType) == Some(LIST) &&
+          t.getFieldCount == 1 &&
+          t.getName == "list" &&
+          t.getFieldName(0) == "element"
+
       (parquetRepeatedType, catalystElementType) match {
-        case (t: PrimitiveType, _) => true
-        case (t: GroupType, _) if t.getFieldCount > 1 => true
-        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true
-        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true
-        case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
+        case (t: PrimitiveType, _) =>
+          // For legacy 2-level list types with primitive element type, e.g.:
+          //
+          //    // List<Integer> (nullable list, non-null elements)
+          //    optional group my_list (LIST) {
+          //      repeated int32 element;
+          //    }
+          true
+
+        case (t: GroupType, _) if t.getFieldCount > 1 =>
+          // For legacy 2-level list types whose element type is a group type with 2 or more fields,
+          // e.g.:
+          //
+          //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
+          //    optional group my_list (LIST) {
+          //      repeated group element {
+          //        required binary str (UTF8);
+          //        required int32 num;
+          //      };
+          //    }
+          true
+
+        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" =>
+          // For Parquet data generated by parquet-thrift, e.g.:
+          //
+          //    // List<OneTuple<String>> (nullable list, non-null elements)
+          //    optional group my_list (LIST) {
+          //      repeated group my_list_tuple {
+          //        required binary str (UTF8);
+          //      };
+          //    }
+          true
+
+        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parent.getName + "_tuple" =>
+          // For Parquet data generated by parquet-thrift, e.g.:
+          //
+          //    // List<OneTuple<String>> (nullable list, non-null elements)
+          //    optional group my_list (LIST) {
+          //      repeated group my_list_tuple {
+          //        required binary str (UTF8);
+          //      };
+          //    }
+          true
+
+        case (t: GroupType, _) if isStandardListLayout(t) =>
+          // For standard 3-level list types, e.g.:
+          //
+          //    // List<String> (list nullable, elements non-null)
+          //    optional group my_list (LIST) {
+          //      repeated group list {
+          //        required binary element (UTF8);
+          //      }
+          //    }
+          //
+          // This case branch must appear before the next one. See comments of the next case branch
+          // for details.
+          false
+
+        case (t: GroupType, StructType(fields)) =>
+          // For legacy 2-level list types whose element type is a group type with a single field,
+          // e.g.:
+          //
+          //    // List<OneTuple<String>> (nullable list, non-null elements)
+          //    optional group my_list (LIST) {
+          //      repeated group list {
+          //        required binary str (UTF8);
+          //      };
+          //    }
+          //
+          // NOTE: This kind of schema is ambiguous. According to parquet-format spec, this schema
+          // can also be interpreted as `List<String>`. However, suppose we have a Parquet file
+          // with the following legacy 2-level list type whose element type is a group type with
+          // 2 fields:
+          //
+          //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
+          //    optional group my_list (LIST) {
+          //      repeated group list {
+          //        required binary str (UTF8);
+          //        required int32 num;
+          //      };
+          //    }
+          //
+          // It's perfectly legal and quite possible that the user-level query only wants to query a
+          // single column `str` from this file, and thus gives the first schema as the requested
+          // schema. To disambiguate these two cases, we check to see whether the nested field name
+          // is a member of the corresponding Catalyst struct field names.
+          //
+          // NOTE: The standard 3-level LIST layout also matches this pattern, which is why we must
+          // check for the standard layout before this case branch.
+          fields.map(_.name).contains(t.getFieldName(0))
+
         case _ => false
       }
     }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 83d10010f9dcb..0b27de3ef7013 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
 import org.apache.spark.sql.execution.BatchedDataSourceScanExec
-import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
+import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -668,9 +668,23 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-16344: array of struct with a single field named 'element'") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      Seq(Tuple1(Array(SingleElement(42)))).toDF("f").write.parquet(path)
+
+      checkAnswer(
+        sqlContext.read.parquet(path),
+        Row(Array(Row(42)))
+      )
+    }
+  }
 }
 
 object TestingUDT {
+  case class SingleElement(element: Long)
+
   @SQLUserDefinedType(udt = classOf[NestedStructUDT])
   case class NestedStruct(a: Integer, b: Long, c: Double)
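
For context, the regression fixed by this patch is easy to trigger. The following is a
hypothetical spark-shell session sketching the failure mode, based directly on the
ParquetQuerySuite test added above; the scratch path and case class are illustrative:

    // A struct whose single field is named "element": the name collides with the
    // naming convention of the standard 3-level LIST layout.
    case class SingleElement(element: Long)

    val path = "/tmp/spark-16344-repro"  // arbitrary scratch path
    Seq(Tuple1(Array(SingleElement(42)))).toDF("f").write.parquet(path)

    // Expected result: a single row containing [[42]]. Before this patch, the reader
    // misjudged whether the repeated group was the element type, so the round trip
    // did not return the original row.
    sqlContext.read.parquet(path).collect()
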
From 5d3fdd4d31a5ddb91d4b4fd30a8a267fbe519952 Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Wed, 6 Jul 2016 12:37:05 +0800
Subject: [PATCH 2/6] Addresses PR comment

---
 .../parquet/ParquetRowConverter.scala         | 44 +++++++++----------
 1 file changed, 21 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index ac0f982136980..14a2cfca1dad0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -470,7 +470,7 @@ private[parquet] class ParquetRowConverter(
      * schema.
      * {{{
      *   <list-repetition> group <name> (LIST) {   <-- parent
-     *     repeated group list {                    <-- repeatedType points here
+     *     repeated group list {                    <-- repeatedType
      *       <element-repetition> <element-type> element;
      *     }
      *   }
@@ -483,20 +483,13 @@ private[parquet] class ParquetRowConverter(
      */
     // scalastyle:on
     private def isElementType(
         parquetRepeatedType: Type, catalystElementType: DataType, parent: GroupType): Boolean = {
-
-      def isStandardListLayout(t: GroupType): Boolean =
-        Option(parent.getOriginalType) == Some(LIST) &&
-          t.getFieldCount == 1 &&
-          t.getName == "list" &&
-          t.getFieldName(0) == "element"
-
       (parquetRepeatedType, catalystElementType) match {
         case (t: PrimitiveType, _) =>
           // For legacy 2-level list types with primitive element type, e.g.:
           //
           //    // List<Integer> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {
-          //      repeated int32 element;
+          //    optional group my_list (LIST) {   <-- parent
+          //      repeated int32 element;         <-- repeatedType
           //    }
           true
 
         case (t: GroupType, _) if t.getFieldCount > 1 =>
           // For legacy 2-level list types whose element type is a group type with 2 or more fields,
           // e.g.:
           //
           //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {
-          //      repeated group element {
+          //    optional group my_list (LIST) {   <-- parent
+          //      repeated group element {        <-- repeatedType
           //        required binary str (UTF8);
           //        required int32 num;
           //      };
           //    }
           true
 
         case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" =>
           // For Parquet data generated by parquet-thrift, e.g.:
           //
           //    // List<OneTuple<String>> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {
-          //      repeated group my_list_tuple {
+          //    optional group my_list (LIST) {   <-- parent
+          //      repeated group my_list_tuple {  <-- repeatedType
           //        required binary str (UTF8);
           //      };
           //    }
           true
 
         case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parent.getName + "_tuple" =>
           // For Parquet data generated by parquet-thrift, e.g.:
           //
           //    // List<OneTuple<String>> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {
-          //      repeated group my_list_tuple {
+          //    optional group my_list (LIST) {   <-- parent
+          //      repeated group my_list_tuple {  <-- repeatedType
           //        required binary str (UTF8);
           //      };
           //    }
           true
 
-        case (t: GroupType, _) if isStandardListLayout(t) =>
+        case (t: GroupType, _)
+            if parent.getOriginalType == LIST &&
+              t.getFieldCount == 1 &&
+              t.getName == "list" &&
+              t.getFieldName(0) == "element" =>
           // For standard 3-level list types, e.g.:
           //
           //    // List<String> (list nullable, elements non-null)
-          //    optional group my_list (LIST) {
-          //      repeated group list {
+          //    optional group my_list (LIST) {   <-- parent
+          //      repeated group list {           <-- repeatedType
           //        required binary element (UTF8);
           //      }
           //    }
           //
           // This case branch must appear before the next one. See comments of the next case branch
           // for details.
           false
 
         case (t: GroupType, StructType(fields)) =>
           // For legacy 2-level list types whose element type is a group type with a single field,
           // e.g.:
           //
           //    // List<OneTuple<String>> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {
-          //      repeated group list {
+          //    optional group my_list (LIST) {   <-- parent
+          //      repeated group list {           <-- repeatedType
           //        required binary str (UTF8);
           //      };
           //    }
           //
           // NOTE: This kind of schema is ambiguous. According to parquet-format spec, this schema
           // can also be interpreted as `List<String>`. However, suppose we have a Parquet file
           // with the following legacy 2-level list type whose element type is a group type with
           // 2 fields:
           //
           //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
           //    optional group my_list (LIST) {
           //      repeated group list {
           //        required binary str (UTF8);
           //        required int32 num;
           //      };
           //    }
           //
           // It's perfectly legal and quite possible that the user-level query only wants to query a
           // single column `str` from this file, and thus gives the first schema as the requested
           // schema. To disambiguate these two cases, we check to see whether the nested field name
           // is a member of the corresponding Catalyst struct field names.
           //
-          // NOTE: The standard 3-level LIST layout also matches this pattern, which is why we must
-          // check for the standard layout before this case branch.
+          // NOTE: The standard 3-level LIST layout also matches this pattern since the backwards-
+          // compatibility rules do not make the names "list" and "element" mandatory; that's why we
+          // must check for the standard layout before this case branch.
           fields.map(_.name).contains(t.getFieldName(0))
 
         case _ => false
       }
     }

From 1c6e9328fa19e633df358be2f949c35137ea898a Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Wed, 6 Jul 2016 17:29:36 +0800
Subject: [PATCH 3/6] Fixes inconsistent comment

---
 .../execution/datasources/parquet/ParquetRowConverter.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 14a2cfca1dad0..92c45ed17b639 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -507,11 +507,12 @@ private[parquet] class ParquetRowConverter(
           true
 
         case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" =>
-          // For Parquet data generated by parquet-thrift, e.g.:
+          // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0),
+          // e.g.:
           //
           //    // List<OneTuple<String>> (nullable list, non-null elements)
           //    optional group my_list (LIST) {   <-- parent
-          //      repeated group my_list_tuple {  <-- repeatedType
+          //      repeated group array {          <-- repeatedType
           //        required binary str (UTF8);
           //      };
           //    }
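
The naming distinction this patch restores in the comment (parquet-avro vs. parquet-thrift
legacy 2-level lists) can be made concrete with parquet-mr's schema parser. A minimal
sketch, assuming parquet-mr is on the classpath; the message names are illustrative:

    import org.apache.parquet.schema.MessageTypeParser

    // parquet-avro (< 1.6.0) always names the repeated group "array":
    val avroStyle = MessageTypeParser.parseMessageType(
      """message avro_style {
        |  optional group my_list (LIST) {
        |    repeated group array {
        |      required binary str (UTF8);
        |    }
        |  }
        |}""".stripMargin)

    // parquet-thrift names the repeated group after the list field, plus "_tuple":
    val thriftStyle = MessageTypeParser.parseMessageType(
      """message thrift_style {
        |  optional group my_list (LIST) {
        |    repeated group my_list_tuple {
        |      required binary str (UTF8);
        |    }
        |  }
        |}""".stripMargin)

    // These are exactly the names that the "array" and parent.getName + "_tuple"
    // guards in isElementType key off:
    assert(avroStyle.getType("my_list").asGroupType.getType(0).getName == "array")
    assert(thriftStyle.getType("my_list").asGroupType.getType(0).getName == "my_list_tuple")
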
From 35b85ae16d8f0b777a954930ecb95ef6a4ec828d Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Sun, 10 Jul 2016 14:33:41 +0800
Subject: [PATCH 4/6] Proper fix for SPARK-16344

---
 .../parquet/ParquetReadSupport.scala          |   3 +-
 .../parquet/ParquetRecordMaterializer.scala   |   5 +-
 .../parquet/ParquetRowConverter.scala         | 147 +++---------------
 .../parquet/ParquetSchemaConverter.scala      |   8 +-
 .../hive/ParquetHiveCompatibilitySuite.scala  |   8 +-
 5 files changed, 33 insertions(+), 138 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index e6ef63442128d..8a2e0d7995bbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -94,7 +94,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with
 
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
-      ParquetReadSupport.expandUDT(catalystRequestedSchema))
+      ParquetReadSupport.expandUDT(catalystRequestedSchema),
+      new ParquetSchemaConverter(conf))
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 0818d802b077a..e5dd5fe579db5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -30,10 +30,11 @@ import org.apache.spark.sql.types.StructType
  * @param catalystSchema Catalyst schema of the rows to be constructed
  */
 private[parquet] class ParquetRecordMaterializer(
-    parquetSchema: MessageType, catalystSchema: StructType)
+    parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
   extends RecordMaterializer[InternalRow] {
 
-  private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)
+  private val rootConverter =
+    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)
 
   override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 92c45ed17b639..10ddd32c58713 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, Type}
 import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
 
@@ -113,12 +113,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
+ * @param schemaConverter A utility converter used to convert Parquet types to Catalyst types.
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
  *                     types should have been expanded.
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class ParquetRowConverter(
+    schemaConverter: ParquetSchemaConverter,
     parquetType: GroupType,
     catalystType: StructType,
     updater: ParentContainerUpdater)
@@ -292,9 +294,10 @@ private[parquet] class ParquetRowConverter(
       new ParquetMapConverter(parquetType.asGroupType(), t, updater)
 
     case t: StructType =>
-      new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
-        override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
-      })
+      new ParquetRowConverter(
+        schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
+          override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
+        })
 
     case t =>
       throw new RuntimeException(
@@ -442,13 +445,22 @@ private[parquet] class ParquetRowConverter(
     private val elementConverter: Converter = {
       val repeatedType = parquetSchema.getType(0)
       val elementType = catalystSchema.elementType
-      val parent = parquetSchema
 
-      if (isElementType(repeatedType, elementType, parent)) {
+      // At this stage, we're not sure whether the repeated field maps to the element type or is
+      // just the syntactic repeated group of the 3-level standard LIST layout. Here we try to
+      // convert the repeated field into a Catalyst type to see whether the converted type matches
+      // the Catalyst array element type.
+      val guessedElementType = schemaConverter.convertField(repeatedType)
+
+      if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)) {
+        // If the repeated field corresponds to the element type, create a new converter using the
+        // type of the repeated field.
         newConverter(repeatedType, elementType, new ParentContainerUpdater {
           override def set(value: Any): Unit = currentArray += value
         })
       } else {
+        // If the repeated field corresponds to the syntactic group in the standard 3-level Parquet
+        // LIST layout, create a new converter using the only child field of the repeated field.
         new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
       }
     }
@@ -462,129 +474,6 @@ private[parquet] class ParquetRowConverter(
     // in row cells.
     override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
 
-    // scalastyle:off
-    /**
-     * Returns whether the given type is the element type of a list or is a syntactic group with
-     * one field that is the element type. This is determined by checking whether the type can be
-     * a syntactic group and by checking whether a potential syntactic group matches the expected
-     * schema.
-     * {{{
-     *   <list-repetition> group <name> (LIST) {   <-- parent
-     *     repeated group list {                    <-- repeatedType
-     *       <element-repetition> <element-type> element;
-     *     }
-     *   }
-     * }}}
-     * In short, here we handle Parquet list backwards-compatibility rules on the read path. This
-     * method is based on `AvroIndexedRecordConverter.isElementType`.
-     *
-     * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
-     */
-    // scalastyle:on
-    private def isElementType(
-        parquetRepeatedType: Type, catalystElementType: DataType, parent: GroupType): Boolean = {
-      (parquetRepeatedType, catalystElementType) match {
-        case (t: PrimitiveType, _) =>
-          // For legacy 2-level list types with primitive element type, e.g.:
-          //
-          //    // List<Integer> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {   <-- parent
-          //      repeated int32 element;         <-- repeatedType
-          //    }
-          true
-
-        case (t: GroupType, _) if t.getFieldCount > 1 =>
-          // For legacy 2-level list types whose element type is a group type with 2 or more fields,
-          // e.g.:
-          //
-          //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {   <-- parent
-          //      repeated group element {        <-- repeatedType
-          //        required binary str (UTF8);
-          //        required int32 num;
-          //      };
-          //    }
-          true
-
-        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" =>
-          // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0),
-          // e.g.:
-          //
-          //    // List<OneTuple<String>> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {   <-- parent
-          //      repeated group array {          <-- repeatedType
-          //        required binary str (UTF8);
-          //      };
-          //    }
-          true
-
-        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parent.getName + "_tuple" =>
-          // For Parquet data generated by parquet-thrift, e.g.:
-          //
-          //    // List<OneTuple<String>> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {   <-- parent
-          //      repeated group my_list_tuple {  <-- repeatedType
-          //        required binary str (UTF8);
-          //      };
-          //    }
-          true
-
-        case (t: GroupType, _)
-            if parent.getOriginalType == LIST &&
-              t.getFieldCount == 1 &&
-              t.getName == "list" &&
-              t.getFieldName(0) == "element" =>
-          // For standard 3-level list types, e.g.:
-          //
-          //    // List<String> (list nullable, elements non-null)
-          //    optional group my_list (LIST) {   <-- parent
-          //      repeated group list {           <-- repeatedType
-          //        required binary element (UTF8);
-          //      }
-          //    }
-          //
-          // This case branch must appear before the next one. See comments of the next case branch
-          // for details.
-          false
-
-        case (t: GroupType, StructType(fields)) =>
-          // For legacy 2-level list types whose element type is a group type with a single field,
-          // e.g.:
-          //
-          //    // List<OneTuple<String>> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {   <-- parent
-          //      repeated group list {           <-- repeatedType
-          //        required binary str (UTF8);
-          //      };
-          //    }
-          //
-          // NOTE: This kind of schema is ambiguous. According to parquet-format spec, this schema
-          // can also be interpreted as `List<String>`. However, suppose we have a Parquet file
-          // with the following legacy 2-level list type whose element type is a group type with
-          // 2 fields:
-          //
-          //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
-          //    optional group my_list (LIST) {
-          //      repeated group list {
-          //        required binary str (UTF8);
-          //        required int32 num;
-          //      };
-          //    }
-          //
-          // It's perfectly legal and quite possible that the user-level query only wants to query a
-          // single column `str` from this file, and thus gives the first schema as the requested
-          // schema. To disambiguate these two cases, we check to see whether the nested field name
-          // is a member of the corresponding Catalyst struct field names.
-          //
-          // NOTE: The standard 3-level LIST layout also matches this pattern since the backwards-
-          // compatibility rules do not make the names "list" and "element" mandatory; that's why we
-          // must check for the standard layout before this case branch.
-          fields.map(_.name).contains(t.getFieldName(0))
-
-        case _ => false
-      }
-    }
-
     /** Array element converter */
     private final class ElementConverter(parquetType: Type, catalystType: DataType)
       extends GroupConverter {

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index bcf535d455219..c81a65f4973e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -260,7 +260,7 @@ private[parquet] class ParquetSchemaConverter(
     {
       // For legacy 2-level list types with primitive element type, e.g.:
       //
-      //    // List<Integer> (nullable list, non-null elements)
+      //    // ARRAY<INT> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated int32 element;
       //    }
@@ -270,7 +270,7 @@ private[parquet] class ParquetSchemaConverter(
       // For legacy 2-level list types whose element type is a group type with 2 or more fields,
       // e.g.:
       //
-      //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
+      //    // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated group element {
       //        required binary str (UTF8);
@@ -282,7 +282,7 @@ private[parquet] class ParquetSchemaConverter(
     } || {
       // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
       //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated group array {
       //        required binary str (UTF8);
@@ -293,7 +293,7 @@ private[parquet] class ParquetSchemaConverter(
     } || {
       // For Parquet data generated by parquet-thrift, e.g.:
       //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated group my_list_tuple {
       //        required binary str (UTF8);

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index ac89bbbf8e19d..2b576469e949b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.sql.Timestamp
 
-import org.apache.hadoop.hive.conf.HiveConf
-
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -137,4 +135,10 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
       Row(Row(1, Seq("foo", "bar", null))),
       "STRUCT<f0: INT, f1: ARRAY<STRING>>")
   }
+
+  test("SPARK-16344: array of struct with a single field named 'array_element'") {
+    testParquetHiveCompatibility(
+      Row(Seq(Row(1))),
+      "ARRAY<STRUCT<array_element: INT>>")
+  }
 }

From ddde8c6240e7f43f82cbcffa7e31ce445246817c Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Sun, 10 Jul 2016 16:50:37 +0800
Subject: [PATCH 5/6] Adds assertion

---
 .../sql/execution/datasources/parquet/ParquetRowConverter.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 10ddd32c58713..aa3588e069130 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -461,6 +461,7 @@ private[parquet] class ParquetRowConverter(
       } else {
         // If the repeated field corresponds to the syntactic group in the standard 3-level Parquet
         // LIST layout, create a new converter using the only child field of the repeated field.
+        assert(!repeatedType.isPrimitive && repeatedType.asGroupType().getFieldCount == 1)
         new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
       }
     }

From 516a99a32b415898423737946d5d01bc90f1241f Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Wed, 20 Jul 2016 22:18:57 +0800
Subject: [PATCH 6/6] Addresses PR comments

---
 .../parquet/ParquetRecordMaterializer.scala   |  1 +
 .../parquet/ParquetRowConverter.scala         | 29 +++++++++++++++++--
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index e5dd5fe579db5..d12e7805281ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.types.StructType
  *
  * @param parquetSchema Parquet schema of the records to be read
  * @param catalystSchema Catalyst schema of the rows to be constructed
+ * @param schemaConverter A Parquet-Catalyst schema converter that helps initialize row converters
  */
 private[parquet] class ParquetRecordMaterializer(
     parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index aa3588e069130..9ffc2b5dd8a56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -447,9 +447,32 @@ private[parquet] class ParquetRowConverter(
       val elementType = catalystSchema.elementType
 
       // At this stage, we're not sure whether the repeated field maps to the element type or is
-      // just the syntactic repeated group of the 3-level standard LIST layout. Here we try to
-      // convert the repeated field into a Catalyst type to see whether the converted type matches
-      // the Catalyst array element type.
+      // just the syntactic repeated group of the 3-level standard LIST layout. Take the following
+      // Parquet LIST-annotated group type as an example:
+      //
+      //    optional group f (LIST) {
+      //      repeated group list {
+      //        optional group element {
+      //          optional int32 element;
+      //        }
+      //      }
+      //    }
+      //
+      // This type is ambiguous:
+      //
+      // 1. When interpreted as a standard 3-level layout, the `list` field is just the syntactic
+      //    group, and the entire type should be translated to:
+      //
+      //      ARRAY<STRUCT<element: INT>>
+      //
+      // 2. On the other hand, when interpreted as a non-standard 2-level layout, the `list` field
+      //    represents the element type, and the entire type should be translated to:
+      //
+      //      ARRAY<STRUCT<element: STRUCT<element: INT>>>
+      //
+      // Here we try to convert field `list` into a Catalyst type to see whether the converted type
+      // matches the Catalyst array element type. If it doesn't match, then it's case 1; otherwise,
+      // it's case 2.
       val guessedElementType = schemaConverter.convertField(repeatedType)
 
       if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)) {
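
Taken together, the series replaces name-based guessing with type-based guessing. The
sketch below models that final strategy outside of Spark; SType, SStruct, and
AmbiguousListDemo are illustrative stand-ins rather than Spark or parquet-mr API, and
only parquet-mr's MessageTypeParser is a real library call:

    import scala.collection.JavaConverters._
    import org.apache.parquet.schema.{MessageTypeParser, Type}

    // Illustrative stand-ins for Catalyst types (hypothetical, not Spark classes).
    sealed trait SType
    case object SInt extends SType
    case class SStruct(fields: (String, SType)*) extends SType

    object AmbiguousListDemo extends App {
      // The ambiguous LIST-annotated type discussed in PATCH 6/6.
      val schema = MessageTypeParser.parseMessageType(
        """message m {
          |  optional group f (LIST) {
          |    repeated group list {
          |      optional group element {
          |        optional int32 element;
          |      }
          |    }
          |  }
          |}""".stripMargin)

      // Simplified Parquet-to-Catalyst conversion: primitives become SInt, groups
      // become SStruct (the real converter handles many more cases).
      def toSType(t: Type): SType =
        if (t.isPrimitive) SInt
        else SStruct(t.asGroupType.getFields.asScala.map(f => f.getName -> toSType(f)): _*)

      val repeated = schema.getType("f").asGroupType.getType(0) // the `list` group

      // Guess-and-compare, as in the final fix: convert the repeated field and match
      // it against the expected element type of the Catalyst ArrayType.
      val standardElement = SStruct("element" -> SInt)          // ARRAY<STRUCT<element: INT>>
      val legacyElement = SStruct("element" -> standardElement) // ARRAY<STRUCT<element: STRUCT<element: INT>>>

      val guessed = toSType(repeated)
      // The repeated group itself matches the legacy (2-level) element type...
      assert(guessed == legacyElement)
      // ...but not the standard (3-level) element type, in which case the converter
      // must descend into the only child of the repeated group instead.
      assert(guessed != standardElement)
    }

Spark's real implementation does the equivalent with ParquetSchemaConverter.convertField
and DataType.equalsIgnoreCompatibleNullability, so that nullability differences between
the guessed type and the requested element type do not defeat the match.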