From 938b4303cde28e72481324bdb8f5f89b2ae1baff Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Aug 2016 20:49:15 -0700 Subject: [PATCH 01/14] remove lookupRelation --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 76 +++++++++---------- .../spark/sql/hive/HiveSessionCatalog.scala | 30 +++++++- 2 files changed, 61 insertions(+), 45 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c7c1acda25db2..41e0e0b4af708 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -136,53 +136,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader) } - def refreshTable(tableIdent: TableIdentifier): Unit = { - // refreshTable does not eagerly reload the cache. It just invalidate the cache. - // Next time when we use the table, it will be populated in the cache. - // Since we also cache ParquetRelations converted from Hive Parquet tables and - // adding converted ParquetRelations into the cache is not defined in the load function - // of the cache (instead, we add the cache entry in convertToParquetRelation), - // it is better at here to invalidate the cache to avoid confusing waring logs from the - // cache loader (e.g. cannot find data source provider, which is only defined for - // data source table.). - cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent)) - } - def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = { // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString } - def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String]): LogicalPlan = { - val qualifiedTableName = getQualifiedTableName(tableIdent) - val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name) - - if (table.properties.get(DATASOURCE_PROVIDER).isDefined) { - val dataSourceTable = cachedDataSourceTables(qualifiedTableName) - val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable) - // Then, if alias is specified, wrap the table with a Subquery using the alias. - // Otherwise, wrap the table with a Subquery using the table name. - alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) - } else if (table.tableType == CatalogTableType.VIEW) { - val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) - alias match { - case None => - SubqueryAlias(table.identifier.table, - sparkSession.sessionState.sqlParser.parsePlan(viewText)) - case Some(aliasText) => - SubqueryAlias(aliasText, sessionState.sqlParser.parsePlan(viewText)) - } - } else { - val qualifiedTable = - MetastoreRelation( - qualifiedTableName.database, qualifiedTableName.name)(table, client, sparkSession) - alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) - } - } - private def getCached( tableIdentifier: QualifiedTableName, pathsInMetastore: Seq[String], @@ -460,6 +419,41 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log mode == SaveMode.Ignore) } } + + /** + * Data Source Table is inserted directly, using Cache.put. 
+ * Note, this is not using automatic cache loading. + */ + def cacheTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { + cachedDataSourceTables.put(getQualifiedTableName(tableIdent), plan) + } + + def getTableIfPresent(tableIdent: TableIdentifier): Option[LogicalPlan] = { + cachedDataSourceTables.getIfPresent(getQualifiedTableName(tableIdent)) match { + case null => None // Cache miss + case o: LogicalPlan => Option(o.asInstanceOf[LogicalPlan]) + } + } + + def getTable(tableIdent: TableIdentifier): LogicalPlan = { + cachedDataSourceTables.get(getQualifiedTableName(tableIdent)) + } + + def refreshTable(tableIdent: TableIdentifier): Unit = { + // refreshTable does not eagerly reload the cache. It just invalidate the cache. + // Next time when we use the table, it will be populated in the cache. + // Since we also cache ParquetRelations converted from Hive Parquet tables and + // adding converted ParquetRelations into the cache is not defined in the load function + // of the cache (instead, we add the cache entry in convertToParquetRelation), + // it is better at here to invalidate the cache to avoid confusing waring logs from the + // cache loader (e.g. cannot find data source provider, which is only defined for + // data source table.). + cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent)) + } + + def invalidateAll(): Unit = { + cachedDataSourceTables.invalidateAll() + } } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index c59ac3dcafea4..207bd3544976e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -29,10 +29,11 @@ import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder -import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResourceLoader, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.SQLConf @@ -63,9 +64,30 @@ private[sql] class HiveSessionCatalog( override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = { val table = formatTableName(name.table) if (name.database.isDefined || !tempTables.contains(table)) { - val database = name.database.map(formatDatabaseName) - val newName = name.copy(database = database, table = table) - metastoreCatalog.lookupRelation(newName, alias) + val database = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) + val newName = name.copy(database = Option(database), table = table) + val metadata = getTableMetadata(newName) + if (DDLUtils.isDatasourceTable(metadata.properties)) { + val dataSourceTable = metastoreCatalog.getTable(newName) + val qualifiedTable = SubqueryAlias(table, dataSourceTable) + // Then, if alias is specified, wrap the table 
with a Subquery using the alias. + // Otherwise, wrap the table with a Subquery using the table name. + alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) + } else if (metadata.tableType == CatalogTableType.VIEW) { + val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text.")) + alias match { + case None => + SubqueryAlias(metadata.identifier.table, + sparkSession.sessionState.sqlParser.parsePlan(viewText)) + case Some(aliasText) => + SubqueryAlias(aliasText, sparkSession.sessionState.sqlParser.parsePlan(viewText)) + } + } else { + val qualifiedTable = + MetastoreRelation( + databaseName = database, tableName = table)(metadata, client, sparkSession) + alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) + } } else { val relation = tempTables(table) val tableWithQualifiers = SubqueryAlias(table, relation) From 43eb5ee93401c3b63ca2aaba1569a3e077450821 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Aug 2016 21:11:56 -0700 Subject: [PATCH 02/14] remove hiveDefaultTableFilePath --- .../sql/catalyst/catalog/SessionCatalog.scala | 16 ++++++++++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 6 ------ .../spark/sql/hive/HiveSessionCatalog.scala | 17 ++++++++++++++--- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 00c3db0aac1ac..bd182719ee351 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -495,6 +495,22 @@ class SessionCatalog( } } + /** + * Invalidate all the cache entries. + */ + def invalidateCache(): Unit = { /* no-op */ } + + /** + * Directly put the cache entry for a metastore table, if any. + */ + def cacheDataSourceTable(name: TableIdentifier, plan: LogicalPlan): Unit = { /* no-op */ } + + /** + * Get the cache entry for a metastore table, if given. + * If cache missed, returns None. It will not invoke automatic loading. + */ + def getCachedDataSourceTableIfPresent(name: TableIdentifier): Option[LogicalPlan] = { None } + /** * Drop all existing temporary tables. * For testing only. 
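// Illustrative sketch (not part of this patch series): the cache hooks added to
// SessionCatalog above are intentionally no-ops, so the base catalog stays
// metastore-agnostic while HiveSessionCatalog (patched below and in later commits)
// overrides them to delegate to its Guava-backed table cache. A minimal stand-alone
// version of that pattern; `Ident`, `Plan`, and the two *Sketch classes are
// hypothetical stand-ins for TableIdentifier, LogicalPlan, and the real catalogs.
class BaseCatalogSketch {
  type Ident = (String, String)                 // (database, table)
  type Plan = String                            // stand-in for LogicalPlan
  def invalidateCache(): Unit = { /* no-op */ }
  def cacheDataSourceTable(name: Ident, plan: Plan): Unit = { /* no-op */ }
  def getCachedDataSourceTableIfPresent(name: Ident): Option[Plan] = None
}
class HiveLikeCatalogSketch extends BaseCatalogSketch {
  private val cache = scala.collection.mutable.Map.empty[Ident, Plan]
  override def invalidateCache(): Unit = cache.clear()
  override def cacheDataSourceTable(name: Ident, plan: Plan): Unit = cache(name) = plan
  override def getCachedDataSourceTableIfPresent(name: Ident): Option[Plan] = cache.get(name)
}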
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 41e0e0b4af708..d425447901bbb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -136,12 +136,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader) } - def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = { - // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) - new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString - } - private def getCached( tableIdentifier: QualifiedTableName, pathsInMetastore: Seq[String], diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 207bd3544976e..21f4ce619d4db 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -21,6 +21,7 @@ import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.exec.{UDAF, UDF} import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry} import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF} @@ -116,12 +117,22 @@ private[sql] class HiveSessionCatalog( metastoreCatalog.refreshTable(name) } - def invalidateCache(): Unit = { - metastoreCatalog.cachedDataSourceTables.invalidateAll() + override def invalidateCache(): Unit = { + metastoreCatalog.invalidateAll() + } + + override def cacheDataSourceTable(name: TableIdentifier, plan: LogicalPlan): Unit = { + metastoreCatalog.cacheTable(name, plan) + } + + override def getCachedDataSourceTableIfPresent(name: TableIdentifier): Option[LogicalPlan] = { + metastoreCatalog.getTableIfPresent(name) } def hiveDefaultTableFilePath(name: TableIdentifier): String = { - metastoreCatalog.hiveDefaultTableFilePath(name) + // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) + val dbName = name.database.getOrElse(getCurrentDatabase) + new Path(new Path(getDatabaseMetadata(dbName).locationUri), name.table).toString } // For testing only From c68015e47813abd270d98da79e8981ac3b7660f5 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Aug 2016 21:25:04 -0700 Subject: [PATCH 03/14] remove CreateTables --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 35 ----------------- .../spark/sql/hive/HiveSessionCatalog.scala | 1 - .../spark/sql/hive/HiveSessionState.scala | 2 +- .../spark/sql/hive/HiveStrategies.scala | 39 +++++++++++++++++++ 4 files changed, 40 insertions(+), 37 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d425447901bbb..01a3bd99b4591 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -379,41 +379,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) 
extends Log } } - /** - * Creates any tables required for query execution. - * For example, because of a CREATE TABLE X AS statement. - */ - object CreateTables extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - // Wait until children are resolved. - case p: LogicalPlan if !p.childrenResolved => p - - case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => - val newTableDesc = if (tableDesc.storage.serde.isEmpty) { - // add default serde - tableDesc.withNewStorage( - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - } else { - tableDesc - } - - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc) - - // Currently we will never hit this branch, as SQL string API can only use `Ignore` or - // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde - // tables yet. - if (mode == SaveMode.Append || mode == SaveMode.Overwrite) { - throw new AnalysisException("" + - "CTAS for hive serde tables does not support append or overwrite semantics.") - } - - execution.CreateHiveTableAsSelectCommand( - newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))), - query, - mode == SaveMode.Ignore) - } - } - /** * Data Source Table is inserted directly, using Cache.put. * Note, this is not using automatic cache loading. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 21f4ce619d4db..7bd7c27b81e69 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -110,7 +110,6 @@ private[sql] class HiveSessionCatalog( val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions - val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables override def refreshTable(name: TableIdentifier): Unit = { super.refreshTable(name) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index e01c053ab5a76..d85fe0c619497 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -64,7 +64,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override val extendedResolutionRules = catalog.ParquetConversions :: catalog.OrcConversions :: - catalog.CreateTables :: + new CreateTables(sparkSession) :: PreprocessDDL(conf) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 17956ded1796d..e4a266790e464 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -18,11 +18,14 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule import 
org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ private[hive] trait HiveStrategies { @@ -74,3 +77,39 @@ private[hive] trait HiveStrategies { } } } + +/** + * Creates any tables required for query execution. + * For example, because of a CREATE TABLE X AS statement. + */ +class CreateTables(sparkSession: SparkSession) extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // Wait until children are resolved. + case p: LogicalPlan if !p.childrenResolved => p + + case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => + val catalog = sparkSession.sessionState.catalog + val dbName = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase).toLowerCase + + val newTableDesc = if (tableDesc.storage.serde.isEmpty) { + // add default serde + tableDesc.withNewStorage( + serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + } else { + tableDesc + } + + // Currently we will never hit this branch, as SQL string API can only use `Ignore` or + // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde + // tables yet. + if (mode == SaveMode.Append || mode == SaveMode.Overwrite) { + throw new AnalysisException("" + + "CTAS for hive serde tables does not support append or overwrite semantics.") + } + + execution.CreateHiveTableAsSelectCommand( + newTableDesc.copy(identifier = TableIdentifier(tableDesc.identifier.table, Some(dbName))), + query, + mode == SaveMode.Ignore) + } +} From 839712ff963ad5d218ac96fdd1ee1d387fc8e45f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Aug 2016 22:54:40 -0700 Subject: [PATCH 04/14] remove OrcConversions and ParquetConversions --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 294 +---------------- .../spark/sql/hive/HiveSessionCatalog.scala | 3 - .../spark/sql/hive/HiveSessionState.scala | 4 +- .../spark/sql/hive/HiveStrategies.scala | 300 +++++++++++++++++- 4 files changed, 302 insertions(+), 299 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 01a3bd99b4591..fba2d541101b8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -17,22 +17,15 @@ package org.apache.spark.sql.hive -import scala.collection.JavaConverters._ - import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} -import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.execution.datasources.{Partition => _, _} -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} -import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.types._ @@ -44,7 +37,6 @@ import org.apache.spark.sql.types._ */ 
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging { private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] - private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) @@ -57,18 +49,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log tableIdent.table.toLowerCase) } - private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = { - QualifiedTableName( - t.identifier.database.getOrElse(getCurrentDatabase).toLowerCase, - t.identifier.table.toLowerCase) - } - /** A cache of Spark SQL data source tables that have been accessed. */ protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { override def load(in: QualifiedTableName): LogicalPlan = { logDebug(s"Creating new cached data source for $in") - val table = client.getTable(in.database, in.name) + val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name) // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable @@ -136,249 +122,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader) } - private def getCached( - tableIdentifier: QualifiedTableName, - pathsInMetastore: Seq[String], - metastoreRelation: MetastoreRelation, - schemaInMetastore: StructType, - expectedFileFormat: Class[_ <: FileFormat], - expectedBucketSpec: Option[BucketSpec], - partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { - - cachedDataSourceTables.getIfPresent(tableIdentifier) match { - case null => None // Cache miss - case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) => - val cachedRelationFileFormatClass = relation.fileFormat.getClass - - expectedFileFormat match { - case `cachedRelationFileFormatClass` => - // If we have the same paths, same schema, and same partition spec, - // we will use the cached relation. - val useCached = - relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet && - logical.schema.sameType(schemaInMetastore) && - relation.bucketSpec == expectedBucketSpec && - relation.partitionSpec == partitionSpecInMetastore.getOrElse { - PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory]) - } - - if (useCached) { - Some(logical) - } else { - // If the cached relation is not updated, we invalidate it right away. - cachedDataSourceTables.invalidate(tableIdentifier) - None - } - case _ => - logWarning( - s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " + - s"should be stored as $expectedFileFormat. However, we are getting " + - s"a ${relation.fileFormat} from the metastore cache. This cached " + - s"entry will be invalidated.") - cachedDataSourceTables.invalidate(tableIdentifier) - None - } - case other => - logWarning( - s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " + - s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. 
" + - s"This cached entry will be invalidated.") - cachedDataSourceTables.invalidate(tableIdentifier) - None - } - } - - private def convertToLogicalRelation( - metastoreRelation: MetastoreRelation, - options: Map[String, String], - defaultSource: FileFormat, - fileFormatClass: Class[_ <: FileFormat], - fileType: String): LogicalRelation = { - val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) - val tableIdentifier = - QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) - val bucketSpec = None // We don't support hive bucketed tables, only ones we write out. - - val result = if (metastoreRelation.hiveQlTable.isPartitioned) { - val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) - val partitionColumnDataTypes = partitionSchema.map(_.dataType) - // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore - // are empty. - val partitions = metastoreRelation.getHiveQlPartitions().map { p => - val location = p.getLocation - val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map { - case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null) - }) - PartitionDirectory(values, location) - } - val partitionSpec = PartitionSpec(partitionSchema, partitions) - val partitionPaths = partitions.map(_.path.toString) - - // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a - // partitioned table's paths depends on whether that table has any actual partitions. - // Partitioned tables without partitions use the location of the table's base path. - // Partitioned tables with partitions use the locations of those partitions' data locations, - // _omitting_ the table's base path. 
- val paths = if (partitionPaths.isEmpty) { - Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) - } else { - partitionPaths - } - - val cached = getCached( - tableIdentifier, - paths, - metastoreRelation, - metastoreSchema, - fileFormatClass, - bucketSpec, - Some(partitionSpec)) - - val hadoopFsRelation = cached.getOrElse { - val fileCatalog = new MetaStorePartitionedTableFileCatalog( - sparkSession, - new Path(metastoreRelation.catalogTable.storage.locationUri.get), - partitionSpec) - - val inferredSchema = if (fileType.equals("parquet")) { - val inferredSchema = - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) - inferredSchema.map { inferred => - ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) - }.getOrElse(metastoreSchema) - } else { - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get - } - - val relation = HadoopFsRelation( - sparkSession = sparkSession, - location = fileCatalog, - partitionSchema = partitionSchema, - dataSchema = inferredSchema, - bucketSpec = bucketSpec, - fileFormat = defaultSource, - options = options) - - val created = LogicalRelation( - relation, - metastoreTableIdentifier = - Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database)))) - cachedDataSourceTables.put(tableIdentifier, created) - created - } - - hadoopFsRelation - } else { - val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) - - val cached = getCached(tableIdentifier, - paths, - metastoreRelation, - metastoreSchema, - fileFormatClass, - bucketSpec, - None) - val logicalRelation = cached.getOrElse { - val created = - LogicalRelation( - DataSource( - sparkSession = sparkSession, - paths = paths, - userSpecifiedSchema = Some(metastoreRelation.schema), - bucketSpec = bucketSpec, - options = options, - className = fileType).resolveRelation(), - metastoreTableIdentifier = - Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database)))) - - - cachedDataSourceTables.put(tableIdentifier, created) - created - } - - logicalRelation - } - result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) - } - - /** - * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet - * data source relations for better performance. - */ - object ParquetConversions extends Rule[LogicalPlan] { - private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = { - relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") && - sessionState.convertMetastoreParquet - } - - private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = { - val defaultSource = new ParquetFileFormat() - val fileFormatClass = classOf[ParquetFileFormat] - - val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging - val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString) - - convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet") - } - - override def apply(plan: LogicalPlan): LogicalPlan = { - if (!plan.resolved || plan.analyzed) { - return plan - } - - plan transformUp { - // Write path - case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) - // Inserting into partitioned table is not supported in Parquet data source (yet). 
- if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) => - InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists) - - // Read path - case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) => - val parquetRelation = convertToParquetRelation(relation) - SubqueryAlias(relation.tableName, parquetRelation) - } - } - } - - /** - * When scanning Metastore ORC tables, convert them to ORC data source relations - * for better performance. - */ - object OrcConversions extends Rule[LogicalPlan] { - private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = { - relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") && - sessionState.convertMetastoreOrc - } - - private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = { - val defaultSource = new OrcFileFormat() - val fileFormatClass = classOf[OrcFileFormat] - val options = Map[String, String]() - - convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc") - } - - override def apply(plan: LogicalPlan): LogicalPlan = { - if (!plan.resolved || plan.analyzed) { - return plan - } - - plan transformUp { - // Write path - case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) - // Inserting into partitioned table is not supported in Orc data source (yet). - if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) => - InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists) - - // Read path - case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) => - val orcRelation = convertToOrcRelation(relation) - SubqueryAlias(relation.tableName, orcRelation) - } - } - } - /** * Data Source Table is inserted directly, using Cache.put. * Note, this is not using automatic cache loading. @@ -414,34 +157,3 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log cachedDataSourceTables.invalidateAll() } } - -/** - * An override of the standard HDFS listing based catalog, that overrides the partition spec with - * the information from the metastore. - * - * @param tableBasePath The default base path of the Hive metastore table - * @param partitionSpec The partition specifications from Hive metastore - */ -private[hive] class MetaStorePartitionedTableFileCatalog( - sparkSession: SparkSession, - tableBasePath: Path, - override val partitionSpec: PartitionSpec) - extends ListingFileCatalog( - sparkSession, - MetaStorePartitionedTableFileCatalog.getPaths(tableBasePath, partitionSpec), - Map.empty, - Some(partitionSpec.partitionColumns)) { -} - -private[hive] object MetaStorePartitionedTableFileCatalog { - /** Get the list of paths to list files in the for a metastore table */ - def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = { - // If there are no partitions currently specified then use base path, - // otherwise use the paths corresponding to the partitions. 
- if (partitionSpec.partitions.isEmpty) { - Seq(tableBasePath) - } else { - partitionSpec.partitions.map(_.path) - } - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 7bd7c27b81e69..7b45e8ebe24c3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -108,9 +108,6 @@ private[sql] class HiveSessionCatalog( // and HiveCatalog. We should still do it at some point... private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession) - val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions - val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions - override def refreshTable(name: TableIdentifier): Unit = { super.refreshTable(name) metastoreCatalog.refreshTable(name) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index d85fe0c619497..a2a18c38a3d72 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -62,8 +62,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override lazy val analyzer: Analyzer = { new Analyzer(catalog, conf) { override val extendedResolutionRules = - catalog.ParquetConversions :: - catalog.OrcConversions :: + new ParquetConversions(sparkSession) :: + new OrcConversions(sparkSession) :: new CreateTables(sparkSession) :: PreprocessDDL(conf) :: PreprocessTableInsertion(conf) :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index e4a266790e464..281665e51907d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,16 +17,25 @@ package org.apache.spark.sql.hive +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.Path + +import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.datasources.CreateTable +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.execution._ +import org.apache.spark.sql.hive.orc.OrcFileFormat +import org.apache.spark.sql.types.StructType private[hive] trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. 
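// Illustrative sketch (not part of this patch series): with this commit the conversion
// rules become plain classes that receive the SparkSession through their constructor
// (see `new ParquetConversions(sparkSession) :: new OrcConversions(sparkSession)` in the
// HiveSessionState hunk above), so the session state composes fresh rule instances
// instead of reaching into the catalog for singleton objects. A Spark-free sketch of
// that composition; `PlanT`, `RuleT`, and the concrete rules are hypothetical stand-ins
// for LogicalPlan, Rule[LogicalPlan], and the real conversion rules, and the single
// fold stands in for the analyzer's batched rule execution.
object RuleCompositionSketch {
  type PlanT = String
  trait RuleT { def apply(plan: PlanT): PlanT }

  // Session-specific state travels through the constructor, as the SparkSession does
  // in the real rules.
  class PrefixRule(prefix: String) extends RuleT {
    def apply(plan: PlanT): PlanT = prefix + plan
  }
  class TrimRule() extends RuleT {
    def apply(plan: PlanT): PlanT = plan.trim
  }

  // Analogue of extendedResolutionRules: an ordered list of rule instances.
  val extendedRules: Seq[RuleT] = new TrimRule() :: new PrefixRule("resolved: ") :: Nil

  def analyze(plan: PlanT): PlanT = extendedRules.foldLeft(plan)((p, r) => r(p))
}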
@@ -113,3 +122,288 @@ class CreateTables(sparkSession: SparkSession) extends Rule[LogicalPlan] { mode == SaveMode.Ignore) } } + + +/** + * When scanning Metastore ORC tables, convert them to ORC data source relations + * for better performance. + */ +class OrcConversions(sparkSession: SparkSession) extends Rule[LogicalPlan] { + private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = { + relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") && + sparkSession.sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) + } + + private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = { + val defaultSource = new OrcFileFormat() + val fileFormatClass = classOf[OrcFileFormat] + val options = Map[String, String]() + + ConvertMetastoreTablesUtils.convertToLogicalRelation( + sparkSession, relation, options, defaultSource, fileFormatClass, "orc") + } + + override def apply(plan: LogicalPlan): LogicalPlan = { + if (!plan.resolved || plan.analyzed) { + return plan + } + + plan transformUp { + // Write path + case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) + // Inserting into partitioned table is not supported in Orc data source (yet). + if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) => + InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists) + + // Read path + case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) => + val orcRelation = convertToOrcRelation(relation) + SubqueryAlias(relation.tableName, orcRelation) + } + } +} + +/** + * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet + * data source relations for better performance. + */ +class ParquetConversions(sparkSession: SparkSession) extends Rule[LogicalPlan] { + private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = { + relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") && + sparkSession.sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) + } + + private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = { + val defaultSource = new ParquetFileFormat() + val fileFormatClass = classOf[ParquetFileFormat] + + val mergeSchema = sparkSession.sessionState.conf.getConf( + HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING) + val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString) + + ConvertMetastoreTablesUtils.convertToLogicalRelation( + sparkSession, relation, options, defaultSource, fileFormatClass, "parquet") + } + + override def apply(plan: LogicalPlan): LogicalPlan = { + if (!plan.resolved || plan.analyzed) { + return plan + } + + plan transformUp { + // Write path + case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) + // Inserting into partitioned table is not supported in Parquet data source (yet). 
+ if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) => + InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists) + + // Read path + case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) => + val parquetRelation = convertToParquetRelation(relation) + SubqueryAlias(relation.tableName, parquetRelation) + } + } +} + +object ConvertMetastoreTablesUtils extends Logging { + + def convertToLogicalRelation( + sparkSession: SparkSession, + metastoreRelation: MetastoreRelation, + options: Map[String, String], + defaultSource: FileFormat, + fileFormatClass: Class[_ <: FileFormat], + fileType: String): LogicalRelation = { + val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) + val tableIdentifier = + TableIdentifier(metastoreRelation.tableName, Some(metastoreRelation.databaseName)) + val bucketSpec = None // We don't support hive bucketed tables, only ones we write out. + + val result = if (metastoreRelation.hiveQlTable.isPartitioned) { + val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) + val partitionColumnDataTypes = partitionSchema.map(_.dataType) + // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore + // are empty. + val partitions = metastoreRelation.getHiveQlPartitions().map { p => + val location = p.getLocation + val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map { + case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null) + }) + PartitionDirectory(values, location) + } + val partitionSpec = PartitionSpec(partitionSchema, partitions) + val partitionPaths = partitions.map(_.path.toString) + + // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a + // partitioned table's paths depends on whether that table has any actual partitions. + // Partitioned tables without partitions use the location of the table's base path. + // Partitioned tables with partitions use the locations of those partitions' data locations, + // _omitting_ the table's base path. 
+ val paths = if (partitionPaths.isEmpty) { + Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) + } else { + partitionPaths + } + + val cached = getCached( + sparkSession, + tableIdentifier, + paths, + metastoreRelation, + metastoreSchema, + fileFormatClass, + bucketSpec, + Some(partitionSpec)) + + val hadoopFsRelation = cached.getOrElse { + val fileCatalog = new MetaStorePartitionedTableFileCatalog( + sparkSession, + new Path(metastoreRelation.catalogTable.storage.locationUri.get), + partitionSpec) + + val inferredSchema = if (fileType.equals("parquet")) { + val inferredSchema = + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) + inferredSchema.map { inferred => + ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) + }.getOrElse(metastoreSchema) + } else { + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get + } + + val relation = HadoopFsRelation( + sparkSession = sparkSession, + location = fileCatalog, + partitionSchema = partitionSchema, + dataSchema = inferredSchema, + bucketSpec = bucketSpec, + fileFormat = defaultSource, + options = options) + + val created = LogicalRelation( + relation, + metastoreTableIdentifier = Some(tableIdentifier)) + sparkSession.sessionState.catalog.cacheDataSourceTable(tableIdentifier, created) + created + } + + hadoopFsRelation + } else { + val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) + + val cached = getCached( + sparkSession, + tableIdentifier, + paths, + metastoreRelation, + metastoreSchema, + fileFormatClass, + bucketSpec, + None) + val logicalRelation = cached.getOrElse { + val created = + LogicalRelation( + DataSource( + sparkSession = sparkSession, + paths = paths, + userSpecifiedSchema = Some(metastoreRelation.schema), + bucketSpec = bucketSpec, + options = options, + className = fileType).resolveRelation(), + metastoreTableIdentifier = Some(tableIdentifier)) + sparkSession.sessionState.catalog.cacheDataSourceTable(tableIdentifier, created) + created + } + + logicalRelation + } + result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) + } + + private def getCached( + sparkSession: SparkSession, + tableIdentifier: TableIdentifier, + pathsInMetastore: Seq[String], + metastoreRelation: MetastoreRelation, + schemaInMetastore: StructType, + expectedFileFormat: Class[_ <: FileFormat], + expectedBucketSpec: Option[BucketSpec], + partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { + + sparkSession.sessionState.catalog.getCachedDataSourceTableIfPresent(tableIdentifier) match { + case null => None // Cache miss + case Some(logical @ LogicalRelation(relation: HadoopFsRelation, _, _)) => + val cachedRelationFileFormatClass = relation.fileFormat.getClass + + expectedFileFormat match { + case `cachedRelationFileFormatClass` => + // If we have the same paths, same schema, and same partition spec, + // we will use the cached relation. + val useCached = + relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet && + logical.schema.sameType(schemaInMetastore) && + relation.bucketSpec == expectedBucketSpec && + relation.partitionSpec == partitionSpecInMetastore.getOrElse { + PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory]) + } + + if (useCached) { + Some(logical) + } else { + // If the cached relation is not updated, we invalidate it right away. 
+ sparkSession.sessionState.catalog.refreshTable(tableIdentifier) + None + } + case _ => + logWarning( + s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " + + s"should be stored as $expectedFileFormat. However, we are getting " + + s"a ${relation.fileFormat} from the metastore cache. This cached " + + s"entry will be invalidated.") + sparkSession.sessionState.catalog.refreshTable(tableIdentifier) + None + } + case other => + logWarning( + s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " + + s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " + + s"This cached entry will be invalidated.") + sparkSession.sessionState.catalog.refreshTable(tableIdentifier) + None + } + } + + /** + * An override of the standard HDFS listing based catalog, that overrides the partition spec with + * the information from the metastore. + * + * @param tableBasePath The default base path of the Hive metastore table + * @param partitionSpec The partition specifications from Hive metastore + */ + private[hive] class MetaStorePartitionedTableFileCatalog( + sparkSession: SparkSession, + tableBasePath: Path, + override val partitionSpec: PartitionSpec) + extends ListingFileCatalog( + sparkSession, + MetaStorePartitionedTableFileCatalog.getPaths(tableBasePath, partitionSpec), + Map.empty, + Some(partitionSpec.partitionColumns)) { + } + + private[hive] object MetaStorePartitionedTableFileCatalog { + /** Get the list of paths to list files in the for a metastore table */ + def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = { + // If there are no partitions currently specified then use base path, + // otherwise use the paths corresponding to the partitions. + if (partitionSpec.partitions.isEmpty) { + Seq(tableBasePath) + } else { + partitionSpec.partitions.map(_.path) + } + } + } + +} + + From 164be254cab43f40c697729eb87109b7345f73e2 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Aug 2016 23:11:51 -0700 Subject: [PATCH 05/14] remove getCachedDataSourceTable --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../spark/sql/hive/HiveSessionCatalog.scala | 6 ------ .../apache/spark/sql/hive/parquetSuites.scala | 20 +++++++++---------- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index fba2d541101b8..1cf88b724fcb4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -43,7 +43,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase - def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = { + private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = { QualifiedTableName( tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase, tableIdent.table.toLowerCase) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 7b45e8ebe24c3..fdbc4106dc2a8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -131,12 +131,6 @@ 
private[sql] class HiveSessionCatalog( new Path(new Path(getDatabaseMetadata(dbName).locationUri), name.table).toString } - // For testing only - private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = { - val key = metastoreCatalog.getQualifiedTableName(table) - metastoreCatalog.cachedDataSourceTables.getIfPresent(key) - } - override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = { makeFunctionBuilder(funcName, Utils.classForName(className)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 31b6197d56fc7..b5438c50d7610 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -453,9 +453,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { test("Caching converted data source Parquet Relations") { def checkCached(tableIdentifier: TableIdentifier): Unit = { // Converted test_parquet should be cached. - sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match { - case null => fail("Converted test_parquet should be cached in the cache.") - case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => // OK + sessionState.catalog.getCachedDataSourceTableIfPresent(tableIdentifier) match { + case None => fail("Converted test_parquet should be cached in the cache.") + case Some(LogicalRelation(parquetRelation: HadoopFsRelation, _, _)) => // OK case other => fail( "The cached test_parquet should be a Parquet Relation. " + @@ -481,14 +481,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default")) // First, make sure the converted test_parquet is not cached. - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTableIfPresent(tableIdentifier).isEmpty) // Table lookup will make the table cached. table("test_insert_parquet") checkCached(tableIdentifier) // For insert into non-partitioned table, we will do the conversion, // so the converted test_insert_parquet should be cached. sessionState.refreshTable("test_insert_parquet") - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTableIfPresent(tableIdentifier).isEmpty) sql( """ |INSERT INTO TABLE test_insert_parquet @@ -501,7 +501,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { sql("select a, b from jt").collect()) // Invalidate the cache. sessionState.refreshTable("test_insert_parquet") - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTableIfPresent(tableIdentifier).isEmpty) // Create a partitioned table. 
sql( @@ -519,7 +519,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default")) - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTableIfPresent(tableIdentifier).isEmpty) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test @@ -528,14 +528,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) // Right now, insert into a partitioned Parquet is not supported in data source Parquet. // So, we expect it is not cached. - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTableIfPresent(tableIdentifier).isEmpty) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test |PARTITION (`date`='2015-04-02') |select a, b from jt """.stripMargin) - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTableIfPresent(tableIdentifier).isEmpty) // Make sure we can cache the partitioned table. table("test_parquet_partitioned_cache_test") @@ -551,7 +551,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin).collect()) sessionState.refreshTable("test_parquet_partitioned_cache_test") - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTableIfPresent(tableIdentifier).isEmpty) dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test") } From 32f7caab469343cf0d2eb9189f04b91e0b38f8c9 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Aug 2016 23:19:17 -0700 Subject: [PATCH 06/14] remove Hive dependency --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 1cf88b724fcb4..48f60fd0b93b3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -31,21 +31,14 @@ import org.apache.spark.sql.types._ /** * Legacy catalog for interacting with the Hive metastore. - * - * This is still used for things like creating data source tables, but in the future will be - * cleaned up to integrate more nicely with [[HiveExternalCatalog]]. 
*/ -private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging { - private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] - +private[hive] class HiveMetastoreCatalog(spark: SparkSession) extends Logging { /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) - private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase - private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = { QualifiedTableName( - tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase, + tableIdent.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase).toLowerCase, tableIdent.table.toLowerCase) } @@ -54,7 +47,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { override def load(in: QualifiedTableName): LogicalPlan = { logDebug(s"Creating new cached data source for $in") - val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name) + val table = spark.sharedState.externalCatalog.getTable(in.database, in.name) // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable @@ -106,7 +99,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val options = table.storage.properties val dataSource = DataSource( - sparkSession, + spark, userSpecifiedSchema = userSpecifiedSchema, partitionColumns = partitionColumns, bucketSpec = bucketSpec, From be770f45ab7bfe8c435df08a4080f378ca4ff9de Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Aug 2016 23:31:54 -0700 Subject: [PATCH 07/14] rename HiveMetaStoreCatalog to MetadataCache --- .../spark/sql/hive/HiveSessionCatalog.scala | 13 ++++----- ...storeCatalog.scala => MetadataCache.scala} | 28 +++++++++++++++++-- 2 files changed, 31 insertions(+), 10 deletions(-) rename sql/hive/src/main/scala/org/apache/spark/sql/hive/{HiveMetastoreCatalog.scala => MetadataCache.scala} (83%) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index fdbc4106dc2a8..6b0dcc13ead92 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResourceLoader, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} -import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.hive.client.HiveClient @@ -69,7 +68,7 @@ private[sql] class HiveSessionCatalog( val newName = name.copy(database = Option(database), table = table) val metadata = getTableMetadata(newName) if (DDLUtils.isDatasourceTable(metadata.properties)) { - val dataSourceTable = metastoreCatalog.getTable(newName) + val dataSourceTable = metadataCache.getTable(newName) val qualifiedTable = SubqueryAlias(table, dataSourceTable) // Then, if alias is specified, wrap the table with a Subquery using the alias. 
// Otherwise, wrap the table with a Subquery using the table name. @@ -106,23 +105,23 @@ private[sql] class HiveSessionCatalog( // essentially a cache for metastore tables. However, it relies on a lot of session-specific // things so it would be a lot of work to split its functionality between HiveSessionCatalog // and HiveCatalog. We should still do it at some point... - private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession) + private val metadataCache = new MetadataCache(sparkSession) override def refreshTable(name: TableIdentifier): Unit = { super.refreshTable(name) - metastoreCatalog.refreshTable(name) + metadataCache.refreshTable(name) } override def invalidateCache(): Unit = { - metastoreCatalog.invalidateAll() + metadataCache.invalidateAll() } override def cacheDataSourceTable(name: TableIdentifier, plan: LogicalPlan): Unit = { - metastoreCatalog.cacheTable(name, plan) + metadataCache.cacheTable(name, plan) } override def getCachedDataSourceTableIfPresent(name: TableIdentifier): Option[LogicalPlan] = { - metastoreCatalog.getTableIfPresent(name) + metadataCache.getTableIfPresent(name) } def hiveDefaultTableFilePath(name: TableIdentifier): String = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala similarity index 83% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala index 48f60fd0b93b3..ad8e01aaa2f73 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala @@ -30,9 +30,13 @@ import org.apache.spark.sql.types._ /** - * Legacy catalog for interacting with the Hive metastore. + * Metadata cache is a key-value cache built on Google Guava Cache to speed up building logical plan + * nodes (LogicalRelation) for data source tables. The cache key is a unique identifier of a table. + * Here, the identifier is the fully qualified table name, including the database in which it + * resides. The value is the corresponding LogicalRelation that represents a specific data source + * table. */ -private[hive] class HiveMetastoreCatalog(spark: SparkSession) extends Logging { +private[hive] class MetadataCache(spark: SparkSession) extends Logging { /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) @@ -116,13 +120,19 @@ private[hive] class HiveMetastoreCatalog(spark: SparkSession) extends Logging { } /** - * Data Source Table is inserted directly, using Cache.put. + * cacheTable is a wrapper of cache.put(key, value). It associates value with key in this cache. + * If the cache previously contained a value associated with key, the old value is replaced by + * value. * Note, this is not using automatic cache loading. */ def cacheTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { cachedDataSourceTables.put(getQualifiedTableName(tableIdent), plan) } + /** + * getTableIfPresent is a wrapper of cache.getIfPresent(key) that never causes values to be + * automatically loaded. 
+ */ def getTableIfPresent(tableIdent: TableIdentifier): Option[LogicalPlan] = { cachedDataSourceTables.getIfPresent(getQualifiedTableName(tableIdent)) match { case null => None // Cache miss @@ -130,10 +140,19 @@ private[hive] class HiveMetastoreCatalog(spark: SparkSession) extends Logging { } } + /** + * getTable is a wrapper of cache.get(key). If cache misses, Caches loaded by a CacheLoader + * will call CacheLoader.load(K) to load new values into the cache. That means, it will call + * the function load. + */ def getTable(tableIdent: TableIdentifier): LogicalPlan = { cachedDataSourceTables.get(getQualifiedTableName(tableIdent)) } + /** + * refreshTable is a wrapper of cache.invalidate. It does not eagerly reload the cache. It just + * invalidates the cache. Next time when we use the table, it will be populated in the cache. + */ def refreshTable(tableIdent: TableIdentifier): Unit = { // refreshTable does not eagerly reload the cache. It just invalidate the cache. // Next time when we use the table, it will be populated in the cache. @@ -146,6 +165,9 @@ private[hive] class HiveMetastoreCatalog(spark: SparkSession) extends Logging { cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent)) } + /** + * Discards all entries in the cache. It is a wrapper of cache.invalidateAll. + */ def invalidateAll(): Unit = { cachedDataSourceTables.invalidateAll() } From 6b96ee0a266d64af2512aac5bb151d5b594827e8 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Aug 2016 23:39:03 -0700 Subject: [PATCH 08/14] make cachedDataSourceTables private --- .../main/scala/org/apache/spark/sql/hive/MetadataCache.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala index ad8e01aaa2f73..25d2cef0145bc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala @@ -47,7 +47,7 @@ private[hive] class MetadataCache(spark: SparkSession) extends Logging { } /** A cache of Spark SQL data source tables that have been accessed. 
*/ - protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { + private val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { override def load(in: QualifiedTableName): LogicalPlan = { logDebug(s"Creating new cached data source for $in") From 847a526037ae9082f18a4a93b56e6d7dcef25ff2 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Aug 2016 23:51:37 -0700 Subject: [PATCH 09/14] remove useless empty lines --- .../main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 281665e51907d..c8678c4c9ddf1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -403,7 +403,4 @@ object ConvertMetastoreTablesUtils extends Logging { } } } - } - - From 9fe620567aa7d61038ef497bf2358e6fff374d38 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Aug 2016 23:56:07 -0700 Subject: [PATCH 10/14] remove convertMetastoreParquet, convertMetastoreParquetWithSchemaMerging, convertMetastoreOrc from HiveSessionState --- .../spark/sql/hive/HiveSessionState.scala | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index a2a18c38a3d72..7fbf3cebe7b7f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -110,34 +110,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) super.addJar(path) } - /** - * When true, enables an experimental feature where metastore tables that use the parquet SerDe - * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive - * SerDe. - */ - def convertMetastoreParquet: Boolean = { - conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) - } - - /** - * When true, also tries to merge possibly different but compatible Parquet schemas in different - * Parquet data files. - * - * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true. - */ - def convertMetastoreParquetWithSchemaMerging: Boolean = { - conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING) - } - - /** - * When true, enables an experimental feature where metastore tables that use the Orc SerDe - * are automatically converted to use the Spark SQL ORC table scan, instead of the Hive - * SerDe. - */ - def convertMetastoreOrc: Boolean = { - conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) - } - /** * When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool." 
*/ From 3a52e45cfa6f23141a68348e26c4aa72f4d25854 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 12 Aug 2016 00:39:12 -0700 Subject: [PATCH 11/14] remove convertMetastoreOrc from TestHive --- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 3 ++- .../main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 13d18fdec0e9d..99e665f741522 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -39,7 +39,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { private val originalLocale = Locale.getDefault private val originalColumnBatchSize = TestHive.conf.columnBatchSize private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning - private val originalConvertMetastoreOrc = TestHive.sessionState.convertMetastoreOrc + private val originalConvertMetastoreOrc = + TestHive.sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled def testCases: Seq[(String, File)] = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index c8678c4c9ddf1..6869ee5123b2b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -380,7 +380,7 @@ object ConvertMetastoreTablesUtils extends Logging { * @param tableBasePath The default base path of the Hive metastore table * @param partitionSpec The partition specifications from Hive metastore */ - private[hive] class MetaStorePartitionedTableFileCatalog( + private class MetaStorePartitionedTableFileCatalog( sparkSession: SparkSession, tableBasePath: Path, override val partitionSpec: PartitionSpec) @@ -391,7 +391,7 @@ object ConvertMetastoreTablesUtils extends Logging { Some(partitionSpec.partitionColumns)) { } - private[hive] object MetaStorePartitionedTableFileCatalog { + private object MetaStorePartitionedTableFileCatalog { /** Get the list of paths to list files in the for a metastore table */ def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = { // If there are no partitions currently specified then use base path, From 1bfd9e114043820662a120d80c38db2b1a60ece0 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 12 Aug 2016 08:28:00 -0700 Subject: [PATCH 12/14] fix hiveDefaultTableFilePath --- .../scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 6b0dcc13ead92..e28e9f27d8253 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -126,8 +126,9 @@ private[sql] class HiveSessionCatalog( def hiveDefaultTableFilePath(name: TableIdentifier): String = { // Code based on: 
hiveWarehouse.getTablePath(currentDatabase, tableName) - val dbName = name.database.getOrElse(getCurrentDatabase) - new Path(new Path(getDatabaseMetadata(dbName).locationUri), name.table).toString + val dbName = name.database.getOrElse(getCurrentDatabase).toLowerCase + val tableName = name.table.toLowerCase + new Path(new Path(getDatabaseMetadata(dbName).locationUri), tableName).toString } override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = { From af9b0a29972b0e89e6499f391eef776b21f294c6 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 16 Aug 2016 15:16:18 -0700 Subject: [PATCH 13/14] rename MetadataCache to LogicalRelationCache and move it to sql/core. --- .../apache/spark/sql/LogicalRelationCache.scala} | 15 +++++++-------- .../spark/sql/hive/HiveSessionCatalog.scala | 4 ++-- 2 files changed, 9 insertions(+), 10 deletions(-) rename sql/{hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala => core/src/main/scala/org/apache/spark/sql/LogicalRelationCache.scala} (92%) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/LogicalRelationCache.scala similarity index 92% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala rename to sql/core/src/main/scala/org/apache/spark/sql/LogicalRelationCache.scala index 25d2cef0145bc..c2eaec25ed8b4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetadataCache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/LogicalRelationCache.scala @@ -15,12 +15,11 @@ * limitations under the License. */ -package org.apache.spark.sql.hive +package org.apache.spark.sql import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -30,13 +29,13 @@ import org.apache.spark.sql.types._ /** - * Metadata cache is a key-value cache built on Google Guava Cache to speed up building logical plan - * nodes (LogicalRelation) for data source tables. The cache key is a unique identifier of a table. - * Here, the identifier is the fully qualified table name, including the database in which it - * resides. The value is the corresponding LogicalRelation that represents a specific data source - * table. + * LogicalRelationCache is a key-value cache built on Google Guava Cache to speed up building + * logical plan nodes (LogicalRelation) for data source tables. The cache key is a unique + * identifier of a table. Here, the identifier is the fully qualified table name, including the + * database in which it resides. The value is the corresponding LogicalRelation that represents + * a specific data source table. 
*/ -private[hive] class MetadataCache(spark: SparkSession) extends Logging { +class LogicalRelationCache(spark: SparkSession) extends Logging { /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index e28e9f27d8253..0cebc5d8efbe2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF} import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry} import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF} -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.{AnalysisException, LogicalRelationCache, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder @@ -105,7 +105,7 @@ private[sql] class HiveSessionCatalog( // essentially a cache for metastore tables. However, it relies on a lot of session-specific // things so it would be a lot of work to split its functionality between HiveSessionCatalog // and HiveCatalog. We should still do it at some point... - private val metadataCache = new MetadataCache(sparkSession) + private val metadataCache = new LogicalRelationCache(sparkSession) override def refreshTable(name: TableIdentifier): Unit = { super.refreshTable(name) From f40b1f5f1e3510e6a2fa0be4ace39e7880b11b26 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 16 Aug 2016 15:41:47 -0700 Subject: [PATCH 14/14] move LogicalRelationCache to org.apache.spark.sql.execution.datasources. --- .../{ => execution/datasources}/LogicalRelationCache.scala | 5 +++-- .../scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/{ => execution/datasources}/LogicalRelationCache.scala (98%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/LogicalRelationCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelationCache.scala similarity index 98% rename from sql/core/src/main/scala/org/apache/spark/sql/LogicalRelationCache.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelationCache.scala index c2eaec25ed8b4..19c088f6a8677 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/LogicalRelationCache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelationCache.scala @@ -15,16 +15,17 @@ * limitations under the License. 
*/ -package org.apache.spark.sql +package org.apache.spark.sql.execution.datasources import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.spark.internal.Logging +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ -import org.apache.spark.sql.execution.datasources.{Partition => _, _} +import org.apache.spark.sql.execution.datasources.{Partition => _} import org.apache.spark.sql.types._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 0cebc5d8efbe2..842ae56f63611 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF} import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry} import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF} -import org.apache.spark.sql.{AnalysisException, LogicalRelationCache, SparkSession} +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.LogicalRelationCache import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.SQLConf
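
Note (illustrative, not part of the patch series): the cache that these patches rename and relocate is a thin wrapper around Guava's LoadingCache, and the wrapper methods documented above map one-to-one onto Guava calls: cacheTable -> Cache.put, getTableIfPresent -> Cache.getIfPresent (never triggers loading), getTable -> LoadingCache.get (falls back to CacheLoader.load on a miss), refreshTable -> Cache.invalidate, and invalidateAll -> Cache.invalidateAll. The self-contained Scala sketch below demonstrates exactly those semantics; QualifiedTableName and the String "plan" values are stand-ins for the real key and LogicalRelation value types, and the demo object name is made up for illustration.

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object LoadingCacheSemanticsDemo {
  // Stand-in for the fully qualified table name used as the cache key.
  case class QualifiedTableName(database: String, name: String)

  def main(args: Array[String]): Unit = {
    val loader = new CacheLoader[QualifiedTableName, String]() {
      // In the real cache this builds a LogicalRelation from catalog metadata;
      // a placeholder String keeps the sketch runnable on its own.
      override def load(key: QualifiedTableName): String =
        s"loaded plan for ${key.database}.${key.name}"
    }
    val cache: LoadingCache[QualifiedTableName, String] =
      CacheBuilder.newBuilder().maximumSize(1000).build(loader)

    val t = QualifiedTableName("default", "t1")

    // cacheTable: insert directly with Cache.put, bypassing the loader.
    cache.put(t, "manually cached plan")

    // getTableIfPresent: Cache.getIfPresent never loads; a miss returns null.
    assert(cache.getIfPresent(t) == "manually cached plan")
    assert(cache.getIfPresent(QualifiedTableName("default", "missing")) == null)

    // refreshTable: Cache.invalidate only evicts; the next get() repopulates
    // the entry through CacheLoader.load.
    cache.invalidate(t)
    assert(cache.get(t) == "loaded plan for default.t1")

    // invalidateAll: discard every entry in the cache.
    cache.invalidateAll()
    assert(cache.getIfPresent(t) == null)
  }
}

A design note on getIfPresent vs. get: as the refreshTable comment explains, the loader logs confusing warnings for tables that are not data source tables, so the lookup path that merely checks for a cached entry uses getIfPresent and maps a miss to None rather than triggering a load.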