@@ -54,6 +54,10 @@ trait FunctionRegistry {

/** Checks if a function with a given name exists. */
def functionExists(name: String): Boolean = lookupFunction(name).isDefined

/** Clear all registered functions. */
def clear(): Unit

}

class SimpleFunctionRegistry extends FunctionRegistry {
@@ -93,6 +97,10 @@ class SimpleFunctionRegistry extends FunctionRegistry {
functionBuilders.remove(name).isDefined
}

override def clear(): Unit = {
functionBuilders.clear()
}

def copy(): SimpleFunctionRegistry = synchronized {
val registry = new SimpleFunctionRegistry
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
@@ -132,6 +140,10 @@ object EmptyFunctionRegistry extends FunctionRegistry {
throw new UnsupportedOperationException
}

override def clear(): Unit = {
throw new UnsupportedOperationException
}

}


@@ -304,11 +304,18 @@ class SessionCatalog(
dbTables ++ _tempTables
}

// TODO: It's strange that we have both refresh and invalidate here.

/**
* Refresh the cache entry for a metastore table, if any.
*/
def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }

/**
* Invalidate the cache entry for a metastore table, if any.
*/
def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ }

/**
* Drop all existing temporary tables.
* For testing only.
@@ -595,6 +602,11 @@ class SessionCatalog(
}
}

/**
* List all functions in the specified database, including temporary functions.
*/
def listFunctions(db: String): Seq[FunctionIdentifier] = listFunctions(db, "*")

/**
* List all matching functions in the specified database, including temporary functions.
*/
@@ -609,4 +621,34 @@
// So, the returned list may have two entries for the same function.
dbFunctions ++ loadedFunctions
}


// -----------------
// | Other methods |
// -----------------

/**
* Drop all existing databases (except "default") along with all associated tables,
* partitions and functions, and set the current database to "default".
*
* This is mainly used for tests.
*/
private[sql] def reset(): Unit = {
val default = "default"
listDatabases().filter(_ != default).foreach { db =>
dropDatabase(db, ignoreIfNotExists = false, cascade = true)
}
tempTables.clear()
functionRegistry.clear()
Contributor: Since a FunctionRegistry also holds all built-in functions, should this remove all builders, or only the non-built-in ones?

Contributor Author: Right now this removes all functions (built-in or not). It's mainly used for tests, so I think that's OK.

// restore built-in functions
FunctionRegistry.builtin.listFunction().foreach { f =>
val expressionInfo = FunctionRegistry.builtin.lookupFunction(f)
val functionBuilder = FunctionRegistry.builtin.lookupFunctionBuilder(f)
require(expressionInfo.isDefined, s"built-in function '$f' is missing expression info")
require(functionBuilder.isDefined, s"built-in function '$f' is missing function builder")
functionRegistry.registerFunction(f, expressionInfo.get, functionBuilder.get)
}
setCurrentDatabase(default)
}

}
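
As a side note on the review thread above: a minimal sketch, using only the FunctionRegistry methods visible in this diff, of why reset() has to re-register the built-ins after calling clear(). The standalone registry below is hypothetical and not part of the change:

// Seed a fresh registry with every built-in, the same way reset() does.
val registry = new SimpleFunctionRegistry
FunctionRegistry.builtin.listFunction().foreach { f =>
  val info = FunctionRegistry.builtin.lookupFunction(f)
  val builder = FunctionRegistry.builtin.lookupFunctionBuilder(f)
  registry.registerFunction(f, info.get, builder.get)
}
assert(registry.listFunction().nonEmpty)

// clear() drops every builder, built-in or not, so a caller like reset()
// must restore the built-ins afterwards or function resolution will fail.
registry.clear()
assert(registry.listFunction().isEmpty)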
@@ -41,4 +41,6 @@ class StringKeyHashMap[T](normalizer: (String) => String) {
def remove(key: String): Option[T] = base.remove(normalizer(key))

def iterator: Iterator[(String, T)] = base.toIterator

def clear(): Unit = base.clear()
}
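
For context, a brief usage sketch of the map this method is being added to; put and get are part of StringKeyHashMap but fall outside this hunk, and the lower-casing normalizer is only an assumed example:

// Assumes a case-insensitive normalizer; put/get are not shown in this hunk.
val map = new StringKeyHashMap[Int](_.toLowerCase)
map.put("Foo", 1)
assert(map.get("FOO").contains(1)) // keys are normalized on every access
map.clear()                        // the new method: drops all entries at once
assert(map.iterator.isEmpty)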
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution

import scala.collection.JavaConverters._

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, AstBuilder, ParseException}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -378,8 +378,7 @@ class SparkSqlAstBuilder extends AstBuilder {
override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableRename(
visitTableIdentifier(ctx.from),
visitTableIdentifier(ctx.to))(
command(ctx))
visitTableIdentifier(ctx.to))
}

/**
@@ -395,26 +394,24 @@
ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
AlterTableSetProperties(
visitTableIdentifier(ctx.tableIdentifier),
visitTablePropertyList(ctx.tablePropertyList))(
command(ctx))
visitTablePropertyList(ctx.tablePropertyList))
}

/**
* Create an [[AlterTableUnsetProperties]] command.
*
* For example:
* {{{
* ALTER TABLE table UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
* ALTER VIEW view UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
* ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
* ALTER VIEW view UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
* }}}
*/
override def visitUnsetTableProperties(
ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
AlterTableUnsetProperties(
visitTableIdentifier(ctx.tableIdentifier),
visitTablePropertyList(ctx.tablePropertyList),
ctx.EXISTS != null)(
command(ctx))
visitTablePropertyList(ctx.tablePropertyList).keys.toSeq,
ctx.EXISTS != null)
}

/**
@@ -432,116 +429,41 @@
Option(ctx.STRING).map(string),
Option(ctx.tablePropertyList).map(visitTablePropertyList),
// TODO a partition spec is allowed to have optional values. This is currently violated.
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
command(ctx))
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}

/**
* Create an [[AlterTableStorageProperties]] command.
*
* For example:
* {{{
* ALTER TABLE table CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
* }}}
*/
// TODO: don't even bother parsing alter table commands related to bucketing and skewing

override def visitBucketTable(ctx: BucketTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableStorageProperties(
visitTableIdentifier(ctx.tableIdentifier),
visitBucketSpec(ctx.bucketSpec))(
command(ctx))
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... CLUSTERED BY ... INTO N BUCKETS")

Contributor: Could you throw ParseExceptions instead of AnalysisExceptions? You could also move these rules to the unsupportedHiveNativeCommands rule in SqlBase.g4.

Contributor Author: I tried doing that, but some of the tests are written in SQL, so they end up going through the SQL parser, which doesn't understand Hive native commands. Let's fix that separately (see the TODO I left).

Contributor: Owww... Forgot about that. +1 for fixing it separately.
}

/**
* Create an [[AlterTableNotClustered]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT CLUSTERED;
* }}}
*/
override def visitUnclusterTable(ctx: UnclusterTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotClustered(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT CLUSTERED")
}

/**
* Create an [[AlterTableNotSorted]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT SORTED;
* }}}
*/
override def visitUnsortTable(ctx: UnsortTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotSorted(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT SORTED")
}

/**
* Create an [[AlterTableSkewed]] command.
*
* For example:
* {{{
* ALTER TABLE table SKEWED BY (col1, col2)
* ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
* [STORED AS DIRECTORIES];
* }}}
*/
override def visitSkewTable(ctx: SkewTableContext): LogicalPlan = withOrigin(ctx) {
val table = visitTableIdentifier(ctx.tableIdentifier)
val (cols, values, storedAsDirs) = visitSkewSpec(ctx.skewSpec)
AlterTableSkewed(table, cols, values, storedAsDirs)(command(ctx))
throw new AnalysisException("Operation not allowed: ALTER TABLE ... SKEWED BY ...")
}

/**
* Create an [[AlterTableNotSkewed]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT SKEWED;
* }}}
*/
override def visitUnskewTable(ctx: UnskewTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotSkewed(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT SKEWED")
}

/**
* Create an [[AlterTableNotStoredAsDirs]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT STORED AS DIRECTORIES
* }}}
*/
override def visitUnstoreTable(ctx: UnstoreTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotStoredAsDirs(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... NOT STORED AS DIRECTORIES")
}

/**
* Create an [[AlterTableSkewedLocation]] command.
*
* For example:
* {{{
* ALTER TABLE table SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] );
* }}}
*/
override def visitSetTableSkewLocations(
ctx: SetTableSkewLocationsContext): LogicalPlan = withOrigin(ctx) {
val skewedMap = ctx.skewedLocationList.skewedLocation.asScala.flatMap {
slCtx =>
val location = string(slCtx.STRING)
if (slCtx.constant != null) {
Seq(visitStringConstant(slCtx.constant) -> location)
} else {
// TODO this is similar to what was in the original implementation. However this does not
// make to much sense to me since we should be storing a tuple of values (not column
// names) for which we want a dedicated storage location.
visitConstantList(slCtx.constantList).map(_ -> location)
}
}.toMap

AlterTableSkewedLocation(
visitTableIdentifier(ctx.tableIdentifier),
skewedMap)(
command(ctx))
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... SET SKEWED LOCATION ...")
}

/**
@@ -703,8 +625,7 @@ class SparkSqlAstBuilder extends AstBuilder {
AlterTableSetLocation(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
visitLocationSpec(ctx.locationSpec))(
command(ctx))
visitLocationSpec(ctx.locationSpec))
}

/**