From 3e23e3d24e880084f3e4235ab098ac7aa271507f Mon Sep 17 00:00:00 2001
From: "Lantao, Jin(lajin)"
Date: Wed, 15 Jul 2020 17:25:21 +0800
Subject: [PATCH] [CARMEL-3156][SPARK-32064][SQL] Support creating temporary
 tables (#3)

---
 .../apache/spark/deploy/SparkHadoopUtil.scala |   8 +
 .../sql/connector/catalog/TableCatalog.java   |   6 +
 .../analysis/AlreadyExistException.scala      |  10 +-
 .../catalog/ExternalCatalogUtils.scala        |   8 +
 .../catalog/GlobalTempViewManager.scala       |   4 +-
 .../sql/catalyst/catalog/SessionCatalog.scala | 374 +++++++++++++-----
 .../sql/catalyst/catalog/interface.scala      |   3 +-
 .../sql/catalyst/parser/AstBuilder.scala      |  38 +-
 .../apache/spark/sql/internal/SQLConf.scala   |   8 +
 .../spark/sql/internal/StaticSQLConf.scala    |  19 +
 .../catalog/SessionCatalogSuite.scala         |   4 +-
 .../org/apache/spark/sql/SQLContext.scala     |   8 +
 .../apache/spark/sql/catalog/Catalog.scala    |  14 +
 .../analysis/ResolveSessionCatalog.scala      |  20 +-
 .../spark/sql/execution/SparkSqlParser.scala  |  42 +-
 .../command/createDataSourceTables.scala      |  33 +-
 .../spark/sql/execution/command/ddl.scala     |   8 +-
 .../spark/sql/execution/command/tables.scala  |  12 +-
 .../spark/sql/execution/command/views.scala   |   2 +-
 .../datasources/v2/V2SessionCatalog.scala     |  17 +-
 .../spark/sql/internal/CatalogImpl.scala      |  27 +-
 .../execution/command/DDLParserSuite.scala    |   2 +-
 .../sql/execution/command/DDLSuite.scala      |  16 +-
 .../command/TemporaryTableSuite.scala         | 248 ++++++++++++
 .../sources/CreateTableAsSelectSuite.scala    |  10 +-
 .../apache/spark/sql/test/SQLTestUtils.scala  |  20 +-
 .../SparkGetColumnsOperation.scala            |   2 +-
 .../SparkGetTablesOperation.scala             |   4 +-
 .../hive/thriftserver/SparkOperation.scala    |   3 +-
 .../sql/hive/thriftserver/SparkSQLEnv.scala   |   3 +
 .../thriftserver/SparkSQLSessionManager.scala |   1 +
 .../HiveThriftServer2Suites.scala             |  13 +
 .../SparkMetadataOperationSuite.scala         |   2 +-
 33 files changed, 793 insertions(+), 196 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/TemporaryTableSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 1180501e8c738..0d81738e3d357 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -390,6 +390,14 @@ private[spark] class SparkHadoopUtil extends Logging {
     ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
   }
 
+  /** Recursively delete the path if it exists. Returns true if the path no longer exists. */
+  def deletePath(path: Path): Boolean = {
+    val fs = path.getFileSystem(conf)
+    if (fs.exists(path)) {
+      fs.delete(path, true)
+    } else {
+      true
+    }
+  }
 }
 
 private[spark] object SparkHadoopUtil {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 1809b9cdb52e5..699d5009567e6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -61,6 +61,12 @@ public interface TableCatalog extends CatalogPlugin {
    */
  String PROP_OWNER = "owner";
 
+  /**
+   * A reserved property to specify the type of the table, e.g. to mark it as
+   * a temporary/volatile table.
+   * @since 3.1.0
+   */
+  String PROP_TYPE = "specific_type";
+
  /**
   * List the tables in a namespace from the catalog.
  *

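Note for reviewers: end to end, the feature added by this patch behaves roughly as sketched
below. This is a sketch, not part of the diff; the SQL statements and the Catalog call are
taken from the tests and APIs in this patch, and the config values are illustrative only.

    // Temporary tables are session-scoped and materialize their data under the scratch
    // directory. Static confs introduced in StaticSQLConf below (set at launch time):
    //   spark.scratchdir                                   default: /tmp/spark
    //   spark.sql.temporary.table.max.size.per.session     default: Long.MaxValue bytes
    //   spark.sql.temporary.table.max.number.per.session   default: 10000
    spark.sql("CREATE TEMPORARY TABLE tt (id INT) USING parquet")
    spark.sql("INSERT INTO tt VALUES (1)")
    spark.sql("CREATE TEMPORARY TABLE tt2 USING parquet AS SELECT 2 AS id")
    spark.catalog.dropTempTable("tt")   // also deletes the table's files in the scratch dir
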
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index 7e5d56a7d1196..9f57cd04e7203 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -45,9 +46,16 @@ class TableAlreadyExistsException(message: String) extends AnalysisException(mes
   }
 }
 
-class TempTableAlreadyExistsException(table: String)
+class TempViewAlreadyExistsException(table: String)
   extends TableAlreadyExistsException(s"Temporary view '$table' already exists")
 
+class TempTableAlreadyExistsException(table: TableIdentifier)
+  extends TableAlreadyExistsException(s"Temporary table '${table.unquotedString}' already exists")
+
+class TempTablePartitionUnsupportedException(table: TableIdentifier)
+  extends AnalysisException(
+    s"Partitions are not supported in temporary table '$table'")
+
 class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec)
   extends AnalysisException(
     s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index ae3b75dc3334b..01bfd5cc0eaef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, Predicate}
+import org.apache.spark.sql.connector.catalog.TableCatalog
 
 object ExternalCatalogUtils {
   // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst doesn't
@@ -231,4 +232,11 @@ object CatalogUtils {
         s"defined table columns are: ${tableCols.mkString(", ")}")
     }
   }
+
+  val TEMPORARY_TABLE = "temporary"
+
+  def isTemporaryTable(table: CatalogTable): Boolean = {
+    table.tableType == CatalogTableType.TEMPORARY ||
+      table.properties.get(TableCatalog.PROP_TYPE).contains(TEMPORARY_TABLE)
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
index 6095ac0bc9c50..5ba7b2286a61d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
@@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable
 
 import org.apache.spark.sql.AnalysisException
-import
org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.TempViewAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.StringUtils
 
@@ -58,7 +58,7 @@ class GlobalTempViewManager(val database: String) {
       viewDefinition: LogicalPlan,
       overrideIfExists: Boolean): Unit = synchronized {
     if (!overrideIfExists && viewDefinitions.contains(name)) {
-      throw new TempTableAlreadyExistsException(name)
+      throw new TempViewAlreadyExistsException(name)
     }
     viewDefinitions.put(name, viewDefinition)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b79857cdccd22..f4a51f4de3771 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.catalog
 
 import java.net.URI
-import java.util.Locale
+import java.util.{Locale, UUID}
 import java.util.concurrent.Callable
 import javax.annotation.concurrent.GuardedBy
 
@@ -26,9 +26,12 @@ import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
 import com.google.common.cache.{Cache, CacheBuilder}
+import org.apache.commons.lang3.{StringUtils => LangStringUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.UserGroupInformation
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
@@ -96,6 +99,13 @@ class SessionCatalog(
   @GuardedBy("this")
   protected val tempViews = new mutable.HashMap[String, LogicalPlan]
 
+  /** Map of temporary tables, from qualified table identifier to its table metadata. */
+  @GuardedBy("this")
+  protected val tempTables = new mutable.HashMap[TableIdentifier, CatalogTable]
+
+  @GuardedBy("this")
+  private val tempTableSessionId = s"temp-${UUID.randomUUID().toString}"
+
   // Note: we track current database here because certain operations do not explicitly
   // specify the database (e.g. DROP TABLE my_table). In these cases we must first
   // check whether the temporary view or function exists, then, if not, operate on
@@ -340,36 +350,6 @@ class SessionCatalog(
     }
   }
 
-  /**
-   * Alter the metadata of an existing metastore table identified by `tableDefinition`.
-   *
-   * If no database is specified in `tableDefinition`, assume the table is in the
-   * current database.
-   *
-   * Note: If the underlying implementation does not support altering a certain field,
-   * this becomes a no-op.
-   */
-  def alterTable(tableDefinition: CatalogTable): Unit = {
-    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableDefinition.identifier.table)
-    val tableIdentifier = TableIdentifier(table, Some(db))
-    requireDbExists(db)
-    requireTableExists(tableIdentifier)
-    val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
-      && !tableDefinition.storage.locationUri.get.isAbsolute) {
-      // make the location of the table qualified.
- val qualifiedTableLocation = - makeQualifiedPath(tableDefinition.storage.locationUri.get) - tableDefinition.copy( - storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)), - identifier = tableIdentifier) - } else { - tableDefinition.copy(identifier = tableIdentifier) - } - - externalCatalog.alterTable(newTableDefinition) - } - /** * Alter the data schema of a table identified by the provided table identifier. The new data * schema should not have conflict column names with the existing partition columns, and should @@ -407,45 +387,6 @@ class SessionCatalog( schema.fields.map(_.name).exists(conf.resolver(_, colName)) } - /** - * Alter Spark's statistics of an existing metastore table identified by the provided table - * identifier. - */ - def alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit = { - val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase)) - val table = formatTableName(identifier.table) - val tableIdentifier = TableIdentifier(table, Some(db)) - requireDbExists(db) - requireTableExists(tableIdentifier) - externalCatalog.alterTableStats(db, table, newStats) - // Invalidate the table relation cache - refreshTable(identifier) - } - - /** - * Return whether a table/view with the specified name exists. If no database is specified, check - * with current database. - */ - def tableExists(name: TableIdentifier): Boolean = synchronized { - val db = formatDatabaseName(name.database.getOrElse(currentDb)) - val table = formatTableName(name.table) - externalCatalog.tableExists(db, table) - } - - /** - * Retrieve the metadata of an existing permanent table/view. If no database is specified, - * assume the table/view is in the current database. - */ - @throws[NoSuchDatabaseException] - @throws[NoSuchTableException] - def getTableMetadata(name: TableIdentifier): CatalogTable = { - val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) - val table = formatTableName(name.table) - requireDbExists(db) - requireTableExists(TableIdentifier(table, Some(db))) - externalCatalog.getTable(db, table) - } - /** * Retrieve all metadata of existing permanent tables/views. If no database is specified, * assume the table/view is in the current database. 
@@ -531,8 +472,12 @@ class SessionCatalog(
       tableDefinition: LogicalPlan,
       overrideIfExists: Boolean): Unit = synchronized {
     val table = formatTableName(name)
+    val tableIdentifier = TableIdentifier(table, Some(currentDb))
+    if (tempTables.contains(tableIdentifier)) {
+      throw new TempTableAlreadyExistsException(tableIdentifier)
+    }
     if (tempViews.contains(table) && !overrideIfExists) {
-      throw new TempTableAlreadyExistsException(name)
+      throw new TempViewAlreadyExistsException(name)
     }
     tempViews.put(table, tableDefinition)
   }
@@ -605,6 +550,228 @@ class SessionCatalog(
     globalTempViewManager.remove(formatTableName(name))
   }
 
+  // ----------------------------------------------
+  // | Methods that interact with temp tables only |
+  // ----------------------------------------------
+
+  /** Return the qualified root of the scratch space (see the `spark.scratchdir` static conf). */
+  def getScratchPath: Path = {
+    new Path(makeQualifiedPath(CatalogUtils.stringToURI(conf.sparkScratchDir)))
+  }
+
+  /** Return the per-user, per-session parent directory for temporary table storage. */
+  def defaultTempTableParentPath: Path = {
+    val user = Option(UserGroupInformation.getCurrentUser.getUserName).getOrElse("")
+    val scratchWithUserPath =
+      if (LangStringUtils.isBlank(user)) getScratchPath else new Path(getScratchPath, user)
+    new Path(scratchWithUserPath, tempTableSessionId)
+  }
+
+  def defaultSessionAwareTablePath(parentDir: Path, tableIdent: TableIdentifier): URI = {
+    new Path(parentDir, formatTableName(tableIdent.unquotedString)).toUri
+  }
+
+  /** Return the default storage location of a temporary table. */
+  def defaultTempTablePath(tableIdent: TableIdentifier): URI = {
+    defaultSessionAwareTablePath(defaultTempTableParentPath, tableIdent)
+  }
+
+  /**
+   * Create a temporary table.
+   */
+  def createTempTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = synchronized {
+    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(tableDefinition.identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
+    validateName(table)
+
+    requireDbExists(db)
+    if (tempViews.contains(table)) {
+      throw new TempViewAlreadyExistsException(table)
+    }
+    if (tempTables.contains(tableIdentifier)) {
+      if (ignoreIfExists) return
+      throw new TempTableAlreadyExistsException(tableIdentifier)
+    }
+    if (tempTables.size >= conf.maxNumberForTemporaryTablesPerSession) {
+      throw new AnalysisException("The total number of temporary tables in this session " +
+        s"has reached the limit of ${conf.maxNumberForTemporaryTablesPerSession}. " +
+        "Please use the 'DROP TABLE' command to drop old temporary tables to release space.")
+    }
+
+    val tempTableUri = defaultTempTablePath(tableIdentifier)
+    val tempTablePath = new Path(tempTableUri)
+    val fs = tempTablePath.getFileSystem(hadoopConf)
+    fs.mkdirs(tempTablePath)
+
+    val sessionDir = defaultTempTableParentPath
+    val summary = fs.getContentSummary(sessionDir)
+    val totalUsagePerSession = summary.getLength
+    if (totalUsagePerSession > conf.maxSizeForTemporaryTablesPerSession) {
+      val ex = new AnalysisException("The total size of temporary tables in this session " +
+        s"has exceeded the limit of ${conf.maxSizeForTemporaryTablesPerSession} bytes. " +
+        s"The current usage is $totalUsagePerSession bytes. Please use the 'DROP TABLE' " +
+        "command to drop old temporary tables to release space.")
+      fs.delete(tempTablePath, true)
+      throw ex
+    }
+
+    logInfo(s"Temporary table ${tableIdentifier.identifier} stored at ${tempTableUri}")
+    tempTables.put(tableIdentifier, tableDefinition.copy(
+      identifier = tableIdentifier,
+      // replace with the temporary table's default location
+      storage = tableDefinition.storage.copy(locationUri = Option(tempTableUri))))
+  }
+
+  /**
+   * Drop a temporary table. If no database is specified, the current database is assumed.
+   *
+   * Returns true if the table is dropped successfully, false otherwise.
+   */
+  def dropTempTable(toDrop: TableIdentifier): Boolean = synchronized {
+    val db = formatDatabaseName(toDrop.database.getOrElse(currentDb))
+    val name = formatTableName(toDrop.table)
+    val table = TableIdentifier(name, Some(db))
+    SparkHadoopUtil.get.deletePath(new Path(defaultTempTableParentPath, table.unquotedString))
+
+    tempTables.remove(table).isDefined
+  }
+
+  /**
+   * Drop all existing temporary tables.
+   * For testing only.
+   */
+  def clearTempTables(): Unit = synchronized {
+    SparkHadoopUtil.get.deletePath(defaultTempTableParentPath)
+    tempTables.clear()
+  }
+
+  // TODO: merge it with `isTemporaryTable`.
+  def isTempTable(nameParts: Seq[String]): Boolean = {
+    if (nameParts.length > 2) return false
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    isTemporaryTable(nameParts.asTableIdentifier)
+  }
+
+  /**
+   * Return whether a table with the specified name is a temporary table.
+   */
+  def isTemporaryTable(name: TableIdentifier): Boolean = synchronized {
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
+    val table = formatTableName(name.table)
+    val qualified = TableIdentifier(table, Option(db))
+    tempTables.contains(qualified)
+  }
+
+  def listTempTables(db: String): Seq[TableIdentifier] = listTempTables(db, "*")
+
+  /**
+   * List all temporary tables in the specified database that match the pattern.
+   */
+  def listTempTables(db: String, pattern: String): Seq[TableIdentifier] = {
+    val dbName = formatDatabaseName(db)
+    synchronized {
+      StringUtils.filterPattern(
+        tempTables.keySet.filter(_.database.contains(dbName)).map(_.table).toSeq, pattern)
+        .map { name => TableIdentifier(name, Some(dbName)) }
+    }
+  }
+
+  /**
+   * Return the metadata of a temporary table in the current database, if any.
+   *
+   * For testing only.
+   */
+  private[spark] def getTempTable(tableName: String): Option[CatalogTable] = synchronized {
+    val nameParts = tableName.split("\\.").toSeq
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    val table = nameParts.asTableIdentifier
+    try {
+      Option(getTableMetadata(table))
+    } catch {
+      case _: AnalysisException =>
+        None
+    }
+  }
+
+  // -------------------------------------------------------------------
+  // | Methods that interact with temporary tables and metastore tables |
+  // -------------------------------------------------------------------
+
+  /**
+   * Alter the metadata of an existing temporary or metastore table identified by
+   * `tableDefinition`.
+   *
+   * If no database is specified in `tableDefinition`, assume the table is in the
+   * current database.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
+   */
+  def alterTable(tableDefinition: CatalogTable): Unit = {
+    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(tableDefinition.identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
+    requireDbExists(db)
+    requireTableExists(tableIdentifier)
+    val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
+      && !tableDefinition.storage.locationUri.get.isAbsolute) {
+      // make the location of the table qualified.
+      val qualifiedTableLocation =
+        makeQualifiedPath(tableDefinition.storage.locationUri.get)
+      tableDefinition.copy(
+        storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
+        identifier = tableIdentifier)
+    } else {
+      tableDefinition.copy(identifier = tableIdentifier)
+    }
+    if (CatalogUtils.isTemporaryTable(tableDefinition)) {
+      // Temporary tables live in the session-local map, not in the external catalog.
+      tempTables.put(tableIdentifier, newTableDefinition)
+    } else {
+      externalCatalog.alterTable(newTableDefinition)
+    }
+  }
+
+  /**
+   * Alter Spark's statistics of an existing temporary or metastore table identified by the
+   * provided table identifier.
+   */
+  def alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit = {
+    val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
+    requireDbExists(db)
+    requireTableExists(tableIdentifier)
+    tempTables.get(tableIdentifier) match {
+      case Some(oldTable) =>
+        tempTables(tableIdentifier) = oldTable.copy(stats = newStats)
+      case None =>
+        externalCatalog.alterTableStats(db, table, newStats)
+    }
+    // Invalidate the table relation cache
+    refreshTable(identifier)
+  }
+
+  /**
+   * Return whether a table/view with the specified name exists. If no database is specified,
+   * check with the current database.
+   */
+  def tableExists(name: TableIdentifier): Boolean = synchronized {
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
+    val table = formatTableName(name.table)
+    tempTables.contains(TableIdentifier(table, Some(db))) || externalCatalog.tableExists(db, table)
+  }
+
+  /**
+   * Retrieve the metadata of an existing temporary table or permanent table/view. If no
+   * database is specified, assume the table/view is in the current database.
+ */ + @throws[NoSuchDatabaseException] + @throws[NoSuchTableException] + def getTableMetadata(name: TableIdentifier): CatalogTable = { + val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) + val table = formatTableName(name.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Some(db))) + tempTables.getOrElse(TableIdentifier(table, Some(db)), externalCatalog.getTable(db, table)) + } + // ------------------------------------------------------------- // | Methods that interact with temporary and metastore tables | // ------------------------------------------------------------- @@ -665,7 +832,14 @@ class SessionCatalog( globalTempViewManager.rename(oldTableName, newTableName) } else { requireDbExists(db) - if (oldName.database.isDefined || !tempViews.contains(oldTableName)) { + if (tempTables.contains(oldName)) { + requireTableExists(TableIdentifier(oldTableName, Some(db))) + requireTableNotExists(TableIdentifier(newTableName, Some(db))) + validateName(newTableName) + val table = tempTables(oldName) + tempTables.remove(oldName) + tempTables.put(TableIdentifier(newTableName, Some(db)), table) + } else if (oldName.database.isDefined || !tempViews.contains(oldTableName)) { requireTableExists(TableIdentifier(oldTableName, Some(db))) requireTableNotExists(TableIdentifier(newTableName, Some(db))) validateName(newTableName) @@ -712,6 +886,7 @@ class SessionCatalog( // When ignoreIfNotExists is false, no exception is issued when the table does not exist. // Instead, log it as an error message. if (tableExists(TableIdentifier(table, Option(db)))) { + dropTempTable(TableIdentifier(table, Option(db))) externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge) } else if (!ignoreIfNotExists) { throw new NoSuchTableException(db = db, table = table) @@ -747,7 +922,8 @@ class SessionCatalog( SubqueryAlias(table, db, viewDef) }.getOrElse(throw new NoSuchTableException(db, table)) } else if (name.database.isDefined || !tempViews.contains(table)) { - val metadata = externalCatalog.getTable(db, table) + val metadata = tempTables.getOrElse(TableIdentifier(table, Option(db)), + externalCatalog.getTable(db, table)) getRelation(metadata) } else { SubqueryAlias(table, tempViews(table)) @@ -796,11 +972,11 @@ class SessionCatalog( } } - // TODO: merge it with `isTemporaryTable`. + // TODO: merge it with `isTemporaryView`. def isTempView(nameParts: Seq[String]): Boolean = { if (nameParts.length > 2) return false import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - isTemporaryTable(nameParts.asTableIdentifier) + isTemporaryView(nameParts.asTableIdentifier) } /** @@ -809,7 +985,7 @@ class SessionCatalog( * Note: The temporary view cache is checked only when database is not * explicitly specified. */ - def isTemporaryTable(name: TableIdentifier): Boolean = synchronized { + def isTemporaryView(name: TableIdentifier): Boolean = synchronized { val table = formatTableName(name.table) if (name.database.isEmpty) { tempViews.contains(table) @@ -835,7 +1011,8 @@ class SessionCatalog( } /** - * List all tables in the specified database, including local temporary views. + * List all tables in the specified database, including local temporary views + * and temporary tables. * * Note that, if the specified database is global temporary view database, we will list global * temporary views. 
@@ -843,16 +1020,18 @@ class SessionCatalog( def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*") /** - * List all matching tables in the specified database, including local temporary views. + * List all matching tables in the specified database, including local temporary views + * and temporary tables. * * Note that, if the specified database is global temporary view database, we will list global * temporary views. */ - def listTables(db: String, pattern: String): Seq[TableIdentifier] = listTables(db, pattern, true) + def listTables(db: String, pattern: String): Seq[TableIdentifier] = + listTables(db, pattern, true) /** - * List all matching tables in the specified database, including local temporary views - * if includeLocalTempViews is enabled. + * List all matching tables in the specified database, including temporary views + * and temporary tables if includeTempViewsAndTables is enabled. * * Note that, if the specified database is global temporary view database, we will list global * temporary views. @@ -860,7 +1039,7 @@ class SessionCatalog( def listTables( db: String, pattern: String, - includeLocalTempViews: Boolean): Seq[TableIdentifier] = { + includeTempViewsAndTables: Boolean): Seq[TableIdentifier] = { val dbName = formatDatabaseName(db) val dbTables = if (dbName == globalTempViewManager.database) { globalTempViewManager.listViewNames(pattern).map { name => @@ -873,8 +1052,8 @@ class SessionCatalog( } } - if (includeLocalTempViews) { - dbTables ++ listLocalTempViews(pattern) + if (includeTempViewsAndTables) { + dbTables ++ listLocalTempViews(pattern) ++ listTempTables(db, pattern) } else { dbTables } @@ -935,7 +1114,7 @@ class SessionCatalog( * Drop all existing temporary views. * For testing only. */ - def clearTempTables(): Unit = synchronized { + def clearTempViews(): Unit = synchronized { tempViews.clear() } @@ -961,8 +1140,10 @@ class SessionCatalog( ignoreIfExists: Boolean): Unit = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) + val tableIdentifier = TableIdentifier(table, Option(db)) requireDbExists(db) - requireTableExists(TableIdentifier(table, Option(db))) + requireTableExists(tableIdentifier) + requirePartitionSupported(tableIdentifier) requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName)) requireNonEmptyValueInPartitionSpec(parts.map(_.spec)) externalCatalog.createPartitions( @@ -981,8 +1162,10 @@ class SessionCatalog( retainData: Boolean): Unit = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) + val tableIdentifier = TableIdentifier(table, Option(db)) requireDbExists(db) - requireTableExists(TableIdentifier(table, Option(db))) + requireTableExists(tableIdentifier) + requirePartitionSupported(tableIdentifier) requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName)) requireNonEmptyValueInPartitionSpec(specs) externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData) @@ -1001,8 +1184,10 @@ class SessionCatalog( val tableMetadata = getTableMetadata(tableName) val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) + val tableIdentifier = TableIdentifier(table, Option(db)) requireDbExists(db) - requireTableExists(TableIdentifier(table, Option(db))) + requireTableExists(tableIdentifier) + requirePartitionSupported(tableIdentifier) 
requireExactMatchedPartitionSpec(specs, tableMetadata) requireExactMatchedPartitionSpec(newSpecs, tableMetadata) requireNonEmptyValueInPartitionSpec(specs) @@ -1022,8 +1207,10 @@ class SessionCatalog( def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) + val tableIdentifier = TableIdentifier(table, Option(db)) requireDbExists(db) - requireTableExists(TableIdentifier(table, Option(db))) + requireTableExists(tableIdentifier) + requirePartitionSupported(tableIdentifier) requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName)) requireNonEmptyValueInPartitionSpec(parts.map(_.spec)) externalCatalog.alterPartitions(db, table, partitionWithQualifiedPath(tableName, parts)) @@ -1148,6 +1335,12 @@ class SessionCatalog( } } + private def requirePartitionSupported(table: TableIdentifier): Unit = { + if (isTemporaryTable(table)) { + throw new TempTablePartitionUnsupportedException(table) + } + } + /** * Make the partition path qualified. * If the partition path is relative, e.g. 'paris', it will be qualified with @@ -1534,6 +1727,7 @@ class SessionCatalog( } } clearTempTables() + clearTempViews() globalTempViewManager.clear() functionRegistry.clear() tableRelationCache.invalidateAll() @@ -1559,6 +1753,8 @@ class SessionCatalog( target.currentDb = currentDb // copy over temporary views tempViews.foreach(kv => target.tempViews.put(kv._1, kv._2)) + // copy over temporary tables + tempTables.foreach(kv => target.tempTables.put(kv._1, kv._2)) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 4e63ee7428d72..da2636dd6649f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -608,8 +608,9 @@ object CatalogTableType { val EXTERNAL = new CatalogTableType("EXTERNAL") val MANAGED = new CatalogTableType("MANAGED") val VIEW = new CatalogTableType("VIEW") + val TEMPORARY = new CatalogTableType("TEMPORARY") - val tableTypes = Seq(EXTERNAL, MANAGED, VIEW) + val tableTypes = Seq(EXTERNAL, MANAGED, VIEW, TEMPORARY) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f6877d2ed62f3..a70c0b2627ab4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, FunctionResource, FunctionResourceType} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogUtils, FunctionResource, FunctionResourceType} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ @@ -2424,6 +2424,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val 
temporary = ctx.TEMPORARY != null
     val ifNotExists = ctx.EXISTS != null
     if (temporary && ifNotExists) {
+      // Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE does not support
+      // IF NOT EXISTS. Users are not allowed to replace the existing temp table.
       operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx)
     }
     val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText)
@@ -2693,6 +2695,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
           throw new ParseException(s"$PROP_OWNER is a reserved table property, it will be" +
             s" set to the current user", ctx)
         case (PROP_OWNER, _) => false
+        case (PROP_TYPE, _) if !legacyOn =>
+          throw new ParseException(s"$PROP_TYPE is a reserved table property, please use the" +
+            s" CREATE [TEMPORARY] TABLE clause to specify it.", ctx)
+        case (PROP_TYPE, _) => false
         case _ => true
       }
     }
@@ -2764,13 +2770,26 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
     val schema = Option(ctx.colTypeList()).map(createSchema)
     val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
+    if (temp && provider.isEmpty) {
+      throw new ParseException("CREATE TEMPORARY TABLE without a provider is not allowed.", ctx)
+    }
     val (partitioning, bucketSpec, properties, options, location, comment) =
       visitCreateTableClauses(ctx.createTableClauses())
 
-    Option(ctx.query).map(plan) match {
-      case Some(_) if temp =>
-        operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
+    if (partitioning.nonEmpty && temp) {
+      operationNotAllowed("PARTITIONED BY in temporary table", ctx)
+    }
+    if (location.nonEmpty && temp) {
+      operationNotAllowed("specify LOCATION in temporary table", ctx)
+    }
+    val withTempProps = if (temp) {
+      Map(TableCatalog.PROP_TYPE -> CatalogUtils.TEMPORARY_TABLE) ++ properties
+    } else {
+      properties
+    }
 
+    Option(ctx.query).map(plan) match {
       case Some(_) if schema.isDefined =>
         operationNotAllowed(
           "Schema may not be specified in a Create Table As Select (CTAS) statement",
@@ -2778,17 +2797,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 
       case Some(query) =>
         CreateTableAsSelectStatement(
-          table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
-          writeOptions = Map.empty, ifNotExists = ifNotExists)
-
-      case None if temp =>
-        // CREATE TEMPORARY TABLE ... USING ... is not supported by the catalyst parser.
-        // Use CREATE TEMPORARY VIEW ... USING ... instead.
-        operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
+          table, query, partitioning, bucketSpec, withTempProps, provider, options, location,
+          comment, writeOptions = Map.empty, ifNotExists = ifNotExists)
 
       case _ =>
         CreateTableStatement(table, schema.getOrElse(new StructType), partitioning, bucketSpec,
-          properties, provider, options, location, comment, ifNotExists = ifNotExists)
+          withTempProps, provider, options, location, comment, ifNotExists = ifNotExists)
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b8b0f3250fad8..ad83ada219d28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3153,6 +3153,14 @@ class SQLConf extends Serializable with Logging {
 
   def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
 
+  def sparkScratchDir: String = getConf(StaticSQLConf.SPARK_SCRATCH_DIR)
+
+  def maxSizeForTemporaryTablesPerSession: Long =
+    getConf(StaticSQLConf.TEMPORARY_TABLE_MAX_SIZE_PER_SESSION)
+
+  def maxNumberForTemporaryTablesPerSession: Long =
+    getConf(StaticSQLConf.TEMPORARY_TABLE_MAX_NUM_PER_SESSION)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 9618ff6062635..986aab9e777b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
 
 import java.util.Locale
 
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.util.Utils
 
 @@ -226,4 +227,22 @@ object StaticSQLConf {
       .version("3.0.0")
       .intConf
       .createWithDefault(100)
+
+  val SPARK_SCRATCH_DIR =
+    buildStaticConf("spark.scratchdir")
+      .doc("Scratch space for Spark temporary tables and other transient data. " +
+        "Similar to hive.exec.scratchdir.")
+      .stringConf
+      .createWithDefault("/tmp/spark")
+
+  val TEMPORARY_TABLE_MAX_SIZE_PER_SESSION =
+    buildStaticConf("spark.sql.temporary.table.max.size.per.session")
+      .doc("Maximum total size of the temporary tables in each session.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val TEMPORARY_TABLE_MAX_NUM_PER_SESSION =
+    buildStaticConf("spark.sql.temporary.table.max.number.per.session")
+      .doc("Maximum total number of temporary tables in each session.")
+      .intConf
+      .createWithDefault(10000)
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 4d88a8d7ee546..abaaa1a882d21 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -291,7 +291,7 @@ abstract class SessionCatalogSuite extends AnalysisTest {
     assert(catalog.getTempView("tbl2") == Option(tempTable2))
     assert(catalog.getTempView("tbl3").isEmpty)
     // Temporary view already exists
-    intercept[TempTableAlreadyExistsException] {
+    intercept[TempViewAlreadyExistsException] {
       catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
     }
     // Temporary view already exists but we override it
@@ -741,7 +741,7 @@ abstract class SessionCatalogSuite extends AnalysisTest {
     }
   }
 
-  test("list tables with pattern and includeLocalTempViews") {
+  test("list tables with pattern and includeTempViewsAndTables") {
     withEmptyCatalog { catalog =>
       catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
       catalog.createTable(newTable("tbl1", "mydb"), ignoreIfExists = false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 68ce82d5badda..14a95535a33f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -235,6 +235,14 @@ class SQLContext private[sql](val sparkSession: SparkSession)
     sparkSession.catalog.clearCache()
   }
 
+  /**
+   * Removes all temporary tables in this session.
+   * @since 3.1.0
+   */
+  def clearTempTables(): Unit = {
+    sparkSession.sessionState.catalog.clearTempTables()
+  }
+
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 60738e6d4ef9e..46aa242ea4314 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -426,6 +426,20 @@ abstract class Catalog {
    */
   def dropGlobalTempView(viewName: String): Boolean
 
+  /**
+   * Drops the temporary table with the given table name in the catalog.
+   * If the table has been cached before, then it will also be uncached.
+   *
+   * A temporary table is session-scoped. Its lifetime is the lifetime of the session that
+   * created it, i.e. it will be automatically dropped when the session terminates. Unlike a
+   * temporary view, it is registered under a database (the current one by default), so a
+   * qualified name such as `db1.table1` can reference it.
+   *
+   * @param tableName the name of the temporary table to be dropped.
+   * @return true if the table is dropped successfully, false otherwise.
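+   * @note Unlike a temporary view, a temporary table materializes its data in files under
+   *       the session-scoped scratch directory (see the `spark.scratchdir` static conf),
+   *       and dropping it also deletes that directory.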
+ * @since 3.1.0 + */ + def dropTempTable(tableName: String): Boolean + /** * Recovers all the partitions in the directory of a table and update the catalog. * Only works with a partitioned table, and not a view. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 9a8d2f0a142d4..b4b7deb5753a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -639,14 +639,22 @@ class ResolveSessionCatalog( comment: Option[String], ifNotExists: Boolean): CatalogTable = { val storage = CatalogStorageFormat.empty.copy( - locationUri = location.map(CatalogUtils.stringToURI), + locationUri = + if (properties.get(TableCatalog.PROP_TYPE).contains(CatalogUtils.TEMPORARY_TABLE)) { + Some(catalogManager.v1SessionCatalog.defaultTempTablePath(table)) + } else { + location.map(CatalogUtils.stringToURI) + }, properties = options) - val tableType = if (location.isDefined) { - CatalogTableType.EXTERNAL - } else { - CatalogTableType.MANAGED - } + val tableType = + if (properties.get(TableCatalog.PROP_TYPE).contains(CatalogUtils.TEMPORARY_TABLE)) { + CatalogTableType.TEMPORARY + } else if (location.isDefined) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } CatalogTable( identifier = table, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index aa139cb6b0c3b..8a175b7a0f5ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -172,43 +172,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } } - /** - * Create a table, returning a [[CreateTable]] logical plan. - * - * This is used to produce CreateTempViewUsing from CREATE TEMPORARY TABLE. - * - * TODO: Remove this. It is used because CreateTempViewUsing is not a Catalyst plan. - * Either move CreateTempViewUsing into catalyst as a parsed logical plan, or remove it because - * it is deprecated. - */ - override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { - val (ident, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) - - if (!temp || ctx.query != null) { - super.visitCreateTable(ctx) - } else { - if (external) { - operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx) - } - if (ifNotExists) { - // Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING does not support - // IF NOT EXISTS. Users are not allowed to replace the existing temp table. - operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx) - } - - val (_, _, _, options, _, _) = visitCreateTableClauses(ctx.createTableClauses()) - val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse( - throw new ParseException("CREATE TEMPORARY TABLE without a provider is not allowed.", ctx)) - val schema = Option(ctx.colTypeList()).map(createSchema) - - logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " + - "CREATE TEMPORARY VIEW ... USING ... 
instead") - - val table = tableIdentifier(ident, "CREATE TEMPORARY VIEW", ctx) - CreateTempViewUsing(table, schema, replace = false, global = false, provider, options) - } - } - /** * Creates a [[CreateTempViewUsing]] logical plan. */ @@ -327,8 +290,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { // TODO: implement temporary tables if (temp) { throw new ParseException( - "CREATE TEMPORARY TABLE is not supported yet. " + - "Please use CREATE TEMPORARY VIEW as an alternative.", ctx) + "CREATE TEMPORARY (Hive) TABLE is not supported yet. " + + "Please use CREATE TEMPORARY TABLE USING PARQUET or" + + "CREATE TEMPORARY VIEW as an alternative.", ctx) } if (ctx.skewSpec.size > 0) { operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 68c47d6a6dfaa..bcc1f99a5171d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command import java.net.URI import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -53,7 +54,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo if (ignoreIfExists) { return Seq.empty[Row] } else { - throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.") + if (sessionState.catalog.isTemporaryTable(table.identifier)) { + throw new TempTableAlreadyExistsException(table.identifier) + } else { + throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.") + } } } @@ -116,7 +121,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo // We will return Nil or throw exception at the beginning if the table already exists, so when // we reach here, the table should not exist and we should set `ignoreIfExists` to false. - sessionState.catalog.createTable(newTable, ignoreIfExists = false) + if (DDLUtils.isTemporaryTable(table)) { + sessionState.catalog.createTempTable(newTable, ignoreIfExists = false) + } else { + sessionState.catalog.createTable(newTable, ignoreIfExists = false) + } Seq.empty[Row] } @@ -170,6 +179,8 @@ case class CreateDataSourceTableAsSelectCommand( sparkSession.sessionState.catalog.validateTableLocation(table) val tableLocation = if (table.tableType == CatalogTableType.MANAGED) { Some(sessionState.catalog.defaultTablePath(table.identifier)) + } else if (table.tableType == CatalogTableType.TEMPORARY) { + Some(sessionState.catalog.defaultTempTablePath(tableIdentWithDB)) } else { table.storage.locationUri } @@ -181,15 +192,19 @@ case class CreateDataSourceTableAsSelectCommand( // the schema of df). It is important since the nullability may be changed by the relation // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). schema = result.schema) - // Table location is already validated. No need to check it again during table creation. 
- sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) + if (DDLUtils.isTemporaryTable(table)) { + sessionState.catalog.createTempTable(newTable, ignoreIfExists = false) + } else { + // Table location is already validated. No need to check it again during table creation. + sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) - result match { - case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty && + result match { + case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty && sparkSession.sqlContext.conf.manageFilesourcePartitions => - // Need to recover partitions into the metastore so our saved data is visible. - sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd - case _ => + // Need to recover partitions into the metastore so our saved data is visible. + sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd + case _ => + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index f41c4eca203af..a86c318feba98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -218,7 +218,7 @@ case class DropTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - val isTempView = catalog.isTemporaryTable(tableName) + val isTempView = catalog.isTemporaryView(tableName) if (!isTempView && catalog.tableExists(tableName)) { // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view @@ -851,6 +851,10 @@ object DDLUtils { table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER } + def isTemporaryTable(table: CatalogTable): Boolean = { + table.tableType == CatalogTableType.TEMPORARY + } + def readHiveTable(table: CatalogTable): HiveTableRelation = { HiveTableRelation( table, @@ -891,7 +895,7 @@ object DDLUtils { catalog: SessionCatalog, tableMetadata: CatalogTable, isView: Boolean): Unit = { - if (!catalog.isTemporaryTable(tableMetadata.identifier)) { + if (!catalog.isTemporaryView(tableMetadata.identifier)) { tableMetadata.tableType match { case CatalogTableType.VIEW if !isView => throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index fc8cc11bb1067..d0f53cf2c824a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -187,7 +187,7 @@ case class AlterTableRenameCommand( val catalog = sparkSession.sessionState.catalog // If this is a temp view, just rename the view. // Otherwise, if this is a real table, we also need to uncache and invalidate the table. 
- if (catalog.isTemporaryTable(oldName)) { + if (catalog.isTemporaryView(oldName)) { catalog.renameTable(oldName, newName) } else { val table = catalog.getTableMetadata(oldName) @@ -630,7 +630,7 @@ case class DescribeTableCommand( val result = new ArrayBuffer[Row] val catalog = sparkSession.sessionState.catalog - if (catalog.isTemporaryTable(table)) { + if (catalog.isTemporaryView(table)) { if (partitionSpec.nonEmpty) { throw new AnalysisException( s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}") @@ -871,7 +871,7 @@ case class ShowTablesCommand( tables.map { tableIdent => val database = tableIdent.database.getOrElse("") val tableName = tableIdent.table - val isTemp = catalog.isTemporaryTable(tableIdent) + val isTemp = catalog.isTemporaryView(tableIdent) || catalog.isTemporaryTable(tableIdent) if (isExtended) { val information = catalog.getTempViewOrPermanentTableMetadata(tableIdent).simpleString Row(database, tableName, isTemp, s"$information\n") @@ -889,7 +889,7 @@ case class ShowTablesCommand( val partition = catalog.getPartition(tableIdent, partitionSpec.get) val database = table.database.getOrElse("") val tableName = table.table - val isTemp = catalog.isTemporaryTable(table) + val isTemp = catalog.isTemporaryView(table) val information = partition.simpleString Seq(Row(database, tableName, isTemp, s"$information\n")) } @@ -919,7 +919,7 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - if (catalog.isTemporaryTable(table)) { + if (catalog.isTemporaryView(table)) { Seq.empty[Row] } else { val catalogTable = catalog.getTableMetadata(table) @@ -1118,7 +1118,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - if (catalog.isTemporaryTable(table)) { + if (catalog.isTemporaryView(table)) { throw new AnalysisException( s"SHOW CREATE TABLE is not supported on a temporary view: ${table.identifier}") } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 23f1d6c983413..57db990d3c5ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -323,7 +323,7 @@ case class ShowViewsCommand( views.map { tableIdent => val namespace = tableIdent.database.toArray.quoted val tableName = tableIdent.table - val isTemp = catalog.isTemporaryTable(tableIdent) + val isTemp = catalog.isTemporaryView(tableIdent) Row(namespace, tableName, isTemp) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 2ed33b867183b..22f0db3a7e3ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, Catal import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table} import 
org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
 import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -88,7 +89,15 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
     val location = Option(properties.get(TableCatalog.PROP_LOCATION))
     val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
       .copy(locationUri = location.map(CatalogUtils.stringToURI))
-    val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
+    val tableType =
+      if (Option(properties.get(TableCatalog.PROP_TYPE))
+        .contains(CatalogUtils.TEMPORARY_TABLE)) {
+        CatalogTableType.TEMPORARY
+      } else if (location.isDefined) {
+        CatalogTableType.EXTERNAL
+      } else {
+        CatalogTableType.MANAGED
+      }
 
     val tableDesc = CatalogTable(
       identifier = ident.asTableIdentifier,
@@ -103,7 +112,11 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
       comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
 
     try {
-      catalog.createTable(tableDesc, ignoreIfExists = false)
+      if (DDLUtils.isTemporaryTable(tableDesc)) {
+        catalog.createTempTable(tableDesc, ignoreIfExists = false)
+      } else {
+        catalog.createTable(tableDesc, ignoreIfExists = false)
+      }
     } catch {
       case _: TableAlreadyExistsException =>
         throw new TableAlreadyExistsException(ident)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 7ca9fbb40d9f5..31f53329d434a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
@@ -114,7 +115,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     } catch {
       case NonFatal(_) => None
     }
-    val isTemp = sessionCatalog.isTemporaryTable(tableIdent)
+    val isTemp = sessionCatalog.isTemporaryView(tableIdent)
     new Table(
       name = tableIdent.table,
       database = metadata.map(_.identifier.database).getOrElse(tableIdent.database).orNull,
@@ -256,7 +257,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   override def tableExists(dbName: String, tableName: String): Boolean = {
     val tableIdent = TableIdentifier(tableName, Option(dbName))
-    sessionCatalog.isTemporaryTable(tableIdent) || sessionCatalog.tableExists(tableIdent)
+    sessionCatalog.isTemporaryView(tableIdent) || sessionCatalog.tableExists(tableIdent)
   }
 
@@ -377,6 +378,26 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     }
   }
 
+  /**
+   * Drops the temporary table with the given table name in the catalog.
+   * If the table has been cached/persisted before, it's also unpersisted.
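+   * e.g. (a usage sketch):
+   * {{{
+   *   spark.sql("CREATE TEMPORARY TABLE t1 USING parquet AS SELECT 1 AS id")
+   *   spark.catalog.dropTempTable("t1")   // returns true
+   * }}}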
+ * + * @param tableName the identifier of the temporary table to be dropped. + * @group ddl_ops + * @since 3.1.0 + */ + override def dropTempTable(tableName: String): Boolean = { + sparkSession.sessionState.catalog.getTempTable(tableName).foreach { table => + table.viewText.map(CatalystSqlParser.parsePlan).foreach { plan => + sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession, plan, true) + } + } + val nameParts = tableName.split("\\.").toSeq + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val table = nameParts.asTableIdentifier + sessionCatalog.dropTempTable(table) + } + /** * Recovers all the partitions in the directory of a table and update the catalog. * Only works with a partitioned table, and not a temporary view. @@ -432,7 +453,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ override def uncacheTable(tableName: String): Unit = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - val cascade = !sessionCatalog.isTemporaryTable(tableIdent) + val cascade = !sessionCatalog.isTemporaryView(tableIdent) sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName), cascade) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index c6a533dfae4d0..c44dcd481ede4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -685,7 +685,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { test("create table - temporary") { val query = "CREATE TEMPORARY TABLE tab1 (id int, name string)" val e = intercept[ParseException] { parser.parsePlan(query) } - assert(e.message.contains("CREATE TEMPORARY TABLE is not supported yet")) + assert(e.message.contains("CREATE TEMPORARY (Hive) TABLE is not supported yet")) } test("create table - external") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index e4709e469dca3..d8d83dba10dce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.config import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException, TempViewAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER @@ -983,7 +983,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { Row("1997", "Ford") :: Nil) // Fails if creating a new view with the same name - intercept[TempTableAlreadyExistsException] { + intercept[TempViewAlreadyExistsException] { sql( 
s""" |CREATE TEMPORARY VIEW testview @@ -1048,7 +1048,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { withTempView("tab1") { sql( """ - |CREATE TEMPORARY TABLE tab1 + |CREATE TEMPORARY VIEW tab1 |USING org.apache.spark.sql.sources.DDLScanSource |OPTIONS ( | From '1', @@ -1111,7 +1111,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { withTempView("tab1", "tab2") { sql( """ - |CREATE TEMPORARY TABLE tab1 + |CREATE TEMPORARY VIEW tab1 |USING org.apache.spark.sql.sources.DDLScanSource |OPTIONS ( | From '1', @@ -1122,7 +1122,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { sql( """ - |CREATE TEMPORARY TABLE tab2 + |CREATE TEMPORARY VIEW tab2 |USING org.apache.spark.sql.sources.DDLScanSource |OPTIONS ( | From '1', @@ -2140,18 +2140,18 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { test("block creating duplicate temp table") { withTempView("t_temp") { - sql("CREATE TEMPORARY VIEW t_temp AS SELECT 1, 2") + sql("CREATE TEMPORARY TABLE t_temp USING parquet AS SELECT 1, 2") val e = intercept[TempTableAlreadyExistsException] { sql("CREATE TEMPORARY TABLE t_temp (c3 int, c4 string) USING JSON") }.getMessage - assert(e.contains("Temporary view 't_temp' already exists")) + assert(e.contains("Temporary table 'default.t_temp' already exists")) } } test("block creating duplicate temp view") { withTempView("t_temp") { sql("CREATE TEMPORARY VIEW t_temp AS SELECT 1, 2") - val e = intercept[TempTableAlreadyExistsException] { + val e = intercept[TempViewAlreadyExistsException] { sql("CREATE TEMPORARY VIEW t_temp (c3 int, c4 string) USING JSON") }.getMessage assert(e.contains("Temporary view 't_temp' already exists")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TemporaryTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TemporaryTableSuite.scala new file mode 100644 index 0000000000000..fc50a5ae59297 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TemporaryTableSuite.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command + +import java.io.File + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.Utils + +class TemporaryTableSuite extends QueryTest with SharedSparkSession { + + override def afterEach(): Unit = { + try { + // drop all databases, tables and functions after each test + spark.sessionState.catalog.reset() + } finally { + Utils.deleteRecursively(new File(spark.sessionState.conf.warehousePath)) + super.afterEach() + } + } + + test("create temporary table using data source") { + withTempTable("tt1", "sameName", "`bad.name`") { + sql("create temporary table tt1 (id int) using parquet") + sql("insert into table tt1 values (1)") + assert(spark.table("tt1").collect === Array(Row(1))) + + withTempView("same") { + sql("create temporary view same as select 1") + val e = intercept[AnalysisException]( + sql("create temporary table same (id int) using parquet")) + assert(e.message.contains("Temporary view 'same' already exists")) + } + + withTempView("same") { + sql("create temporary table same using parquet as select 1") + val e = intercept[AnalysisException]( + sql("create temporary view same (id int) using parquet")) + assert(e.message.contains("Temporary table 'default.same' already exists")) + } + + val e = intercept[AnalysisException]( + sql("create temporary table `bad.name` (id int) using parquet")) + assert(e.message.contains("`bad.name` is not a valid name for tables/databases.")) + } + } + + test("create temporary table using data source as select") { + withTempTable("tt1", "sameName", "`bad.name`") { + sql("create temporary table tt1 using parquet as select 1") + assert(spark.table("tt1").collect === Array(Row(1))) + + withTempView("same") { + sql("create temporary view same as select 1") + val e = intercept[AnalysisException]( + sql("create temporary table same using parquet as select 1")) + assert(e.message.contains("Temporary view 'same' already exists")) + } + + withTempView("same") { + sql("create temporary table same using parquet as select 1") + val e = intercept[AnalysisException]( + sql("create temporary view same as select 1")) + assert(e.message.contains("Temporary table 'default.same' already exists")) + } + + val e = intercept[AnalysisException]( + sql("create temporary table `bad.name` using parquet as select 1")) + assert(e.message.contains("`bad.name` is not a valid name for tables/databases.")) + } + } + + test("create temporary table with location is not allowed") { + withTempTable("tt1") { + val e = intercept[AnalysisException]( + sql( + """ + |create temporary table tt1 (id int) using parquet + |LOCATION '/PATH' + |""".stripMargin)) + assert(e.message.contains("specify LOCATION in temporary table")) + } + } + + test("create a same name temporary table and permanent table") { + withDatabase("db1") { + sql("create database db1") + withTable("same1", "same2", "same3", "same4", "same5", "same6") { + withTempTable("same1", "same2", "same3", "same4", "same5", "same6") { + // create temporary table then create table + sql("create temporary table same using parquet as select 'temp_table'") + var e = intercept[AnalysisException]( + sql("create table same using parquet as select 'table'") + ).getMessage + assert(e.contains("already exists")) + + sql("create temporary table same2 (key int) using parquet") + e = 
+          e = intercept[AnalysisException](
+            sql("create table same2 (key int) using parquet")
+          ).getMessage
+          assert(e.contains("already exists"))
+
+          sql("create temporary table db1.same3 (key int) using parquet")
+          sql("create table same3 (key int) using parquet")
+          e = intercept[AnalysisException](
+            sql("create table db1.same3 (key int) using parquet")
+          ).getMessage
+          assert(e.contains("already exists"))
+
+          // create table then create temporary table
+          sql("create table same4 using parquet as select 'table'")
+          e = intercept[AnalysisException](
+            sql("create temporary table same4 using parquet as select 'temp_table'")
+          ).getMessage
+          assert(e.contains("already exists"))
+
+          sql("create table same5 (key int) using parquet")
+          e = intercept[AnalysisException](
+            sql("create temporary table same5 (key int) using parquet")
+          ).getMessage
+          assert(e.contains("already exists"))
+
+          sql("create table db1.same6 (key int) using parquet")
+          sql("create temporary table same6 (key int) using parquet")
+          e = intercept[AnalysisException](
+            sql("create temporary table db1.same6 (key int) using parquet")
+          ).getMessage
+          assert(e.contains("already exists"))
+        }
+      }
+    }
+  }
+
+  test("drop temporary table") {
+    withDatabase("db1") {
+      withTempTable("t1", "db1.t1") {
+        sql("create temporary table t1 using parquet as select 1")
+        assert(spark.table("t1").collect === Array(Row(1)))
+        sql("drop table t1")
+        assert(spark.sessionState.catalog.getTempTable("t1").isEmpty)
+
+        sql("CREATE DATABASE db1")
+        sql("create temporary table db1.t1 using parquet as select 1")
+        assert(spark.table("db1.t1").collect === Array(Row(1)))
+        sql("drop table db1.t1")
+        assert(spark.sessionState.catalog.getTempTable("db1.t1").isEmpty)
+      }
+    }
+  }
+
+  test("create temporary partition table is not allowed") {
+    withTempTable("t1") {
+      val e = intercept[ParseException] {
+        sql("create temporary table t1 (a int, b int) " +
+          "using parquet partitioned by (b)")
+      }.getMessage
+      assert(e.contains("Operation not allowed"))
+    }
+  }
+
+  test("insert into temporary table") {
+    withDatabase("dba") {
+      sql("CREATE DATABASE dba")
+      withTempTable("dba.tt1", "tt1", "tt2") {
+        sql("CREATE TEMPORARY TABLE dba.tt1 (key int) USING PARQUET")
+        sql("CREATE TEMPORARY TABLE tt1 USING PARQUET AS SELECT 1")
+        checkAnswer(spark.table("dba.tt1"), Seq())
+        checkAnswer(spark.table("tt1"), Seq(Row(1)))
+
+        sql("INSERT INTO TABLE dba.tt1 SELECT * FROM tt1")
+        sql("INSERT OVERWRITE TABLE tt1 VALUES (2)")
+        checkAnswer(spark.table("dba.tt1"), Seq(Row(1)))
+        checkAnswer(spark.table("tt1"), Seq(Row(2)))
+      }
+    }
+  }
+
+  test("show tables with temporary tables") {
+    withDatabase("db1") {
+      withTempTable("tt1", "tt2", "db1.tt1", "db1.tt2") {
+        sql("create database db1")
+        sql("create temporary table tt1 using parquet as select 1")
+        sql("create table tt2 using parquet as select 1")
+        sql("create temporary table db1.tt1 using parquet as select 1")
+        sql("create table db1.tt2 using parquet as select 1")
+
+        checkAnswer(sql("show tables in db1"),
+          Seq(Row("db1", "tt1", true), Row("db1", "tt2", false)))
+        checkAnswer(sql("show tables"),
+          Seq(Row("default", "tt1", true), Row("default", "tt2", false)))
+      }
+    }
+  }
+
+  private def withStaticSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    pairs.foreach { case (k, v) =>
+      SQLConf.get.setConfString(k, v)
+    }
+    try f finally {
+      pairs.foreach { case (k, _) =>
+        SQLConf.get.unsetConf(k)
+      }
+    }
+  }
+
+  test("check temporary table limitation") {
+    withTempTable("tt1", "tt2", "tt3") {
+      withStaticSQLConf(StaticSQLConf.TEMPORARY_TABLE_MAX_SIZE_PER_SESSION.key -> "0") {
+        sql("CREATE TEMPORARY TABLE tt1 (key int) USING PARQUET")
+        val e = intercept[AnalysisException] {
+          sql("CREATE TEMPORARY TABLE tt2 USING PARQUET AS SELECT 1")
+        }.getMessage
+        assert(e.contains("The total size of temporary tables in this session has reached"))
+      }
+      withStaticSQLConf(StaticSQLConf.TEMPORARY_TABLE_MAX_SIZE_PER_SESSION.key -> "1000") {
+        sql("CREATE TEMPORARY TABLE tt2 USING PARQUET AS SELECT 1")
+      }
+      withStaticSQLConf(StaticSQLConf.TEMPORARY_TABLE_MAX_NUM_PER_SESSION.key -> "2") {
+        val e = intercept[AnalysisException] {
+          sql("CREATE TEMPORARY TABLE tt3 USING PARQUET AS SELECT 1")
+        }.getMessage
+        assert(e.contains("The total number of temporary tables in this session has reached"))
+      }
+      withStaticSQLConf(StaticSQLConf.TEMPORARY_TABLE_MAX_NUM_PER_SESSION.key -> "3") {
+        sql("CREATE TEMPORARY TABLE tt3 USING PARQUET AS SELECT 1")
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 983209051c8ae..3d5f74458a966 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -153,20 +153,18 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
-  test("disallows CREATE TEMPORARY TABLE ... USING ... AS query") {
-    withTable("t") {
-      val error = intercept[ParseException] {
+  test("disallows CREATE TEMPORARY VIEW ... USING ... AS query") {
+    withTempView("t") {
+      intercept[ParseException] {
         sql(
           s"""
-            |CREATE TEMPORARY TABLE t USING PARQUET
+            |CREATE TEMPORARY VIEW t USING PARQUET
             |OPTIONS (PATH '${path.toURI}')
             |PARTITIONED BY (a)
             |AS SELECT 1 AS a, 2 AS b
           """.stripMargin
         )
-      }.getMessage
-      assert(error.contains("Operation not allowed") &&
-        error.contains("CREATE TEMPORARY TABLE ... USING ... AS query"))
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 7be15e9d87004..8457dbeb321c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -319,6 +319,24 @@ private[sql] trait SQLTestUtilsBase
     )
   }
 
+  /**
+   * Drops temporary table `tableNames` after calling `f`.
+   */
+  protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally {
+      // If the test failed part way, we don't want to mask the failure by failing to remove
+      // temp tables that never got created.
+      tableNames.foreach { name =>
+        try {
+          spark.catalog.dropTempTable(name)
+        } catch {
+          case _: NoSuchTableException => // fall back for qualified names, e.g. db.name or `db`.`name`
+            spark.sql(s"DROP TABLE IF EXISTS $name")
+        }
+      }
+    }
+  }
+
   /**
    * Drops cache `cacheName` after calling `f`.
*/ @@ -335,7 +353,7 @@ private[sql] trait SQLTestUtilsBase // Blocking uncache table for tests protected def uncacheTable(tableName: String): Unit = { val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(tableName) - val cascade = !spark.sessionState.catalog.isTemporaryTable(tableIdent) + val cascade = !spark.sessionState.catalog.isTemporaryView(tableIdent) spark.sharedState.cacheManager.uncacheQuery( spark, spark.table(tableName).logicalPlan, diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index ca8ad5e6ad134..b328878ea7481 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -87,7 +87,7 @@ private[hive] class SparkGetColumnsOperation( } val db2Tabs = catalog.listDatabases(schemaPattern).map { dbName => - (dbName, catalog.listTables(dbName, tablePattern, includeLocalTempViews = false)) + (dbName, catalog.listTables(dbName, tablePattern, includeTempViewsAndTables = false)) }.toMap if (isAuthV2Enabled) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index a1d21e2d60c63..f2c4f125dc1b6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -88,7 +88,7 @@ private[hive] class SparkGetTablesOperation( try { // Tables and views matchingDbs.foreach { dbName => - val tables = catalog.listTables(dbName, tablePattern, includeLocalTempViews = false) + val tables = catalog.listTables(dbName, tablePattern, includeTempViewsAndTables = false) catalog.getTablesByName(tables).foreach { table => val tableType = tableTypeString(table.tableType) if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(tableType)) { @@ -102,7 +102,7 @@ private[hive] class SparkGetTablesOperation( val globalTempViewDb = catalog.globalTempViewManager.database val databasePattern = Pattern.compile(CLIServiceUtils.patternToRegex(schemaName)) val tempViews = if (databasePattern.matcher(globalTempViewDb).matches()) { - catalog.listTables(globalTempViewDb, tablePattern, includeLocalTempViews = true) + catalog.listTables(globalTempViewDb, tablePattern, includeTempViewsAndTables = true) } else { catalog.listLocalTempViews(tablePattern) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala index 3da568cfa256e..59e364ac4685d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.catalog.CatalogTableType -import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW} +import 
org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, TEMPORARY, VIEW}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
@@ -90,6 +90,7 @@ private[hive] trait SparkOperation extends Operation with Logging {
   def tableTypeString(tableType: CatalogTableType): String = tableType match {
     case EXTERNAL | MANAGED => "TABLE"
     case VIEW => "VIEW"
+    case TEMPORARY => "LOCAL TEMPORARY"
     case t =>
       throw new IllegalArgumentException(s"Unknown table type is found: $t")
   }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 8944b93d9b697..5819baeb197e8 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -67,6 +67,9 @@ private[hive] object SparkSQLEnv extends Logging {
   /** Cleans up and shuts down the Spark SQL environments. */
   def stop(): Unit = {
     logDebug("Shutting down Spark SQL Environment")
+    if (sqlContext != null) {
+      sqlContext.clearTempTables()
+    }
     // Stop the SparkContext
     if (SparkSQLEnv.sparkContext != null) {
       sparkContext.stop()
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index e10e7ed1a2769..26f5b748415ed 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -77,6 +77,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
     HiveThriftServer2.eventManager.onSessionClosed(sessionHandle.getSessionId.toString)
     val ctx = sparkSqlOperationManager.sessionToContexts.getOrDefault(sessionHandle, sqlContext)
     ctx.sparkSession.sessionState.catalog.getTempViewNames().foreach(ctx.uncacheTable)
+    ctx.clearTempTables()
     super.closeSession(sessionHandle)
     sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
   }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 4e6d4e104021f..b67779ca55efe 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -862,6 +862,20 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       }
     }
   }
+
+  test("temporary table is session-bound") {
+    withMultipleConnectionJdbcStatement()({
+      statement =>
+        statement.execute("CREATE TEMPORARY TABLE default.tempTbl1 USING PARQUET AS SELECT 1")
+    }, {
+      statement =>
+        // The table was created by another session, so it must not be visible here.
+        val e = intercept[SQLException] {
+          statement.executeQuery("SELECT * FROM default.tempTbl1")
+        }
+        assert(e.getMessage.contains("Table or view not found: default.tempTbl1"))
+    })
+  }
 }
 
 class SingleSessionSuite extends HiveThriftJdbcTest {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
index f7ee3e0a46cd1..bda630f04b949
100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -179,7 +179,7 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { withJdbcStatement() { statement => val metaData = statement.getConnection.getMetaData - checkResult(metaData.getTableTypes, Seq("TABLE", "VIEW")) + checkResult(metaData.getTableTypes, Seq("TABLE", "VIEW", "LOCAL TEMPORARY")) } }
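
---

A minimal usage sketch of the session-scoped DDL this patch enables, assuming a build with
the patch applied. The table name, column names, and values are illustrative; only the
`CREATE TEMPORARY TABLE ... USING <source>`, `INSERT`, and `DROP TABLE` surface exercised
by TemporaryTableSuite above is relied on.

    import org.apache.spark.sql.SparkSession

    object TemporaryTableSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("temporary-table-sketch")
          .getOrCreate()

        // A temporary table is writable (unlike a temporary view) and, like one,
        // visible only to the session that created it.
        spark.sql("CREATE TEMPORARY TABLE tmp_sales (id INT, amount DOUBLE) USING parquet")
        spark.sql("INSERT INTO TABLE tmp_sales VALUES (1, 9.99)")
        spark.sql("SELECT * FROM tmp_sales").show()

        // Dropped explicitly here; otherwise it is cleaned up when the session closes.
        spark.sql("DROP TABLE tmp_sales")
        spark.stop()
      }
    }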