From 90444a22e319c9c4f78cf87f1b82a825d9675e22 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 20 Jul 2016 16:30:05 +0800
Subject: [PATCH] Don't convert Orc/Parquet Metastore tables to datasource
 tables if the metastore schema does not match the schema stored in the
 Orc/Parquet files.

---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 80 ++++++++++++-------
 .../spark/sql/hive/orc/OrcQuerySuite.scala    | 34 ++++++++
 2 files changed, 87 insertions(+), 27 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2be51ed0e87e..3fa5bfc2d32c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -244,7 +244,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       options: Map[String, String],
       defaultSource: FileFormat,
       fileFormatClass: Class[_ <: FileFormat],
-      fileType: String): LogicalRelation = {
+      fileType: String): Option[LogicalRelation] = {
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
@@ -285,7 +285,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         bucketSpec,
         Some(partitionSpec))

-      val hadoopFsRelation = cached.getOrElse {
+      val hadoopFsRelation = if (cached.isDefined) {
+        cached
+      } else {
         val fileCatalog = new MetaStorePartitionedTableFileCatalog(
           sparkSession,
           new Path(metastoreRelation.catalogTable.storage.locationUri.get),
@@ -301,21 +303,27 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
         }

-        val relation = HadoopFsRelation(
-          sparkSession = sparkSession,
-          location = fileCatalog,
-          partitionSchema = partitionSchema,
-          dataSchema = inferredSchema,
-          bucketSpec = bucketSpec,
-          fileFormat = defaultSource,
-          options = options)
-
-        val created = LogicalRelation(
-          relation,
-          metastoreTableIdentifier =
-            Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
-        cachedDataSourceTables.put(tableIdentifier, created)
-        created
+        // If the inferred schema from the data doesn't match the schema stored in the metastore,
+        // we should not convert the MetastoreRelation to a data source relation.
+        if (!DataType.equalsIgnoreCompatibleNullability(inferredSchema, metastoreSchema)) {
+          None
+        } else {
+          val relation = HadoopFsRelation(
+            sparkSession = sparkSession,
+            location = fileCatalog,
+            partitionSchema = partitionSchema,
+            dataSchema = inferredSchema,
+            bucketSpec = bucketSpec,
+            fileFormat = defaultSource,
+            options = options)
+
+          val created = LogicalRelation(
+            relation,
+            metastoreTableIdentifier =
+              Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
+          cachedDataSourceTables.put(tableIdentifier, created)
+          Some(created)
+        }
       }

       hadoopFsRelation
@@ -347,9 +355,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         created
       }

-      logicalRelation
+      Some(logicalRelation)
     }
-    result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
+    result.map(_.copy(expectedOutputAttributes = Some(metastoreRelation.output)))
   }

   /**
@@ -362,7 +370,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       sessionState.convertMetastoreParquet
   }

-  private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
+  private def convertToParquetRelation(relation: MetastoreRelation): Option[LogicalRelation] = {
     val defaultSource = new ParquetFileFormat()
     val fileFormatClass = classOf[ParquetFileFormat]

@@ -379,15 +387,24 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

       plan transformUp {
         // Write path
-        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+        case i @ InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data source (yet).
           if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
-          InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
+          val parquetRelation = convertToParquetRelation(r)
+          if (parquetRelation.isDefined) {
+            InsertIntoTable(parquetRelation.get, partition, child, overwrite, ifNotExists)
+          } else {
+            i
+          }

         // Read path
         case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
           val parquetRelation = convertToParquetRelation(relation)
-          SubqueryAlias(relation.tableName, parquetRelation)
+          if (parquetRelation.isDefined) {
+            SubqueryAlias(relation.tableName, parquetRelation.get)
+          } else {
+            relation
+          }
       }
     }
   }
@@ -402,7 +419,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       sessionState.convertMetastoreOrc
   }

-  private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
+  private def convertToOrcRelation(relation: MetastoreRelation): Option[LogicalRelation] = {
     val defaultSource = new OrcFileFormat()
     val fileFormatClass = classOf[OrcFileFormat]
     val options = Map[String, String]()
@@ -417,15 +434,24 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

       plan transformUp {
         // Write path
-        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+        case i @ InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
          // Inserting into partitioned table is not supported in Orc data source (yet).
          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
-          InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+          val orcRelation = convertToOrcRelation(r)
+          if (orcRelation.isDefined) {
+            InsertIntoTable(orcRelation.get, partition, child, overwrite, ifNotExists)
+          } else {
+            i
+          }

         // Read path
         case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
           val orcRelation = convertToOrcRelation(relation)
-          SubqueryAlias(relation.tableName, orcRelation)
+          if (orcRelation.isDefined) {
+            SubqueryAlias(relation.tableName, orcRelation.get)
+          } else {
+            relation
+          }
       }
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index b9e98fc85f78..299ce8d6ff90 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -450,6 +450,40 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }

+  test("No ORC conversion when metastore schema does not match schema stored in ORC files") {
+    withTempTable("single") {
+      val singleRowDF = Seq((0, "foo")).toDF("key", "value")
+      singleRowDF.createOrReplaceTempView("single")
+
+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
+        withTable("dummy_orc") {
+          withTempPath { dir =>
+            val path = dir.getCanonicalPath
+            singleRowDF.write.partitionBy("key").orc(path)
+
+            // Create a Metastore ORC table with a different schema.
+            spark.sql(
+              s"""
+                 |CREATE TABLE dummy_orc(value STRING, value2 STRING)
+                 |PARTITIONED BY (key INT)
+                 |STORED AS ORC
+                 |LOCATION '$path'
+               """.stripMargin)
+
+            val df = spark.sql("SELECT key, value FROM dummy_orc WHERE key=0")
+            val queryExecution = df.queryExecution
+            queryExecution.analyzed.collectFirst {
+              case _: MetastoreRelation => ()
+            }.getOrElse {
+              fail(s"Expecting no conversion from orc to data sources, " +
+                s"but got:\n$queryExecution")
+            }
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-14962 Produce correct results on array type with isnotnull") {
     withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
       val data = (0 until 10).map(i => Tuple1(Array(i)))
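
Note (illustration only, not part of the patch): the schema check this change adds in convertToLogicalRelation relies on DataType.equalsIgnoreCompatibleNullability, which ignores compatible nullability differences but still requires matching field names and types. The sketch below runs that comparison on two made-up schemas mirroring the dummy_orc test; the object name is hypothetical, and it sits under org.apache.spark.sql.hive only because equalsIgnoreCompatibleNullability is private[sql] and must be called from inside that package hierarchy.

package org.apache.spark.sql.hive

import org.apache.spark.sql.types._

object SchemaMatchSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical schema declared in the Hive metastore (value, value2, key).
    val metastoreSchema = StructType(Seq(
      StructField("value", StringType),
      StructField("value2", StringType),
      StructField("key", IntegerType)))

    // Hypothetical schema inferred from the ORC/Parquet files (no value2 column).
    val inferredSchema = StructType(Seq(
      StructField("value", StringType),
      StructField("key", IntegerType)))

    // The extra value2 field makes the schemas unequal, even though nullability
    // differences alone would have been tolerated.
    val convertible =
      DataType.equalsIgnoreCompatibleNullability(inferredSchema, metastoreSchema)
    println(s"convert to data source relation? $convertible")  // prints: false
  }
}

When the comparison returns false, convertToLogicalRelation returns None, so ParquetConversions and OrcConversions keep the original MetastoreRelation (and, on the write path, the original InsertIntoTable node) instead of substituting a HadoopFsRelation.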