Make the table relation cache general so that it does not depend on Hive
cloud-fan committed Jan 18, 2017
1 parent 4494cd9 commit 096fcc8
Showing 20 changed files with 144 additions and 198 deletions.
@@ -21,13 +21,13 @@ import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable

import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
@@ -117,6 +117,14 @@ class SessionCatalog(
if (conf.caseSensitiveAnalysis) name else name.toLowerCase
}

/**
* A cache of qualified table name to table relation plan.
*/
val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
// TODO: create a config instead of hardcode 1000 here.
CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]()
}

/**
* This method is used to make the given path qualified before we
* store this path in the underlying external catalog. So, when a path
@@ -573,7 +581,7 @@
val relationAlias = alias.getOrElse(table)
if (db == globalTempViewManager.database) {
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, Some(name))
SubqueryAlias(relationAlias, viewDef, None)
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
@@ -586,12 +594,12 @@
desc = metadata,
output = metadata.schema.toAttributes,
child = parser.parsePlan(viewText))
SubqueryAlias(relationAlias, child, Option(name))
SubqueryAlias(relationAlias, child, Some(name.copy(table = table, database = Some(db))))
} else {
SubqueryAlias(relationAlias, SimpleCatalogRelation(metadata), None)
}
} else {
SubqueryAlias(relationAlias, tempTables(table), Option(name))
SubqueryAlias(relationAlias, tempTables(table), None)
}
}
}
@@ -651,14 +659,21 @@
* Refresh the cache entry for a metastore table, if any.
*/
def refreshTable(name: TableIdentifier): Unit = synchronized {
val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
val tableName = formatTableName(name.table)

// Go through temporary tables and invalidate them.
// If the database is defined, this is definitely not a temp table.
// If the database is defined, this may be a global temporary view.
// If the database is not defined, there is a good chance this is a temp table.
if (name.database.isEmpty) {
tempTables.get(formatTableName(name.table)).foreach(_.refresh())
} else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
tempTables.get(tableName).foreach(_.refresh())
} else if (dbName == globalTempViewManager.database) {
globalTempViewManager.get(tableName).foreach(_.refresh())
}

// Also invalidate the table relation cache.
val qualifiedTableName = QualifiedTableName(dbName, tableName)
tableRelationCache.invalidate(qualifiedTableName)
}

/**
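The SessionCatalog change above introduces a Guava-backed `tableRelationCache` keyed by `QualifiedTableName`, and `refreshTable` now also invalidates the corresponding entry. Below is a minimal, self-contained sketch of that pattern (not part of this commit); `QualifiedTableName` and `LogicalPlan` here are simplified stand-ins for the catalyst types, and the size limit mirrors the hardcoded 1000.

```scala
import java.util.concurrent.Callable

import com.google.common.cache.{Cache, CacheBuilder}

// Simplified stand-ins for the catalyst types, to keep the sketch self-contained.
case class QualifiedTableName(database: String, name: String)
trait LogicalPlan

object RelationCacheSketch {
  // A bounded cache of resolved table plans, keyed by (database, table).
  val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] =
    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]()

  // Compute-on-miss: the Callable runs only when the key is absent from the cache.
  def getOrLoad(key: QualifiedTableName)(load: => LogicalPlan): LogicalPlan =
    tableRelationCache.get(key, new Callable[LogicalPlan] {
      override def call(): LogicalPlan = load
    })

  // Mirrors what refreshTable now does in addition to refreshing temp/global views.
  def invalidate(key: QualifiedTableName): Unit =
    tableRelationCache.invalidate(key)
}
```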
@@ -60,9 +60,11 @@ case class TableIdentifier(table: String, database: Option[String])
override val identifier: String = table

def this(table: String) = this(table, None)

}

/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)

object TableIdentifier {
def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
}
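As an aside (not part of this commit), case-class semantics are what make the new `QualifiedTableName` a convenient cache key: two instances built from the same database and table name compare equal and hash identically. The names below are purely illustrative.

```scala
case class QualifiedTableName(database: String, name: String)

val a = QualifiedTableName("default", "events")  // hypothetical database/table names
val b = QualifiedTableName("default", "events")
assert(a == b)                    // structural equality
assert(a.hashCode == b.hashCode)  // stable hashing, safe to use as a cache key
```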
@@ -436,7 +436,7 @@ class SessionCatalogSuite extends PlanTest {
== SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
// Otherwise, we'll first look up a temporary table with the same name
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1"))))
== SubqueryAlias("tbl1", tempTable1, None))
// Then, if that does not exist, look up the relation in the current database
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
@@ -462,7 +462,7 @@ class SessionCatalogSuite extends PlanTest {
val tmpView = Range(1, 10, 2, 10)
catalog.createTempView("vw1", tmpView, overrideIfExists = false)
val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1"))))
assert(plan == SubqueryAlias("range", tmpView, None))
}

test("look up view relation") {
@@ -479,7 +479,7 @@ class SessionCatalogSuite extends PlanTest {
// Look up a view using current database of the session catalog.
sessionCatalog.setCurrentDatabase("db3")
comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1")),
SubqueryAlias("view1", view, Some(TableIdentifier("view1"))))
SubqueryAlias("view1", view, Some(TableIdentifier("view1", Some("db3")))))
}

test("table exists") {
@@ -386,7 +386,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
relation.catalogTable.identifier
}
EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {

val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
EliminateSubqueryAliases(tableRelation) match {
// check if the table is a data source table (the relation is a BaseRelation).
case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
throw new AnalysisException(
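DataFrameWriter now resolves the target table through `SparkSession.table` rather than calling `SessionCatalog.lookupRelation` directly, so the full analyzer (including `FindDataSourceTable` and the new relation cache) is applied. A rough usage sketch, assuming a local SparkSession and an illustrative table name:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("relation-lookup-sketch")
  .getOrCreate()

// Illustrative table; any saved table works.
spark.range(5).write.saveAsTable("tbl")

// Resolving through spark.table(...) runs the analyzer, so rules such as
// FindDataSourceTable (and the relation cache behind it) are applied before
// the top-level alias is stripped.
val analyzedRelation =
  EliminateSubqueryAliases(spark.table("tbl").queryExecution.analyzed)
println(analyzedRelation.getClass.getSimpleName)
```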
@@ -40,7 +40,8 @@ case class AnalyzeColumnCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
val relation =
EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)

// Compute total size
val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {
@@ -41,7 +41,8 @@ case class AnalyzeTableCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
val relation =
EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)

relation match {
case relation: CatalogRelation =>
@@ -450,7 +450,7 @@ case class DescribeTableCommand(
if (metadata.schema.isEmpty) {
// In older version(prior to 2.1) of Spark, the table schema can be empty and should be
// inferred at runtime. We should still support it.
describeSchema(catalog.lookupRelation(metadata.identifier).schema, result)
describeSchema(sparkSession.table(metadata.identifier).schema, result)
} else {
describeSchema(metadata.schema, result)
}
@@ -17,18 +17,17 @@

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable.ArrayBuffer
import java.util.concurrent.Callable

import org.apache.hadoop.fs.Path
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -37,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPa
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -215,37 +215,43 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {


/**
* Replaces [[SimpleCatalogRelation]] with data source table if its table property contains data
* source information.
* Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive.
*/
class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private def readDataSourceTable(
sparkSession: SparkSession,
simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
val table = simpleCatalogRelation.catalogTable
val pathOption = table.storage.locationUri.map("path" -> _)
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = Some(table.schema),
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
options = table.storage.properties ++ pathOption)

LogicalRelation(
dataSource.resolveRelation(),
expectedOutputAttributes = Some(simpleCatalogRelation.output),
catalogTable = Some(table))
private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
val cache = sparkSession.sessionState.catalog.tableRelationCache
val withHiveSupport =
sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"

cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
override def call(): LogicalPlan = {
val pathOption = table.storage.locationUri.map("path" -> _)
val dataSource =
DataSource(
sparkSession,
// In older version(prior to 2.1) of Spark, the table schema can be empty and should be
// inferred at runtime. We should still support it.
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
options = table.storage.properties ++ pathOption,
// TODO: improve `InMemoryCatalog` and remove this limitation.
catalogTable = if (withHiveSupport) Some(table) else None)

LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
}
})
}

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
if DDLUtils.isDatasourceTable(s.metadata) =>
i.copy(table = readDataSourceTable(sparkSession, s))
i.copy(table = readDataSourceTable(s.metadata))

case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
readDataSourceTable(sparkSession, s)
readDataSourceTable(s.metadata)
}
}

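`FindDataSourceTable` now memoizes the resolved relation via `tableRelationCache.get(key, Callable)` and only forwards the `catalogTable` when the session runs with Hive support. A small sketch of that Hive-support check (assuming a SparkSession named `spark`; `spark.sql.catalogImplementation` is the key behind `StaticSQLConf.CATALOG_IMPLEMENTATION`):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  .appName("catalog-impl-sketch")
  .getOrCreate()

// "hive" for sessions built with enableHiveSupport(), "in-memory" otherwise.
val withHiveSupport =
  spark.sparkContext.getConf.get("spark.sql.catalogImplementation", "in-memory") == "hive"
println(s"withHiveSupport = $withHiveSupport")
```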
@@ -440,7 +440,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {

// If this table is cached as an InMemoryRelation, drop the original
// cached version and make the new version cached lazily.
val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
val logicalPlan = sparkSession.table(tableIdent).queryExecution.analyzed
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
11 changes: 0 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1626,17 +1626,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
assert(d.size == d.distinct.size)
}

test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") {
val tableName = "tbl"
withTable(tableName) {
spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName)
val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName))
val expr = relation.resolve("i")
val qe = spark.sessionState.executePlan(Project(Seq(expr), relation))
qe.assertAnalyzed()
}
}

private def verifyNullabilityInFilterExec(
df: DataFrame,
expr: String,
@@ -1790,7 +1790,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}

test("SET LOCATION for managed table") {
withTable("src") {
withTable("tbl") {
withTempDir { dir =>
sql("CREATE TABLE tbl(i INT) USING parquet")
sql("INSERT INTO tbl SELECT 1")
Expand All @@ -1799,6 +1799,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
.getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get

sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'")
spark.catalog.refreshTable("tbl")
// SET LOCATION won't move data from previous table path to new table path.
assert(spark.table("tbl").count() == 0)
// the previous table path should be still there.
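The extra `refreshTable` call in this test is needed because the analyzed relation is now cached: after `ALTER TABLE ... SET LOCATION`, the cached plan still points at the old path until the entry is invalidated. A hedged usage sketch, assuming a SparkSession `spark`; the table name and path are illustrative:

```scala
spark.sql("CREATE TABLE tbl(i INT) USING parquet")
spark.sql("INSERT INTO tbl SELECT 1")
spark.sql("ALTER TABLE tbl SET LOCATION '/tmp/new_tbl_location'")

// Drop the stale entry from the table relation cache so the next read
// sees the new (empty) location instead of the cached plan.
spark.catalog.refreshTable("tbl")
assert(spark.table("tbl").count() == 0)
```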
