Skip to content

Commit

Permalink
[CARMEL-3156][SPARK-32064][SQL] Supporting create temporary table (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
LantaoJin authored and chrysanxia committed Jul 15, 2020
1 parent f657056 commit 3e23e3d
Show file tree
Hide file tree
Showing 33 changed files with 793 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,14 @@ private[spark] class SparkHadoopUtil extends Logging {
ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
}

def deletePath(path: Path): Boolean = {
val fs = path.getFileSystem(conf)
if (fs.exists(path)) {
fs.delete(path, true)
} else {
true
}
}
}

private[spark] object SparkHadoopUtil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public interface TableCatalog extends CatalogPlugin {
*/
String PROP_OWNER = "owner";

/**
* A reserved property to specify the type of the table.
* @since 3.1.0 To determinate a temporary/volatile table.
*/
String PROP_TYPE = "specific_type";

/**
* List the tables in a namespace from the catalog.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
Expand Down Expand Up @@ -45,9 +46,16 @@ class TableAlreadyExistsException(message: String) extends AnalysisException(mes
}
}

class TempTableAlreadyExistsException(table: String)
class TempViewAlreadyExistsException(table: String)
extends TableAlreadyExistsException(s"Temporary view '$table' already exists")

class TempTableAlreadyExistsException(table: TableIdentifier)
extends TableAlreadyExistsException(s"Temporary table '${table.unquotedString}' already exists")

class TempTablePartitionUnsupportedException(table: TableIdentifier)
extends AnalysisException(
s"Partition is unsupported in temporary table '$table'")

class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec)
extends AnalysisException(
s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, Predicate}
import org.apache.spark.sql.connector.catalog.TableCatalog

object ExternalCatalogUtils {
// This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst doesn't
Expand Down Expand Up @@ -231,4 +232,11 @@ object CatalogUtils {
s"defined table columns are: ${tableCols.mkString(", ")}")
}
}

val TEMPORARY_TABLE = "temporary"

def isTemporaryTable(table: CatalogTable): Boolean = {
table.tableType == CatalogTableType.TEMPORARY ||
table.properties.get(TableCatalog.PROP_TYPE).contains(TEMPORARY_TABLE)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.analysis.TempViewAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.StringUtils

Expand Down Expand Up @@ -58,7 +58,7 @@ class GlobalTempViewManager(val database: String) {
viewDefinition: LogicalPlan,
overrideIfExists: Boolean): Unit = synchronized {
if (!overrideIfExists && viewDefinitions.contains(name)) {
throw new TempTableAlreadyExistsException(name)
throw new TempViewAlreadyExistsException(name)
}
viewDefinitions.put(name, viewDefinition)
}
Expand Down
Loading

0 comments on commit 3e23e3d

Please sign in to comment.