From 1a5b164153df946d713b34727a001d5005479d1a Mon Sep 17 00:00:00 2001
From: Rajesh Balamohan
Date: Wed, 27 Jan 2016 07:49:53 +0530
Subject: [PATCH] SPARK-12998 [SQL]. Enable OrcRelation when connecting via
 spark thrift server.

---
 .../apache/spark/sql/hive/HiveContext.scala   |  12 ++
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 122 +++++++++++++++++-
 .../spark/sql/hive/orc/OrcQuerySuite.scala    |  13 +-
 3 files changed, 139 insertions(+), 8 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index eaca3c9269bb7..de14991554f11 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -116,6 +116,12 @@ class HiveContext private[hive](
    */
   protected[sql] def convertMetastoreParquet: Boolean = getConf(CONVERT_METASTORE_PARQUET)

+  /**
+   * When true, orc SerDe are automatically converted to use the Spark SQL
+   * orc table scan, instead of the Hive SerDe.
+   */
+  protected[sql] def convertMetastoreOrc: Boolean = getConf(CONVERT_METASTORE_ORC)
+
   /**
    * When true, also tries to merge possibly different but compatible Parquet schemas in different
    * Parquet data files.
@@ -461,6 +467,7 @@ class HiveContext private[hive](
     new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
+        catalog.OrcConversions ::
        catalog.CreateTables ::
        catalog.PreInsertionCasts ::
        ExtractPythonUDFs ::
@@ -673,6 +680,11 @@ private[hive] object HiveContext {
     doc = "When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
       "the built in support.")

+  val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc",
+    defaultValue = Some(true),
+    doc = "When set to false, Spark SQL will use the Hive SerDe for orc tables instead of " +
+      "the built in support.")
+
   val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING = booleanConf(
     "spark.sql.hive.convertMetastoreParquet.mergeSchema",
     defaultValue = Some(false),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0cfe03ba91ec7..404afad34a69c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -40,10 +40,11 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.util.DataTypeParser
 import org.apache.spark.sql.execution.{datasources, FileRelation}
-import org.apache.spark.sql.execution.datasources.{Partition => ParquetPartition, _}
+import org.apache.spark.sql.execution.datasources.{Partition => TPartition, _}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.sql.hive.orc.OrcRelation
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._

@@ -488,7 +489,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
           case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
         })
-        ParquetPartition(values, location)
+        TPartition(values, location)
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       val paths = partitions.map(_.path)
@@ -520,6 +521,87 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }

+  private def convertToOrcRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
+    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+
+    val orcOptions = Map[String, String]()
+    val tableIdentifier =
+      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+
+    def getCached(
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+        case null => None // Cache miss
+        case logical @ LogicalRelation(orcRelation: OrcRelation, _, _) =>
+          // If we have the same paths, same schema, and same partition spec,
+          // we will use the cached orc Relation.
+          val useCached =
+            orcRelation.paths.toSet == pathsInMetastore.toSet &&
+              logical.schema.sameType(metastoreSchema) &&
+              orcRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
+                PartitionSpec(StructType(Nil), Array.empty[datasources.Partition])
+              }
+
+          if (useCached) {
+            Some(logical)
+          } else {
+            // If the cached relation is not updated, we invalidate it right away.
+            cachedDataSourceTables.invalidate(tableIdentifier)
+            None
+          }
+        case other =>
+          logWarning(
+            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
+              s"as Orc. However, we are getting a $other from the metastore " +
+              s"cache. This cached entry will be invalidated.")
+          cachedDataSourceTables.invalidate(tableIdentifier)
+          None
+      }
+    }
+
+    val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
+      val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
+      val partitionColumnDataTypes = partitionSchema.map(_.dataType)
+      val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
+        val location = p.getLocation
+        val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
+          case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
+        })
+        TPartition(values, location)
+      }
+      val partitionSpec = PartitionSpec(partitionSchema, partitions)
+      val paths = partitions.map(_.path)
+
+      val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
+      val orcRelation = cached.getOrElse {
+        val created = LogicalRelation(
+          new OrcRelation(
+            paths.toArray, Some(metastoreSchema), Some(partitionSpec), orcOptions)(hive))
+        cachedDataSourceTables.put(tableIdentifier, created)
+        created
+      }
+
+      orcRelation
+    } else {
+      val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+
+      val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
+      val orcRelation = cached.getOrElse {
+        val created = LogicalRelation(
+          new OrcRelation(paths.toArray, Some(metastoreSchema), None, orcOptions)(hive))
+        cachedDataSourceTables.put(tableIdentifier, created)
+        created
+      }
+
+      orcRelation
+    }
+
+    result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
+  }
+
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
     val db = databaseName.getOrElse(client.currentDatabase)

@@ -562,6 +644,42 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     }
   }

+  /**
+   * When scanning or writing to non-partitioned Metastore Orc tables, convert
+   * them to Orc data source relations for better performance.
+   */
+  object OrcConversions extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = {
+      if (!plan.resolved || plan.analyzed) {
+        return plan
+      }
+
+      plan transformUp {
+        // Write path
+        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in ORC data source (yet).
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreOrc &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("orc") =>
+          val orcRelation = convertToOrcRelation(r)
+          InsertIntoTable(orcRelation, partition, child, overwrite, ifNotExists)
+
+        // Write path
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in ORC data source (yet).
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreOrc &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("orc") =>
+          val orcRelation = convertToOrcRelation(r)
+          InsertIntoTable(orcRelation, partition, child, overwrite, ifNotExists)
+
+        // Read path
+        case relation: MetastoreRelation if hive.convertMetastoreOrc &&
+          relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") =>
+          val orcRelation = convertToOrcRelation(relation)
+          Subquery(relation.alias.getOrElse(relation.tableName), orcRelation)
+      }
+    }
+  }
+
   /**
    * Creates any tables required for query execution.
    * For example, because of a CREATE TABLE X AS statement.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 2156806d21f96..beb0bac3fd7c9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -307,7 +307,6 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       val path = dir.getCanonicalPath

       withTable("empty_orc") {
-        withTempTable("empty", "single") {
           sqlContext.sql(
             s"""CREATE TABLE empty_orc(key INT, value STRING)
                |STORED AS ORC
@@ -317,18 +316,18 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
           val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
           emptyDF.registerTempTable("empty")

-          // This creates 1 empty ORC file with Hive ORC SerDe. We are using this trick because
-          // Spark SQL ORC data source always avoids write empty ORC files.
+          // No data file would be generated
           sqlContext.sql(
             s"""INSERT INTO TABLE empty_orc
                |SELECT key, value FROM empty
              """.stripMargin)

-          val errorMessage = intercept[AnalysisException] {
+          // IllegalArgumentException as no ORC files are stored
+          val errorMessage = intercept[IllegalArgumentException] {
             sqlContext.read.orc(path)
           }.getMessage

-          assert(errorMessage.contains("Failed to discover schema from ORC files"))
+          assert(errorMessage.contains("does not have valid orc files"))

           val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1)
           singleRowDF.registerTempTable("single")
@@ -341,7 +340,9 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
           val df = sqlContext.read.orc(path)
           assert(df.schema === singleRowDF.schema.asNullable)
           checkAnswer(df, singleRowDF)
-        }
+
+          sqlContext.dropTempTable("empty")
+          sqlContext.dropTempTable("single")
       }
     }
   }
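
Note (not part of the patch): a minimal sketch of how the new behavior could be exercised interactively, assuming a Spark 1.6-style HiveContext bound to sqlContext and a hypothetical Hive table named orc_tbl that is stored as ORC; both names are illustrative only.

// Sketch only. Assumes sqlContext is a HiveContext and orc_tbl is an existing
// ORC-backed Hive metastore table (hypothetical name used for illustration).

// The conversion is on by default. With it enabled, the explained plan should show
// a data source scan over OrcRelation instead of a HiveTableScan via the Hive SerDe.
sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "true")
sqlContext.table("orc_tbl").explain(true)

// Disable the conversion to compare against the Hive SerDe read path.
sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")
sqlContext.table("orc_tbl").explain(true)

The same toggle should also be reachable from a beeline session against the thrift server with SET spark.sql.hive.convertMetastoreOrc=false, which is the scenario this patch targets. Note that the write-path conversion applies only to non-partitioned tables, per the guards in OrcConversions above.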