From d246010b8b413868c2cad28b6d46e14075d841a4 Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Tue, 2 Nov 2021 16:00:51 -0700
Subject: [PATCH] [SPARK-36935][SQL] Extend ParquetSchemaConverter to compute
 Parquet repetition & definition level

### What changes were proposed in this pull request?

This PR includes the following:

1. adds a new class `ParquetColumn`, which is a wrapper on a Spark `DataType` with additional
   Parquet column information, including its max repetition level & definition level, its path
   from the root schema, its column descriptor if the node is a leaf, its children nodes if it
   is a non-leaf, etc. This is needed to support complex types in the vectorized path, where we
   need to assemble column vectors of complex types using this information.
2. extends `ParquetSchemaConverter` to convert from a Parquet `MessageType` to a `ParquetColumn`,
   mostly by piggy-backing on the existing logic. A new method `convertParquetColumn` is added
   for this purpose, and the existing `convert` is changed to simply call the former.

### Why are the changes needed?

In order to support complex types in the vectorized Parquet reader (see SPARK-34863 for more
info), we'll need to capture Parquet-specific information, such as the max repetition/definition
level, for Spark's `DataType`, so that we can assemble primitive column vectors into ones with
complex types (e.g., struct, map, array).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Extended the test cases in `ParquetSchemaSuite`.

Closes #34199 from sunchao/SPARK-36935-column-io.

Authored-by: Chao Sun
Signed-off-by: Dongjoon Hyun
---
 .../org/apache/parquet/io/ColumnIOUtil.java   |   40 +
 .../SpecificParquetRecordReaderBase.java      |    2 +
 .../datasources/parquet/ParquetColumn.scala   |   55 +
 .../parquet/ParquetRowConverter.scala         |   11 +-
 .../parquet/ParquetSchemaConverter.scala      |  198 ++-
 .../parquet/ParquetSchemaSuite.scala          | 1241 ++++++++++++++++-
 6 files changed, 1461 insertions(+), 86 deletions(-)
 create mode 100644 sql/core/src/main/java/org/apache/parquet/io/ColumnIOUtil.java
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumn.scala

diff --git a/sql/core/src/main/java/org/apache/parquet/io/ColumnIOUtil.java b/sql/core/src/main/java/org/apache/parquet/io/ColumnIOUtil.java
new file mode 100644
index 0000000000000..d4f93e54caca1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/parquet/io/ColumnIOUtil.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.io;
+
+/**
+ * This is a workaround since the methods below are not public in {@link ColumnIO}.
+ *
+ * TODO(SPARK-36511): we should remove this once PARQUET-2050 and PARQUET-2083 are released with
+ * Parquet 1.13.
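+ *
+ * <p>Illustrative usage (not part of the upstream API): given the root {@code ColumnIO}
+ * obtained via {@code new ColumnIOFactory().getColumnIO(messageType)}, the helpers below
+ * expose, for any node {@code c} in that tree, {@code ColumnIOUtil.getRepetitionLevel(c)},
+ * {@code ColumnIOUtil.getDefinitionLevel(c)} and {@code ColumnIOUtil.getFieldPath(c)}
+ * without requiring callers to live in the {@code org.apache.parquet.io} package.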
+ */ +public class ColumnIOUtil { + private ColumnIOUtil() {} + + public static int getDefinitionLevel(ColumnIO column) { + return column.getDefinitionLevel(); + } + + public static int getRepetitionLevel(ColumnIO column) { + return column.getRepetitionLevel(); + } + + public static String[] getFieldPath(ColumnIO column) { + return column.getFieldPath(); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index ccfe3792cb826..e1a0607d37c2c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -152,6 +152,7 @@ protected void initialize(String path, List columns) throws IOException Configuration config = new Configuration(); config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false); config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false); + config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false); this.file = new Path(path); long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen(); @@ -199,6 +200,7 @@ protected void initialize( Configuration config = new Configuration(); config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false); config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false); + config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false); this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema); this.totalRowCount = totalRowCount; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumn.scala new file mode 100644 index 0000000000000..5d05d6a6759bc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumn.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.parquet.column.ColumnDescriptor +import org.apache.parquet.io.ColumnIOUtil +import org.apache.parquet.io.GroupColumnIO +import org.apache.parquet.io.PrimitiveColumnIO +import org.apache.parquet.schema.Type.Repetition + +import org.apache.spark.sql.types.DataType + +/** + * Rich information for a Parquet column together with its SparkSQL type. 
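+ *
+ * For example (an illustrative sketch, mirroring the cases covered in `ParquetSchemaSuite`),
+ * a Parquet schema such as:
+ * {{{
+ *   message spark_schema {
+ *     optional group f (LIST) {
+ *       repeated group list {
+ *         optional int32 element;
+ *       }
+ *     }
+ *   }
+ * }}}
+ * converts to a root `ParquetColumn` wrapping a `StructType`, whose child for `f` wraps
+ * `ArrayType(IntegerType, containsNull = true)`, and whose leaf for `f.list.element` carries
+ * a `ColumnDescriptor` with max repetition level 1 and max definition level 3.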
+ */
+case class ParquetColumn(
+    sparkType: DataType,
+    descriptor: Option[ColumnDescriptor], // only set when this is a primitive column
+    repetitionLevel: Int,
+    definitionLevel: Int,
+    required: Boolean,
+    path: Seq[String],
+    children: Seq[ParquetColumn]) {
+
+  def isPrimitive: Boolean = descriptor.nonEmpty
+}
+
+object ParquetColumn {
+  def apply(sparkType: DataType, io: PrimitiveColumnIO): ParquetColumn = {
+    this(sparkType, Some(io.getColumnDescriptor), ColumnIOUtil.getRepetitionLevel(io),
+      ColumnIOUtil.getDefinitionLevel(io), io.getType.isRepetition(Repetition.REQUIRED),
+      ColumnIOUtil.getFieldPath(io), Seq.empty)
+  }
+
+  def apply(sparkType: DataType, io: GroupColumnIO, children: Seq[ParquetColumn]): ParquetColumn = {
+    this(sparkType, None, ColumnIOUtil.getRepetitionLevel(io),
+      ColumnIOUtil.getDefinitionLevel(io), io.getType.isRepetition(Repetition.REQUIRED),
+      ColumnIOUtil.getFieldPath(io), children)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 19670660fc1d0..942c805a353ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -25,8 +25,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
+import org.apache.parquet.io.ColumnIOFactory
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.{GroupType, Type}
+import org.apache.parquet.schema.{GroupType, Type, Types}
 import org.apache.parquet.schema.LogicalTypeAnnotation._
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96}
@@ -609,7 +610,13 @@ private[parquet] class ParquetRowConverter(
     //
     // If the element type does not match the Catalyst type and the underlying repeated type
     // does not belong to the legacy LIST type, then it is case 1; otherwise, it is case 2.
-    val guessedElementType = schemaConverter.convertField(repeatedType)
+    //
+    // Since the `convertField` method requires a Parquet `ColumnIO` as input, here we first
+    // create a dummy message type which wraps the given repeated type, and then convert it
+    // to a `ColumnIO` using the Parquet API.
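+    //
+    // For instance (illustrative), for a repeated type `repeated int32 element`, the dummy
+    // wrapper below is `message foo { repeated int32 element }`, and `column.getChild(0)` is
+    // the `ColumnIO` node for the repeated field itself.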
+    val messageType = Types.buildMessage().addField(repeatedType).named("foo")
+    val column = new ColumnIOFactory().getColumnIO(messageType)
+    val guessedElementType = schemaConverter.convertField(column.getChild(0)).sparkType
     val isLegacy = schemaConverter.isElementType(repeatedType, parquetSchema.getName)
     if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) || isLegacy) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index e91a3ce29b79a..4fc4b98ee9bc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import scala.collection.JavaConverters._
-
 import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.io.{ColumnIO, ColumnIOFactory, GroupColumnIO, PrimitiveColumnIO}
 import org.apache.parquet.schema._
 import org.apache.parquet.schema.LogicalTypeAnnotation._
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
@@ -30,9 +29,11 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
-
 /**
- * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]].
+ * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]]
+ * (via the `convert` method) as well as [[ParquetColumn]] (via the `convertParquetColumn`
+ * method). The latter contains richer information about the Parquet type, including its
+ * associated repetition & definition level, column path, column descriptor, etc.
 *
 * Parquet format backwards-compatibility rules are respected when converting Parquet
 * [[MessageType]] schemas.
@@ -43,57 +44,130 @@ import org.apache.spark.sql.types._
 *                              [[StringType]] fields.
 * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
 *                               [[TimestampType]] fields.
+ * @param caseSensitive Whether to use case-sensitive analysis when comparing the Spark catalyst
+ *                      read schema with the Parquet schema.
 */
class ParquetToSparkSchemaConverter(
    assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) {
+    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get) {
 
  def this(conf: SQLConf) = this(
    assumeBinaryIsString = conf.isParquetBinaryAsString,
-    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp)
+    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
+    caseSensitive = conf.caseSensitiveAnalysis)
 
  def this(conf: Configuration) = this(
    assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
-    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean)
+    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+    caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean)
 
  /**
   * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
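+  *
+  * A minimal usage sketch (illustrative; assumes a Hadoop `Configuration` whose Parquet
+  * SQLConf keys are already populated, as done in `SpecificParquetRecordReaderBase`):
+  * {{{
+  *   val converter = new ParquetToSparkSchemaConverter(conf)
+  *   val sparkSchema: StructType = converter.convert(parquetMessageType)
+  * }}}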
  */
-  def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())
+  def convert(parquetSchema: MessageType): StructType = {
+    val column = new ColumnIOFactory().getColumnIO(parquetSchema)
+    val converted = convertInternal(column)
+    converted.sparkType.asInstanceOf[StructType]
+  }
 
-  private def convert(parquetSchema: GroupType): StructType = {
-    val fields = parquetSchema.getFields.asScala.map { field =>
-      field.getRepetition match {
-        case OPTIONAL =>
-          StructField(field.getName, convertField(field), nullable = true)
+  /**
+   * Converts `parquetSchema` into a [[ParquetColumn]] which contains its corresponding Spark
+   * SQL [[StructType]] along with other information such as the maximum repetition and definition
+   * level of each node, the column descriptor for the leaf nodes, etc.
+   *
+   * If `sparkReadSchema` is not empty, when deriving the Spark SQL type for a Parquet field this
+   * will check whether the same field also exists in the read schema. If so, the Spark SQL type
+   * from the read schema is used instead. This is necessary since conversion from Parquet to
+   * Spark could otherwise cause precision loss. For instance, the Spark read schema may be
+   * smallint/tinyint while Parquet only supports int.
+   */
+  def convertParquetColumn(
+      parquetSchema: MessageType,
+      sparkReadSchema: Option[StructType] = None): ParquetColumn = {
+    val column = new ColumnIOFactory().getColumnIO(parquetSchema)
+    convertInternal(column, sparkReadSchema)
+  }
 
-        case REQUIRED =>
-          StructField(field.getName, convertField(field), nullable = false)
+  private def convertInternal(
+      groupColumn: GroupColumnIO,
+      sparkReadSchema: Option[StructType] = None): ParquetColumn = {
+    val converted = (0 until groupColumn.getChildrenCount).map { i =>
+      val field = groupColumn.getChild(i)
+      val fieldFromReadSchema = sparkReadSchema.flatMap { schema =>
+        schema.find(f => isSameFieldName(f.name, field.getName, caseSensitive))
+      }
+      var fieldReadType = fieldFromReadSchema.map(_.dataType)
+
+      // If a field is repeated here then it is neither contained by a `LIST`- nor a
+      // `MAP`-annotated group (those should've been handled in `convertGroupField`), e.g.:
+      //
+      //  message schema {
+      //    repeated int32 int_array;
+      //  }
+      // or
+      //  message schema {
+      //    repeated group struct_array {
+      //      optional int32 field;
+      //    }
+      //  }
+      //
+      // In this case the corresponding Spark read type should be an array, and we should pass
+      // its element type to the group or primitive type conversion method.
+      if (field.getType.getRepetition == REPEATED) {
+        fieldReadType = fieldReadType.flatMap {
+          case at: ArrayType => Some(at.elementType)
+          case _ =>
+            throw QueryCompilationErrors.illegalParquetTypeError(groupColumn.toString)
+        }
+      }
+
+      val convertedField = convertField(field, fieldReadType)
+      val fieldName = fieldFromReadSchema.map(_.name).getOrElse(field.getType.getName)
+
+      field.getType.getRepetition match {
+        case OPTIONAL | REQUIRED =>
+          val nullable = field.getType.getRepetition == OPTIONAL
+          (StructField(fieldName, convertedField.sparkType, nullable = nullable),
+            convertedField)
         case REPEATED =>
           // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
           // annotated by `LIST` or `MAP` should be interpreted as a required list of required
           // elements, where the element type is the type of the field.
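+          // For example (illustrative), a bare repeated field such as:
+          //
+          //   message schema {
+          //     repeated int32 num;
+          //   }
+          //
+          // is read back as a non-nullable `ArrayType(IntegerType, containsNull = false)`
+          // field (cf. the backwards-compatibility LIST cases in `ParquetSchemaSuite`).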
- val arrayType = ArrayType(convertField(field), containsNull = false) - StructField(field.getName, arrayType, nullable = false) + val arrayType = ArrayType(convertedField.sparkType, containsNull = false) + (StructField(fieldName, arrayType, nullable = false), + ParquetColumn(arrayType, None, convertedField.repetitionLevel - 1, + convertedField.definitionLevel - 1, required = true, convertedField.path, + Seq(convertedField.copy(required = true)))) } } - StructType(fields.toSeq) + ParquetColumn(StructType(converted.map(_._1)), groupColumn, converted.map(_._2)) } + private def isSameFieldName(left: String, right: String, caseSensitive: Boolean): Boolean = + if (!caseSensitive) left.equalsIgnoreCase(right) + else left == right + /** - * Converts a Parquet [[Type]] to a Spark SQL [[DataType]]. + * Converts a Parquet [[Type]] to a [[ParquetColumn]] which wraps a Spark SQL [[DataType]] with + * additional information such as the Parquet column's repetition & definition level, column + * path, column descriptor etc. */ - def convertField(parquetType: Type): DataType = parquetType match { - case t: PrimitiveType => convertPrimitiveField(t) - case t: GroupType => convertGroupField(t.asGroupType()) + def convertField( + field: ColumnIO, + sparkReadType: Option[DataType] = None): ParquetColumn = field match { + case primitiveColumn: PrimitiveColumnIO => convertPrimitiveField(primitiveColumn, sparkReadType) + case groupColumn: GroupColumnIO => convertGroupField(groupColumn, sparkReadType) } - private def convertPrimitiveField(field: PrimitiveType): DataType = { - val typeName = field.getPrimitiveTypeName - val typeAnnotation = field.getLogicalTypeAnnotation + private def convertPrimitiveField( + primitiveColumn: PrimitiveColumnIO, + sparkReadType: Option[DataType] = None): ParquetColumn = { + val parquetType = primitiveColumn.getType.asPrimitiveType() + val typeAnnotation = primitiveColumn.getType.getLogicalTypeAnnotation + val typeName = primitiveColumn.getPrimitive def typeString = if (typeAnnotation == null) s"$typeName" else s"$typeName ($typeAnnotation)" @@ -108,7 +182,7 @@ class ParquetToSparkSchemaConverter( // specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored // as binaries with variable lengths. 
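+    // For example (illustrative): a `binary (DECIMAL(20, 2))` column maps to
+    // `DecimalType(20, 2)`, while for a 4-byte `fixed_len_byte_array` the precision is capped
+    // at `Decimal.maxPrecisionForBytes(4)`, i.e. 9 digits.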
def makeDecimalType(maxPrecision: Int = -1): DecimalType = { - val decimalLogicalTypeAnnotation = field.getLogicalTypeAnnotation + val decimalLogicalTypeAnnotation = typeAnnotation .asInstanceOf[DecimalLogicalTypeAnnotation] val precision = decimalLogicalTypeAnnotation.getPrecision val scale = decimalLogicalTypeAnnotation.getScale @@ -120,7 +194,7 @@ class ParquetToSparkSchemaConverter( DecimalType(precision, scale) } - typeName match { + val sparkType = sparkReadType.getOrElse(typeName match { case BOOLEAN => BooleanType case FLOAT => FloatType @@ -195,17 +269,23 @@ class ParquetToSparkSchemaConverter( case FIXED_LEN_BYTE_ARRAY => typeAnnotation match { case _: DecimalLogicalTypeAnnotation => - makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength)) + makeDecimalType(Decimal.maxPrecisionForBytes(parquetType.getTypeLength)) case _: IntervalLogicalTypeAnnotation => typeNotImplemented() case _ => illegalType() } case _ => illegalType() - } + }) + + ParquetColumn(sparkType, primitiveColumn) } - private def convertGroupField(field: GroupType): DataType = { - Option(field.getLogicalTypeAnnotation).fold(convert(field): DataType) { + private def convertGroupField( + groupColumn: GroupColumnIO, + sparkReadType: Option[DataType] = None): ParquetColumn = { + val field = groupColumn.getType.asGroupType() + Option(field.getLogicalTypeAnnotation).fold( + convertInternal(groupColumn, sparkReadType.map(_.asInstanceOf[StructType]))) { // A Parquet list is represented as a 3-level structure: // // group (LIST) { @@ -222,17 +302,36 @@ class ParquetToSparkSchemaConverter( case _: ListLogicalTypeAnnotation => ParquetSchemaConverter.checkConversionRequirement( field.getFieldCount == 1, s"Invalid list type $field") + ParquetSchemaConverter.checkConversionRequirement( + sparkReadType.forall(_.isInstanceOf[ArrayType]), + s"Invalid Spark read type: expected $field to be list type but found $sparkReadType") - val repeatedType = field.getType(0) + val repeated = groupColumn.getChild(0) + val repeatedType = repeated.getType ParquetSchemaConverter.checkConversionRequirement( repeatedType.isRepetition(REPEATED), s"Invalid list type $field") + val sparkReadElementType = sparkReadType.map(_.asInstanceOf[ArrayType].elementType) if (isElementType(repeatedType, field.getName)) { - ArrayType(convertField(repeatedType), containsNull = false) + var converted = convertField(repeated, sparkReadElementType) + val convertedType = sparkReadElementType.getOrElse(converted.sparkType) + + // legacy format such as: + // optional group my_list (LIST) { + // repeated int32 element; + // } + // we should mark the primitive field as required + if (repeatedType.isPrimitive) converted = converted.copy(required = true) + + ParquetColumn(ArrayType(convertedType, containsNull = false), + groupColumn, Seq(converted)) } else { - val elementType = repeatedType.asGroupType().getType(0) - val optional = elementType.isRepetition(OPTIONAL) - ArrayType(convertField(elementType), containsNull = optional) + val element = repeated.asInstanceOf[GroupColumnIO].getChild(0) + val converted = convertField(element, sparkReadElementType) + val convertedType = sparkReadElementType.getOrElse(converted.sparkType) + val optional = element.getType.isRepetition(OPTIONAL) + ParquetColumn(ArrayType(convertedType, containsNull = optional), + groupColumn, Seq(converted)) } // scalastyle:off @@ -243,20 +342,29 @@ class ParquetToSparkSchemaConverter( ParquetSchemaConverter.checkConversionRequirement( field.getFieldCount == 1 && !field.getType(0).isPrimitive, 
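+          // (illustrative) a conforming Parquet map is a three-level structure whose single
+          // child is a repeated `key_value` group holding exactly two fields, e.g.:
+          //
+          //   optional group my_map (MAP) {
+          //     repeated group key_value {
+          //       required int32 key;
+          //       optional binary value (UTF8);
+          //     }
+          //   }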
s"Invalid map type: $field") + ParquetSchemaConverter.checkConversionRequirement( + sparkReadType.forall(_.isInstanceOf[MapType]), + s"Invalid Spark read type: expected $field to be map type but found $sparkReadType") - val keyValueType = field.getType(0).asGroupType() + val keyValue = groupColumn.getChild(0).asInstanceOf[GroupColumnIO] + val keyValueType = keyValue.getType.asGroupType() ParquetSchemaConverter.checkConversionRequirement( keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2, s"Invalid map type: $field") - val keyType = keyValueType.getType(0) - val valueType = keyValueType.getType(1) - val valueOptional = valueType.isRepetition(OPTIONAL) - MapType( - convertField(keyType), - convertField(valueType), - valueContainsNull = valueOptional) - + val key = keyValue.getChild(0) + val value = keyValue.getChild(1) + val sparkReadKeyType = sparkReadType.map(_.asInstanceOf[MapType].keyType) + val sparkReadValueType = sparkReadType.map(_.asInstanceOf[MapType].valueType) + val convertedKey = convertField(key, sparkReadKeyType) + val convertedValue = convertField(value, sparkReadValueType) + val convertedKeyType = sparkReadKeyType.getOrElse(convertedKey.sparkType) + val convertedValueType = sparkReadValueType.getOrElse(convertedValue.sparkType) + val valueOptional = value.getType.isRepetition(OPTIONAL) + ParquetColumn( + MapType(convertedKeyType, convertedValueType, + valueContainsNull = valueOptional), + groupColumn, Seq(convertedKey, convertedValue)) case _ => throw QueryCompilationErrors.unrecognizedParquetTypeError(field.toString) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index c200f97edcd02..fc1392dcc1ce3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -20,8 +20,11 @@ package org.apache.spark.sql.execution.datasources.parquet import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag +import org.apache.parquet.column.ColumnDescriptor import org.apache.parquet.io.ParquetDecodingException -import org.apache.parquet.schema.{MessageType, MessageTypeParser} +import org.apache.parquet.schema._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ScalaReflection @@ -41,14 +44,16 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession { messageType: String, binaryAsString: Boolean, int96AsTimestamp: Boolean, - writeLegacyParquetFormat: Boolean): Unit = { + writeLegacyParquetFormat: Boolean, + expectedParquetColumn: Option[ParquetColumn] = None): Unit = { testSchema( testName, StructType.fromAttributes(ScalaReflection.attributesFor[T]), messageType, binaryAsString, int96AsTimestamp, - writeLegacyParquetFormat) + writeLegacyParquetFormat, + expectedParquetColumn = expectedParquetColumn) } protected def testParquetToCatalyst( @@ -56,13 +61,19 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession { sqlSchema: StructType, parquetSchema: String, binaryAsString: Boolean, - int96AsTimestamp: Boolean): Unit = { + int96AsTimestamp: Boolean, + caseSensitive: Boolean = false, + sparkReadSchema: Option[StructType] = None, + expectedParquetColumn: 
Option[ParquetColumn] = None): Unit = {
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = binaryAsString,
-      assumeInt96IsTimestamp = int96AsTimestamp)
+      assumeInt96IsTimestamp = int96AsTimestamp,
+      caseSensitive = caseSensitive)
 
     test(s"sql <= parquet: $testName") {
-      val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
+      val actualParquetColumn = converter.convertParquetColumn(
+        MessageTypeParser.parseMessageType(parquetSchema), sparkReadSchema)
+      val actual = actualParquetColumn.sparkType
       val expected = sqlSchema
       assert(
         actual === expected,
         s"""Schema mismatch.
            |Expected schema: ${expected.json}
            |Actual schema: ${actual.json}
          """.stripMargin)
+
+      if (expectedParquetColumn.isDefined) {
+        compareParquetColumn(actualParquetColumn, expectedParquetColumn.get)
+      }
     }
   }
 
@@ -100,7 +115,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-        SQLConf.ParquetOutputTimestampType.INT96): Unit = {
+        SQLConf.ParquetOutputTimestampType.INT96,
+      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
 
     testCatalystToParquet(
       testName,
@@ -114,7 +130,66 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       sqlSchema,
       parquetSchema,
       binaryAsString,
-      int96AsTimestamp)
+      int96AsTimestamp,
+      expectedParquetColumn = expectedParquetColumn)
+  }
+
+  protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
+    assert(actual.sparkType == expected.sparkType, "sparkType mismatch: " +
+      s"actual = ${actual.sparkType}, expected = ${expected.sparkType}")
+    assert(actual.descriptor === expected.descriptor, "column descriptor mismatch: " +
+      s"actual = ${actual.descriptor}, expected = ${expected.descriptor}")
+    // since Parquet `ColumnDescriptor.equals` only compares paths, we need to compare the
+    // other fields explicitly here
+    if (actual.descriptor.isDefined && expected.descriptor.isDefined) {
+      val actualDesc = actual.descriptor.get
+      val expectedDesc = expected.descriptor.get
+      assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
+      assert(actualDesc.getMaxDefinitionLevel == expectedDesc.getMaxDefinitionLevel)
+      assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+    }
+
+    assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
+      s"actual = ${actual.repetitionLevel}, expected = ${expected.repetitionLevel}")
+    assert(actual.definitionLevel == expected.definitionLevel, "definition level mismatch: " +
+      s"actual = ${actual.definitionLevel}, expected = ${expected.definitionLevel}")
+    assert(actual.required == expected.required, "required mismatch: " +
+      s"actual = ${actual.required}, expected = ${expected.required}")
+    assert(actual.path == expected.path, "path mismatch: " +
+      s"actual = ${actual.path}, expected = ${expected.path}")
+
+    assert(actual.children.size == expected.children.size, "size of children mismatch: " +
+      s"actual = ${actual.children.size}, expected = ${expected.children.size}")
+    actual.children.zip(expected.children).foreach { case (actualChild, expectedChild) =>
+      compareParquetColumn(actualChild, expectedChild)
+    }
+  }
+
+  protected def primitiveParquetColumn(
+      sparkType: DataType,
+      parquetTypeName: PrimitiveTypeName,
+      repetition: Repetition,
+      repetitionLevel: Int,
definitionLevel: Int, + path: Seq[String], + logicalTypeAnnotation: Option[LogicalTypeAnnotation] = None): ParquetColumn = { + var typeBuilder = repetition match { + case Repetition.REQUIRED => Types.required(parquetTypeName) + case Repetition.OPTIONAL => Types.optional(parquetTypeName) + case Repetition.REPEATED => Types.repeated(parquetTypeName) + } + if (logicalTypeAnnotation.isDefined) { + typeBuilder = typeBuilder.as(logicalTypeAnnotation.get) + } + ParquetColumn( + sparkType = sparkType, + descriptor = Some(new ColumnDescriptor(path.toArray, + typeBuilder.named(path.last), repetitionLevel, definitionLevel)), + repetitionLevel = repetitionLevel, + definitionLevel = definitionLevel, + required = repetition != Repetition.OPTIONAL, + path = path, + children = Seq.empty) } } @@ -133,7 +208,31 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = false, int96AsTimestamp = true, - writeLegacyParquetFormat = true) + writeLegacyParquetFormat = true, + expectedParquetColumn = Some( + ParquetColumn( + sparkType = StructType.fromAttributes( + ScalaReflection.attributesFor[(Boolean, Int, Long, Float, Double, Array[Byte])]), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + primitiveParquetColumn(BooleanType, PrimitiveTypeName.BOOLEAN, Repetition.REQUIRED, + 0, 0, Seq("_1")), + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 0, 0, Seq("_2")), + primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED, + 0, 0, Seq("_3")), + primitiveParquetColumn(FloatType, PrimitiveTypeName.FLOAT, Repetition.REQUIRED, + 0, 0, Seq("_4")), + primitiveParquetColumn(DoubleType, PrimitiveTypeName.DOUBLE, Repetition.REQUIRED, + 0, 0, Seq("_5")), + primitiveParquetColumn(BinaryType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 0, 1, Seq("_6")) + ))) + ) testSchemaInference[(Byte, Short, Int, Long, java.sql.Date)]( "logical integral types", @@ -148,7 +247,28 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = true) + writeLegacyParquetFormat = true, + expectedParquetColumn = Some( + ParquetColumn( + sparkType = StructType.fromAttributes( + ScalaReflection.attributesFor[(Byte, Short, Int, Long, java.sql.Date)]), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + primitiveParquetColumn(ByteType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(8, true))), + primitiveParquetColumn(ShortType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 0, 0, Seq("_2"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(16, true))), + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 0, 0, Seq("_3"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(32, true))), + primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED, + 0, 0, Seq("_4"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, true))), + primitiveParquetColumn(DateType, PrimitiveTypeName.INT32, Repetition.OPTIONAL, + 0, 1, Seq("_5"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.dateType())) + )))) testSchemaInference[Tuple1[String]]( "string", @@ -159,7 +279,20 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, 
int96AsTimestamp = true, - writeLegacyParquetFormat = true) + writeLegacyParquetFormat = true, + expectedParquetColumn = Some( + ParquetColumn( + sparkType = StructType.fromAttributes( + ScalaReflection.attributesFor[Tuple1[String]]), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 0, 1, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())) + )))) testSchemaInference[Tuple1[String]]( "binary enum as string", @@ -170,7 +303,20 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = true) + writeLegacyParquetFormat = true, + expectedParquetColumn = Some( + ParquetColumn( + sparkType = StructType.fromAttributes( + ScalaReflection.attributesFor[Tuple1[String]]), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 0, 1, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.enumType())) + )))) testSchemaInference[Tuple1[Seq[Int]]]( "non-nullable array - non-standard", @@ -183,7 +329,28 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = true) + writeLegacyParquetFormat = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "_1", + ArrayType(IntegerType, containsNull = false)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType(IntegerType, containsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REPEATED, + 1, 2, Seq("_1", "array"))) + ))))) testSchemaInference[Tuple1[Seq[Int]]]( "non-nullable array - standard", @@ -198,7 +365,28 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = false) + writeLegacyParquetFormat = false, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "_1", + ArrayType(IntegerType, containsNull = false)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType(IntegerType, containsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("_1", "list", "element")) + )))))) testSchemaInference[Tuple1[Seq[Integer]]]( "nullable array - non-standard", @@ -213,7 +401,28 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = true) + writeLegacyParquetFormat = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "_1", + ArrayType(IntegerType, containsNull = true)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, 
+ path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType(IntegerType, containsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.OPTIONAL, + 1, 3, Seq("_1", "bag", "array")) + )))))) testSchemaInference[Tuple1[Seq[Integer]]]( "nullable array - standard", @@ -228,7 +437,28 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = false) + writeLegacyParquetFormat = false, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "_1", + ArrayType(IntegerType, containsNull = true)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType(IntegerType, containsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.OPTIONAL, + 1, 3, Seq("_1", "list", "element")) + )))))) testSchemaInference[Tuple1[Map[Int, String]]]( "map - standard", @@ -244,7 +474,33 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = false) + writeLegacyParquetFormat = false, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "_1", + MapType(IntegerType, StringType, valueContainsNull = true), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = MapType(IntegerType, StringType, valueContainsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("_1", "key_value", "key")), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 3, Seq("_1", "key_value", "value"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType()))) + ))))) testSchemaInference[Tuple1[Map[Int, String]]]( "map - non-standard", @@ -260,7 +516,33 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = true) + writeLegacyParquetFormat = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "_1", + MapType(IntegerType, StringType, valueContainsNull = true), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = MapType(IntegerType, StringType, valueContainsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("_1", "key_value", "key")), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 3, Seq("_1", "key_value", "value"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType()))) + ))))) testSchemaInference[Tuple1[Map[(String, 
String), String]]]( "map - group type key", @@ -279,7 +561,50 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = true) + writeLegacyParquetFormat = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "_1", + MapType(StructType(Seq(StructField("_1", StringType), StructField("_2", StringType))), + StringType, valueContainsNull = true), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = MapType( + StructType(Seq(StructField("_1", StringType), StructField("_2", StringType))), + StringType, valueContainsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq( + ParquetColumn( + sparkType = + StructType(Seq(StructField("_1", StringType), StructField("_2", StringType))), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 2, + required = true, + path = Seq("_1", "key_value", "key"), + children = Seq( + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 3, Seq("_1", "key_value", "key", "_1"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 3, Seq("_1", "key_value", "key", "_2"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType()))) + ), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 3, Seq("_1", "key_value", "value"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType()))) + ))))) testSchemaInference[Tuple1[(Int, String)]]( "struct", @@ -293,7 +618,36 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = false) + writeLegacyParquetFormat = false, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "_1", + StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", StringType)))))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", StringType))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 0, 1, Seq("_1", "_1")), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 0, 2, Seq("_1", "_2"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType()))) + ))))) testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]]( "deeply nested type - non-standard", @@ -319,7 +673,89 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = true) + writeLegacyParquetFormat = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "_1", + MapType(IntegerType, + StructType(Seq( + StructField("_1", StringType), + StructField("_2", ArrayType( + StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", 
DoubleType, nullable = false))))))), + valueContainsNull = true)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = + MapType(IntegerType, + StructType(Seq( + StructField("_1", StringType), + StructField("_2", ArrayType( + StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", DoubleType, nullable = false))))))), + valueContainsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("_1", "key_value", "key")), + ParquetColumn( + sparkType = + StructType(Seq( + StructField("_1", StringType), + StructField("_2", ArrayType( + StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", DoubleType, nullable = false))))))), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 3, + required = false, + path = Seq("_1", "key_value", "value"), + children = Seq( + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 4, Seq("_1", "key_value", "value", "_1"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())), + ParquetColumn( + sparkType = ArrayType( + StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", DoubleType, nullable = false)))), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 4, + required = false, + path = Seq("_1", "key_value", "value", "_2"), + children = Seq( + ParquetColumn( + sparkType = StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", DoubleType, nullable = false))), + descriptor = None, + repetitionLevel = 2, + definitionLevel = 6, + required = false, + path = Seq("_1", "key_value", "value", "_2", "bag", "array"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, + Repetition.REQUIRED, 2, 6, + Seq("_1", "key_value", "value", "_2", "bag", "array", "_1")), + primitiveParquetColumn(DoubleType, PrimitiveTypeName.DOUBLE, + Repetition.REQUIRED, 2, 6, + Seq("_1", "key_value", "value", "_2", "bag", "array", "_2")) + )))))) + )))))) testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]]( "deeply nested type - standard", @@ -345,7 +781,88 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = false) + writeLegacyParquetFormat = false, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "_1", + MapType(IntegerType, + StructType(Seq( + StructField("_1", StringType), + StructField("_2", ArrayType( + StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", DoubleType, nullable = false))))))), + valueContainsNull = true)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = + MapType(IntegerType, + StructType(Seq( + StructField("_1", StringType), + StructField("_2", ArrayType( + StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", DoubleType, nullable = false))))))), + valueContainsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq( + primitiveParquetColumn(IntegerType, 
PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("_1", "key_value", "key")), + ParquetColumn( + sparkType = + StructType(Seq( + StructField("_1", StringType), + StructField("_2", ArrayType( + StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", DoubleType, nullable = false))))))), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 3, + required = false, + path = Seq("_1", "key_value", "value"), + children = Seq( + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 4, Seq("_1", "key_value", "value", "_1"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())), + ParquetColumn( + sparkType = ArrayType( + StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", DoubleType, nullable = false)))), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 4, + required = false, + path = Seq("_1", "key_value", "value", "_2"), + children = Seq( + ParquetColumn( + sparkType = StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", DoubleType, nullable = false))), + descriptor = None, + repetitionLevel = 2, + definitionLevel = 6, + required = false, + path = Seq("_1", "key_value", "value", "_2", "list", "element"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, + Repetition.REQUIRED, 2, 6, + Seq("_1", "key_value", "value", "_2", "list", "element", "_1")), + primitiveParquetColumn(DoubleType, PrimitiveTypeName.DOUBLE, + Repetition.REQUIRED, 2, 6, + Seq("_1", "key_value", "value", "_2", "list", "element", "_2")))))))) + )))))) testSchemaInference[(Option[Int], Map[Int, Option[Double]])]( "optional types", @@ -362,7 +879,39 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """.stripMargin, binaryAsString = true, int96AsTimestamp = true, - writeLegacyParquetFormat = false) + writeLegacyParquetFormat = false, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField("_1", IntegerType), + StructField("_2", MapType(IntegerType, DoubleType, valueContainsNull = true)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = IntegerType, + descriptor = Some(new ColumnDescriptor(Array("_1"), + Types.optional(PrimitiveTypeName.INT32).named("_1"), 0, 1)), + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_1"), + children = Seq()), + ParquetColumn( + sparkType = MapType(IntegerType, DoubleType, valueContainsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("_2"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("_2", "key_value", "key")), + primitiveParquetColumn(DoubleType, PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL, + 1, 3, Seq("_2", "key_value", "value"))))) + ))) } class ParquetSchemaSuite extends ParquetSchemaTest { @@ -474,7 +1023,28 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + ArrayType(IntegerType, containsNull = true)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType(IntegerType, 
containsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.OPTIONAL, + 1, 3, Seq("f1", "list", "element")))) + )))) testParquetToCatalyst( "Backwards-compatibility: LIST with nullable element type - 2", @@ -492,7 +1062,28 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + ArrayType(IntegerType, containsNull = true)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType(IntegerType, containsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.OPTIONAL, + 1, 3, Seq("f1", "element", "num")))) + )))) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 1 - standard", @@ -507,7 +1098,28 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + ArrayType(IntegerType, containsNull = false)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType(IntegerType, containsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("f1", "list", "element")))) + )))) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 2", @@ -522,7 +1134,28 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + ArrayType(IntegerType, containsNull = false)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType(IntegerType, containsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("f1", "element", "num")))) + )))) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 3", @@ -535,7 +1168,28 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + ArrayType(IntegerType, containsNull = false)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType(IntegerType, containsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + 
required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REPEATED, + 1, 2, Seq("f1", "element")))) + )))) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 4", @@ -558,7 +1212,48 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + ArrayType( + StructType(Seq( + StructField("str", StringType, nullable = false), + StructField("num", IntegerType, nullable = false))), + containsNull = false)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + ArrayType( + StructType(Seq( + StructField("str", StringType, nullable = false), + StructField("num", IntegerType, nullable = false))), + containsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq(ParquetColumn( + sparkType = StructType(Seq( + StructField("str", StringType, nullable = false), + StructField("num", IntegerType, nullable = false))), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 2, + required = false, + path = Seq("f1", "element"), + children = Seq( + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.REQUIRED, + 1, 2, Seq("f1", "element", "str"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())), + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("f1", "element", "num"))))) + ))))) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style", @@ -579,7 +1274,44 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + ArrayType( + StructType(Seq( + StructField("str", StringType, nullable = false))), + containsNull = false), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType( + StructType(Seq( + StructField("str", StringType, nullable = false))), + containsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq(ParquetColumn( + sparkType = StructType(Seq( + StructField("str", StringType, nullable = false))), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 2, + required = false, + path = Seq("f1", "array"), + children = Seq( + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.REQUIRED, + 1, 2, Seq("f1", "array", "str"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())))))) + )))) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style", @@ -600,7 +1332,44 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + ArrayType( + StructType(Seq( + StructField("str", StringType, nullable = false))), + containsNull = 
false), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType( + StructType(Seq( + StructField("str", StringType, nullable = false))), + containsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq(ParquetColumn( + sparkType = StructType(Seq( + StructField("str", StringType, nullable = false))), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 2, + required = false, + path = Seq("f1", "f1_tuple"), + children = Seq( + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.REQUIRED, + 1, 2, Seq("f1", "f1_tuple", "str"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())))))) + )))) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type 7 - " + @@ -612,7 +1381,28 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + ArrayType(IntegerType, containsNull = false), nullable = false))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType(IntegerType, containsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = true, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REPEATED, + 1, 1, Seq("f1"))))) + ))) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type 8 - " + @@ -634,7 +1424,49 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + ArrayType( + new StructType() + .add("c1", StringType, nullable = true) + .add("c2", IntegerType, nullable = false), + containsNull = false), nullable = false))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq(ParquetColumn( + sparkType = ArrayType( + new StructType() + .add("c1", StringType, nullable = true) + .add("c2", IntegerType, nullable = false), + containsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = true, + path = Seq("f1"), + children = Seq( + ParquetColumn( + sparkType = new StructType() + .add("c1", StringType, nullable = true) + .add("c2", IntegerType, nullable = false), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 1, + required = true, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 2, Seq("f1", "c1"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())), + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 1, Seq("f1", "c2"))))) + ))))) // ======================================================= // Tests for converting Catalyst ArrayType to Parquet LIST @@ -727,7 +1559,33 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + 
sparkType = StructType(Seq( + StructField( + "f1", + MapType(IntegerType, StringType, valueContainsNull = false), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = MapType(IntegerType, StringType, valueContainsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("f1", "key_value", "key")), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.REQUIRED, + 1, 2, Seq("f1", "key_value", "value"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())) + )))))) testParquetToCatalyst( "Backwards-compatibility: MAP with non-nullable value type - 2", @@ -746,7 +1604,33 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + MapType(IntegerType, StringType, valueContainsNull = false), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = MapType(IntegerType, StringType, valueContainsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("f1", "key_value", "num")), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.REQUIRED, + 1, 2, Seq("f1", "key_value", "str"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())) + )))))) testParquetToCatalyst( "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x", @@ -765,7 +1649,33 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + MapType(IntegerType, StringType, valueContainsNull = false), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = MapType(IntegerType, StringType, valueContainsNull = false), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("f1", "key_value", "key")), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.REQUIRED, + 1, 2, Seq("f1", "key_value", "value"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType()))) + ))))) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 1 - standard", @@ -784,7 +1694,33 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + MapType(IntegerType, StringType, valueContainsNull = true), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + 
required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = MapType(IntegerType, StringType, valueContainsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("f1", "key_value", "key")), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 3, Seq("f1", "key_value", "value"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType()))) + ))))) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 2", @@ -803,7 +1739,33 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + MapType(IntegerType, StringType, valueContainsNull = true), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = MapType(IntegerType, StringType, valueContainsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("f1", "key_value", "num")), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 3, Seq("f1", "key_value", "str"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType()))) + ))))) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style", @@ -822,7 +1784,33 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true) + int96AsTimestamp = true, + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "f1", + MapType(IntegerType, StringType, valueContainsNull = true), + nullable = true))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = MapType(IntegerType, StringType, valueContainsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("f1", "key_value", "key")), + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 3, Seq("f1", "key_value", "value"), + logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType()))) + ))))) // ==================================================== // Tests for converting Catalyst MapType to Parquet Map @@ -900,6 +1888,181 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """.stripMargin, writeLegacyParquetFormat = true) + testParquetToCatalyst( + "SPARK-36935: test case insensitive when converting Parquet schema", + StructType(Seq(StructField("F1", ShortType))), + """message root { + | optional int32 f1; + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + sparkReadSchema = Some(StructType(Seq(StructField("F1", ShortType)))), + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq(StructField("F1", ShortType))), + descriptor = None, + repetitionLevel = 0, + 
definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + primitiveParquetColumn(ShortType, PrimitiveTypeName.INT32, Repetition.OPTIONAL, + 0, 1, Seq("f1")) + ) + ))) + + testParquetToCatalyst( + "SPARK-36935: test case sensitive when converting Parquet schema", + StructType(Seq(StructField("f1", IntegerType))), + """message root { + | optional int32 f1; + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + caseSensitive = true, + sparkReadSchema = Some(StructType(Seq(StructField("F1", ShortType)))), + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq(StructField("f1", IntegerType))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.OPTIONAL, + 0, 1, Seq("f1")) + ) + ))) + + testParquetToCatalyst( + "SPARK-36935: test Spark read schema with case sensitivity", + StructType(Seq( + StructField( + "F1", + MapType(ShortType, + StructType(Seq( + StructField("G1", StringType), + StructField("G2", ArrayType( + StructType(Seq( + StructField("H1", ByteType, nullable = false), + StructField("H2", FloatType, nullable = false))))))), + valueContainsNull = true)))), + """message root { + | optional group f1 (MAP_KEY_VALUE) { + | repeated group key_value { + | required int32 key; + | optional group value { + | optional binary g1 (UTF8); + | optional group g2 (LIST) { + | repeated group list { + | optional group element { + | required int32 h1; + | required double h2; + | } + | } + | } + | } + | } + | } + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + sparkReadSchema = + Some(StructType(Seq( + StructField( + "F1", + MapType(ShortType, + StructType(Seq( + StructField("G1", StringType), + StructField("G2", ArrayType( + StructType(Seq( + StructField("H1", ByteType, nullable = false), + StructField("H2", FloatType, nullable = false))))))), + valueContainsNull = true))))), + expectedParquetColumn = Some(ParquetColumn( + sparkType = StructType(Seq( + StructField( + "F1", + MapType(ShortType, + StructType(Seq( + StructField("G1", StringType), + StructField("G2", ArrayType( + StructType(Seq( + StructField("H1", ByteType, nullable = false), + StructField("H2", FloatType, nullable = false))))))), + valueContainsNull = true)))), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 0, + required = false, + path = Seq(), + children = Seq( + ParquetColumn( + sparkType = + MapType(ShortType, + StructType(Seq( + StructField("G1", StringType), + StructField("G2", ArrayType( + StructType(Seq( + StructField("H1", ByteType, nullable = false), + StructField("H2", FloatType, nullable = false))))))), + valueContainsNull = true), + descriptor = None, + repetitionLevel = 0, + definitionLevel = 1, + required = false, + path = Seq("f1"), + children = Seq( + primitiveParquetColumn(ShortType, PrimitiveTypeName.INT32, Repetition.REQUIRED, + 1, 2, Seq("f1", "key_value", "key")), + ParquetColumn( + sparkType = + StructType(Seq( + StructField("G1", StringType), + StructField("G2", ArrayType( + StructType(Seq( + StructField("H1", ByteType, nullable = false), + StructField("H2", FloatType, nullable = false))))))), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 3, + required = false, + path = Seq("f1", "key_value", "value"), + children = Seq( + primitiveParquetColumn(StringType, PrimitiveTypeName.BINARY, Repetition.OPTIONAL, + 1, 4, Seq("f1", "key_value", "value", "g1"), + 
logicalTypeAnnotation = Some(LogicalTypeAnnotation.stringType())), + ParquetColumn( + sparkType = ArrayType( + StructType(Seq( + StructField("H1", ByteType, nullable = false), + StructField("H2", FloatType, nullable = false)))), + descriptor = None, + repetitionLevel = 1, + definitionLevel = 4, + required = false, + path = Seq("f1", "key_value", "value", "g2"), + children = Seq( + ParquetColumn( + sparkType = StructType(Seq( + StructField("H1", ByteType, nullable = false), + StructField("H2", FloatType, nullable = false))), + descriptor = None, + repetitionLevel = 2, + definitionLevel = 6, + required = false, + path = Seq("f1", "key_value", "value", "g2", "list", "element"), + children = Seq( + primitiveParquetColumn(ByteType, PrimitiveTypeName.INT32, + Repetition.REQUIRED, 2, 6, + Seq("f1", "key_value", "value", "g2", "list", "element", "h1")), + primitiveParquetColumn(FloatType, PrimitiveTypeName.DOUBLE, + Repetition.REQUIRED, 2, 6, + Seq("f1", "key_value", "value", "g2", "list", "element", "h2")))))))) + )))))) + // ================================= // Tests for conversion for decimals // =================================
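+  // A minimal sketch (illustrative only, assuming the standard Parquet/Dremel rules) of the
+  // level arithmetic that the expectedParquetColumn values in this suite encode: walking
+  // from the root to a leaf, an optional field adds 1 to the definition level, a repeated
+  // field adds 1 to both the definition and the repetition level, and a required field
+  // adds nothing to either. For example, given
+  //
+  //   message root {
+  //     optional group f1 (LIST) {    // definition +1
+  //       repeated group list {       // definition +1, repetition +1
+  //         optional int32 element;   // definition +1
+  //       }
+  //     }
+  //   }
+  //
+  // the leaf column ends up with repetitionLevel = 1 and definitionLevel = 3, which is why
+  // the corresponding expectation earlier in this suite reads:
+  //   primitiveParquetColumn(IntegerType, PrimitiveTypeName.INT32, Repetition.OPTIONAL,
+  //     1, 3, Seq("f1", "list", "element"))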