From 5c3d20b654609c86de9c24c9751ec34916f3aabd Mon Sep 17 00:00:00 2001
From: seancxmao
Date: Fri, 17 Aug 2018 18:06:28 +0800
Subject: [PATCH] SPARK-25132: case-insensitive field resolution when reading
 from Parquet/ORC

* Fix ParquetFileFormat
* More than one Parquet column is matched
* Fix OrcFileFormat (both native and hive implementations)
* Fix issues according to review results: refactor test cases, code style, ...
* Test cases: change parquet/orc file schema from a to A
* Test cases: let different columns have different value series
* Refine error message
* Split multi-format test suite
* Simplify test cases for ambiguous resolution
* Simplify test cases to reduce code lines
* Refine tests and comments
---
 .../execution/datasources/orc/OrcUtils.scala  | 29 +++++-
 .../parquet/ParquetFileFormat.scala           |  3 +
 .../parquet/ParquetReadSupport.scala          | 89 ++++++++++++++-----
 .../spark/sql/FileBasedDataSourceSuite.scala  | 45 ++++++++++
 .../parquet/ParquetSchemaSuite.scala          | 62 +++++++++++--
 .../spark/sql/hive/orc/OrcFileFormat.scala    | 88 ++++++++++++++++--
 .../sql/hive/orc/HiveOrcQuerySuite.scala      | 46 ++++++++++
 7 files changed, 323 insertions(+), 39 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index b404cfa61f41e..1b353f0fa4451 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -26,7 +26,7 @@ import org.apache.orc.{OrcFile, Reader, TypeDescription}
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types._
@@ -115,8 +115,31 @@ object OrcUtils extends Logging {
         }
       })
     } else {
-      val resolver = if (isCaseSensitive) caseSensitiveResolution else caseInsensitiveResolution
-      Some(requiredSchema.fieldNames.map { name => orcFieldNames.indexWhere(resolver(_, name)) })
+      if (isCaseSensitive) {
+        Some(requiredSchema.fieldNames.map { name =>
+          orcFieldNames.indexWhere(caseSensitiveResolution(_, name))
+        })
+      } else {
+        // Do case-insensitive resolution only if in case-insensitive mode
+        val caseInsensitiveOrcFieldMap = orcFieldNames.zipWithIndex.groupBy(_._1.toLowerCase)
+        Some(requiredSchema.fieldNames.map {
+          requiredFieldName =>
+            caseInsensitiveOrcFieldMap.get(requiredFieldName.toLowerCase).map {
+              matchedOrcFields =>
+                if (matchedOrcFields.size > 1) {
+                  // Need to fail if there is ambiguity, i.e. more than one field is matched.
+                  val matchedOrcFieldsString =
+                    matchedOrcFields.map(_._1).mkString("[", ", ", "]")
+                  throw new AnalysisException(
+                    s"""Found duplicate field(s) "$requiredFieldName": """ +
+                      s"$matchedOrcFieldsString in case-insensitive mode")
+                } else {
+                  // Exactly one field is matched
+                  matchedOrcFields(0)._2
+                }
+            }.getOrElse(-1) // No field matched
+        })
+      }
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b2409f3470e73..d7eb14356b8b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -310,6 +310,9 @@ class ParquetFileFormat
     hadoopConf.set(
       SQLConf.SESSION_LOCAL_TIMEZONE.key,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
     ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 40ce5d5e0564e..04fa7e75d60b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -29,7 +29,9 @@ import org.apache.parquet.schema._
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -71,8 +73,10 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
       StructType.fromString(schemaString)
     }
 
-    val parquetRequestedSchema =
-      ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+    val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
+      SQLConf.CASE_SENSITIVE.defaultValue.get)
+    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
+      context.getFileSchema, catalystRequestedSchema, caseSensitive)
 
     new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
   }
@@ -117,8 +121,12 @@ private[parquet] object ParquetReadSupport {
   * Tailors `parquetSchema` according to `catalystSchema` by removing column paths don't exist
   * in `catalystSchema`, and adding those only exist in `catalystSchema`.
*/ - def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = { - val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema) + def clipParquetSchema( + parquetSchema: MessageType, + catalystSchema: StructType, + caseSensitive: Boolean = true): MessageType = { + val clippedParquetFields = clipParquetGroupFields( + parquetSchema.asGroupType(), catalystSchema, caseSensitive) if (clippedParquetFields.isEmpty) { ParquetSchemaConverter.EMPTY_MESSAGE } else { @@ -129,20 +137,21 @@ private[parquet] object ParquetReadSupport { } } - private def clipParquetType(parquetType: Type, catalystType: DataType): Type = { + private def clipParquetType( + parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type = { catalystType match { case t: ArrayType if !isPrimitiveCatalystType(t.elementType) => // Only clips array types with nested type as element type. - clipParquetListType(parquetType.asGroupType(), t.elementType) + clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive) case t: MapType if !isPrimitiveCatalystType(t.keyType) || !isPrimitiveCatalystType(t.valueType) => // Only clips map types with nested key type or value type - clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType) + clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive) case t: StructType => - clipParquetGroup(parquetType.asGroupType(), t) + clipParquetGroup(parquetType.asGroupType(), t, caseSensitive) case _ => // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able @@ -168,14 +177,15 @@ private[parquet] object ParquetReadSupport { * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a * [[StructType]]. */ - private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = { + private def clipParquetListType( + parquetList: GroupType, elementType: DataType, caseSensitive: Boolean): Type = { // Precondition of this method, should only be called for lists with nested element types. assert(!isPrimitiveCatalystType(elementType)) // Unannotated repeated group should be interpreted as required list of required element, so // list element type is just the group itself. Clip it. if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) { - clipParquetType(parquetList, elementType) + clipParquetType(parquetList, elementType, caseSensitive) } else { assert( parquetList.getOriginalType == OriginalType.LIST, @@ -207,7 +217,7 @@ private[parquet] object ParquetReadSupport { Types .buildGroup(parquetList.getRepetition) .as(OriginalType.LIST) - .addField(clipParquetType(repeatedGroup, elementType)) + .addField(clipParquetType(repeatedGroup, elementType, caseSensitive)) .named(parquetList.getName) } else { // Otherwise, the repeated field's type is the element type with the repeated field's @@ -218,7 +228,7 @@ private[parquet] object ParquetReadSupport { .addField( Types .repeatedGroup() - .addField(clipParquetType(repeatedGroup.getType(0), elementType)) + .addField(clipParquetType(repeatedGroup.getType(0), elementType, caseSensitive)) .named(repeatedGroup.getName)) .named(parquetList.getName) } @@ -231,7 +241,10 @@ private[parquet] object ParquetReadSupport { * a [[StructType]]. 
*/ private def clipParquetMapType( - parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = { + parquetMap: GroupType, + keyType: DataType, + valueType: DataType, + caseSensitive: Boolean): GroupType = { // Precondition of this method, only handles maps with nested key types or value types. assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType)) @@ -243,8 +256,8 @@ private[parquet] object ParquetReadSupport { Types .repeatedGroup() .as(repeatedGroup.getOriginalType) - .addField(clipParquetType(parquetKeyType, keyType)) - .addField(clipParquetType(parquetValueType, valueType)) + .addField(clipParquetType(parquetKeyType, keyType, caseSensitive)) + .addField(clipParquetType(parquetValueType, valueType, caseSensitive)) .named(repeatedGroup.getName) Types @@ -262,8 +275,9 @@ private[parquet] object ParquetReadSupport { * [[MessageType]]. Because it's legal to construct an empty requested schema for column * pruning. */ - private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = { - val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType) + private def clipParquetGroup( + parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): GroupType = { + val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseSensitive) Types .buildGroup(parquetRecord.getRepetition) .as(parquetRecord.getOriginalType) @@ -277,14 +291,41 @@ private[parquet] object ParquetReadSupport { * @return A list of clipped [[GroupType]] fields, which can be empty. */ private def clipParquetGroupFields( - parquetRecord: GroupType, structType: StructType): Seq[Type] = { - val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = { val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false) - structType.map { f => - parquetFieldMap - .get(f.name) - .map(clipParquetType(_, f.dataType)) - .getOrElse(toParquet.convertField(f)) + if (caseSensitive) { + val caseSensitiveParquetFieldMap = + parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + structType.map { + f => { + caseSensitiveParquetFieldMap + .get(f.name) + .map(clipParquetType(_, f.dataType, caseSensitive)) + .getOrElse(toParquet.convertField(f)) + } + } + } else { + // Do case-insensitive resolution only if in case-insensitive mode + val caseInsensitiveParquetFieldMap = + parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase) + structType.map { + f => { + caseInsensitiveParquetFieldMap + .get(f.name.toLowerCase) + .map { + parquetTypes => + if (parquetTypes.size > 1) { + // Need to fail if there is ambiguity, i.e. 
more than one field is matched + val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]") + throw new AnalysisException(s"""Found duplicate field(s) "${f.name}": """ + + s"$parquetTypesString in case-insensitive mode") + } else { + // Exactly one field is matched + clipParquetType(parquetTypes(0), f.dataType, caseSensitive) + } + }.getOrElse(toParquet.convertField(f)) + } + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 9f9af89570789..32acc3fc87f73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -430,6 +430,51 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } } } + + Seq("parquet", "orc").foreach { format => + test(s"SPARK-25132: case-insensitive field resolution when reading from Parquet/ORC - " + + s"$format (native implementation)") { + withTempDir { dir => + val tableDir = dir.getCanonicalPath + s"/$format" + val tableName = s"spark_25132_${format}_native" + withTable(tableName) { + val end = 5 + val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B") + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + data.write.format(format).mode("overwrite").save(tableDir) + } + sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'") + val nulls = (0 until end).map(_ => Row(null)) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + checkAnswer(sql(s"select a from $tableName"), data.select(col("A"))) + checkAnswer(sql(s"select A from $tableName"), data.select(col("A"))) + + // AnalysisException from Executor when reading files is wrapped in SparkException + val e1 = intercept[SparkException] { + sql(s"select b from $tableName").collect() + } + assert( + e1.getCause.isInstanceOf[AnalysisException] && + e1.getCause.getMessage.contains( + """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) + val e2 = intercept[SparkException] { + sql(s"select B from $tableName").collect() + } + assert( + e2.getCause.isInstanceOf[AnalysisException] && + e2.getCause.getMessage.contains( + """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + checkAnswer(sql(s"select a from $tableName"), nulls) + checkAnswer(sql(s"select b from $tableName"), data.select(col("b"))) + } + } + } + } + } } object TestingUDT { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 368e52cfbda9c..908b72dd6f115 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -24,6 +24,7 @@ import org.apache.parquet.io.ParquetDecodingException import org.apache.parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException @@ -1014,19 +1015,21 @@ class ParquetSchemaSuite extends 
ParquetSchemaTest { testName: String, parquetSchema: String, catalystSchema: StructType, - expectedSchema: String): Unit = { + expectedSchema: String, + caseSensitive: Boolean = true): Unit = { testSchemaClipping(testName, parquetSchema, catalystSchema, - MessageTypeParser.parseMessageType(expectedSchema)) + MessageTypeParser.parseMessageType(expectedSchema), caseSensitive) } private def testSchemaClipping( testName: String, parquetSchema: String, catalystSchema: StructType, - expectedSchema: MessageType): Unit = { + expectedSchema: MessageType, + caseSensitive: Boolean): Unit = { test(s"Clipping - $testName") { val actual = ParquetReadSupport.clipParquetSchema( - MessageTypeParser.parseMessageType(parquetSchema), catalystSchema) + MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive) try { expectedSchema.checkContains(actual) @@ -1387,7 +1390,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { catalystSchema = new StructType(), - expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE) + expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE, + caseSensitive = true) testSchemaClipping( "disjoint field sets", @@ -1544,4 +1548,52 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin) + + testSchemaClipping( + "case-insensitive resolution: no ambiguity", + parquetSchema = + """message root { + | required group A { + | optional int32 B; + | } + | optional int32 c; + |} + """.stripMargin, + catalystSchema = { + val nestedType = new StructType().add("b", IntegerType, nullable = true) + new StructType() + .add("a", nestedType, nullable = true) + .add("c", IntegerType, nullable = true) + }, + expectedSchema = + """message root { + | required group A { + | optional int32 B; + | } + | optional int32 c; + |} + """.stripMargin, + caseSensitive = false) + + test("Clipping - case-insensitive resolution: more than one field is matched") { + val parquetSchema = + """message root { + | required group A { + | optional int32 B; + | } + | optional int32 c; + | optional int32 a; + |} + """.stripMargin + val catalystSchema = { + val nestedType = new StructType().add("b", IntegerType, nullable = true) + new StructType() + .add("a", nestedType, nullable = true) + .add("c", IntegerType, nullable = true) + } + assertThrows[AnalysisException] { + ParquetReadSupport.clipParquetSchema( + MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive = false) + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index de8085f07db19..c7ccb4bf27dcd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -35,12 +35,14 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} import org.apache.orc.OrcConf.COMPRESS import org.apache.spark.TaskContext -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcOptions import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} +import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -123,6 +125,9 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
 
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f =>
@@ -148,8 +153,6 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       if (isEmptyFile) {
         Iterator.empty
       } else {
-        OrcFileFormat.setRequiredColumns(conf, dataSchema, requiredSchema)
-
         val orcRecordReader = {
           val job = Job.getInstance(conf)
           FileInputFormat.setInputPaths(job, file.filePath)
@@ -160,6 +163,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
           // avoid NameNode call in unwrapOrcStructs per file.
           // Specifically would be helpful for partitioned datasets.
           val orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf))
+          OrcFileFormat.setRequiredColumns(conf, dataSchema, requiredSchema, orcReader)
           new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
         }
 
@@ -335,9 +339,79 @@ private[orc] object OrcFileFormat extends HiveInspectors {
   }
 
   def setRequiredColumns(
-      conf: Configuration, dataSchema: StructType, requestedSchema: StructType): Unit = {
-    val ids = requestedSchema.map(a => dataSchema.fieldIndex(a.name): Integer)
-    val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
-    HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
+      conf: Configuration,
+      dataSchema: StructType,
+      requestedSchema: StructType,
+      reader: Reader): Unit = {
+    // Get the list of types contained in the file. The root type is the first type in the list.
+    val orcFieldNames = reader.getTypes.get(0).getFieldNamesList.asScala
+    if (orcFieldNames.isEmpty) {
+      // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
+      val (sortedIDs, sortedNames) = (null, null)
+      HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
+    } else {
+      if (orcFieldNames.forall(_.startsWith("_col"))) {
+        // This is an ORC file written by Hive, no field names in the physical schema, assume the
+        // physical schema maps to the data schema by index.
+ assert(orcFieldNames.length <= dataSchema.length, "The given data schema " + + s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " + + "no idea which columns were dropped, fail to read.") + val ids = requestedSchema.map { requestedField => + val index = dataSchema.fieldIndex(requestedField.name): Integer + if (index < orcFieldNames.length) { + index + } else { + throw new IllegalArgumentException(s"Field ${requestedField.name} does not exist.") + } + } + val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) + } else { + val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key, + SQLConf.CASE_SENSITIVE.defaultValue.get) + if (caseSensitive) { + val ids = requestedSchema.map { requestedField => + orcFieldNames.indexOf(requestedField.name): Integer + } + val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) + } else { + // Do case-insensitive resolution only if in case-insensitive mode + val caseInsensitiveOrcFieldMap = + orcFieldNames.zipWithIndex.groupBy(_._1.toLowerCase) + val nameIds = requestedSchema.fieldNames.map { + requestedFieldName => { + caseInsensitiveOrcFieldMap.get(requestedFieldName.toLowerCase).map { + matchedOrcFields => + if (matchedOrcFields.size > 1) { + // Need to fail if there is ambiguity, i.e. more than one field is matched. + val matchedOrcFieldsString = + matchedOrcFields.map(_._1).mkString("[", ", ", "]") + throw new AnalysisException( + s"""Found duplicate field(s) "$requestedFieldName": """ + + s"$matchedOrcFieldsString in case-insensitive mode") + } else { + // Exactly one field is matched + matchedOrcFields(0) + } + }.getOrElse( + // No field matched, but exists in data schema, filter out later + if (dataSchema.fieldNames.count( + caseInsensitiveResolution(_, requestedFieldName)) != 0) { + (requestedFieldName, -1) + } else { + // Neither exists in ORC schema, nor in data schema + throw new IllegalArgumentException( + s"Field ${requestedFieldName} does not exist.") + } + ) + } + } + val (sortedIDs, sortedNames) = + nameIds.filter(_._2 != -1).map(nameId => (nameId._2: Integer, nameId._1)).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) + } + } + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 597b0f56a55e4..181a9c81911c4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -21,10 +21,12 @@ import java.io.File import com.google.common.io.Files +import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.orc.OrcQueryTest +import org.apache.spark.sql.functions.col import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -218,4 +220,48 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { } } } + + test("SPARK-25132: case-insensitive field resolution when reading from Parquet/ORC - " + + "ORC (hive implementation)") { + val (format, impl) = ("orc", 
"hive") + withTempDir { dir => + val tableDir = dir.getCanonicalPath + s"/$format" + val tableName = s"spark_25132_${format}_${impl}" + withTable(tableName) { + val end = 5 + val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B") + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + data.write.format(format).mode("overwrite").save(tableDir) + } + sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'") + val nulls = (0 until end).map(_ => Row(null)) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + checkAnswer(sql(s"select a from $tableName"), data.select(col("A"))) + checkAnswer(sql(s"select A from $tableName"), data.select(col("A"))) + + // AnalysisException from Executor when reading files is wrapped in SparkException + val e1 = intercept[SparkException] { + sql(s"select b from $tableName").collect() + } + assert( + e1.getCause.isInstanceOf[AnalysisException] && + e1.getCause.getMessage.contains( + """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) + val e2 = intercept[SparkException] { + sql(s"select B from $tableName").collect() + } + assert( + e2.getCause.isInstanceOf[AnalysisException] && + e2.getCause.getMessage.contains( + """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + checkAnswer(sql(s"select a from $tableName"), nulls) + checkAnswer(sql(s"select b from $tableName"), data.select(col("b"))) + } + } + } + } }