Skip to content

Commit

Permalink
[SPARK-30352][SQL] DataSourceV2: Add CURRENT_CATALOG function
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

As we support multiple catalogs with DataSourceV2, we may need the `CURRENT_CATALOG` value expression from the SQL standard.

`CURRENT_CATALOG` is a general value specification in the SQL Standard, described as:

> The value specified by CURRENT_CATALOG is the character string that represents the current default catalog name.

### Why are the changes needed?
improve catalog v2 with ANSI SQL standard.

### Does this PR introduce any user-facing change?
yes, add a new function `current_catalog()` to point the current active catalog

### How was this patch tested?

add ut

Closes #27006 from yaooqinn/SPARK-30352.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
yaooqinn authored and cloud-fan committed May 25, 2020
1 parent d400777 commit 0df8dd6
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ object FunctionRegistry {
expression[InputFileBlockLength]("input_file_block_length"),
expression[MonotonicallyIncreasingID]("monotonically_increasing_id"),
expression[CurrentDatabase]("current_database"),
expression[CurrentCatalog]("current_catalog"),
expression[CallMethodViaReflection]("reflect"),
expression[CallMethodViaReflection]("java_method", true),
expression[SparkVersion]("version"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable {
override def prettyName: String = "current_database"
}

/**
* Returns the current catalog.
*/
@ExpressionDescription(
usage = "_FUNC_() - Returns the current catalog.",
examples = """
Examples:
> SELECT _FUNC_();
spark_catalog
""",
since = "3.1.0")
case class CurrentCatalog() extends LeafExpression with Unevaluable {
override def dataType: DataType = StringType
override def foldable: Boolean = true
override def nullable: Boolean = false
override def prettyName: String = "current_catalog"
}

// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """_FUNC_() - Returns an universally unique identifier (UUID) string. The value is returned as a canonical UUID 36-character string.""",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
ReplaceExpressions,
RewriteNonCorrelatedExists,
ComputeCurrentTime,
GetCurrentDatabase(catalogManager),
GetCurrentDatabaseAndCatalog(catalogManager),
RewriteDistinctAggregates,
ReplaceDeduplicateWithAggregate) ::
//////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -223,7 +223,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
EliminateView.ruleName ::
ReplaceExpressions.ruleName ::
ComputeCurrentTime.ruleName ::
GetCurrentDatabase(catalogManager).ruleName ::
GetCurrentDatabaseAndCatalog(catalogManager).ruleName ::
RewriteDistinctAggregates.ruleName ::
ReplaceDeduplicateWithAggregate.ruleName ::
ReplaceIntersectWithSemiJoin.ruleName ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,21 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
}


/** Replaces the expression of CurrentDatabase with the current database name. */
case class GetCurrentDatabase(catalogManager: CatalogManager) extends Rule[LogicalPlan] {
/**
* Replaces the expression of CurrentDatabase with the current database name.
* Replaces the expression of CurrentCatalog with the current catalog name.
*/
case class GetCurrentDatabaseAndCatalog(catalogManager: CatalogManager) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val currentNamespace = catalogManager.currentNamespace.quoted
val currentCatalog = catalogManager.currentCatalog.name()

plan transformAllExpressions {
case CurrentDatabase() =>
Literal.create(currentNamespace, StringType)
case CurrentCatalog() =>
Literal.create(currentCatalog, StringType)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<!-- Automatically generated by ExpressionsSchemaSuite -->
## Summary
- Number of queries: 336
- Number of queries: 337
- Number of expressions that missing example: 34
- Expressions missing examples: and,string,tinyint,double,smallint,date,decimal,boolean,float,binary,bigint,int,timestamp,cume_dist,dense_rank,input_file_block_length,input_file_block_start,input_file_name,lag,lead,monotonically_increasing_id,ntile,struct,!,not,or,percent_rank,rank,row_number,spark_partition_id,version,window,positive,count_min_sketch
## Schema of Built-in Functions
Expand Down Expand Up @@ -82,6 +82,7 @@
| org.apache.spark.sql.catalyst.expressions.CsvToStructs | from_csv | SELECT from_csv('1, 0.8', 'a INT, b DOUBLE') | struct<from_csv(1, 0.8):struct<a:int,b:double>> |
| org.apache.spark.sql.catalyst.expressions.Cube | cube | SELECT name, age, count(*) FROM VALUES (2, 'Alice'), (5, 'Bob') people(age, name) GROUP BY cube(name, age) | struct<name:string,age:int,count(1):bigint> |
| org.apache.spark.sql.catalyst.expressions.CumeDist | cume_dist | N/A | N/A |
| org.apache.spark.sql.catalyst.expressions.CurrentCatalog | current_catalog | SELECT current_catalog() | struct<current_catalog():string> |
| org.apache.spark.sql.catalyst.expressions.CurrentDatabase | current_database | SELECT current_database() | struct<current_database():string> |
| org.apache.spark.sql.catalyst.expressions.CurrentDate | current_date | SELECT current_date() | struct<current_date():date> |
| org.apache.spark.sql.catalyst.expressions.CurrentTimestamp | current_timestamp | SELECT current_timestamp() | struct<current_timestamp():timestamp> |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- get current_datebase and current_catalog
select current_database(), current_catalog();
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 1


-- !query
select current_database(), current_catalog()
-- !query schema
struct<current_database():string,current_catalog():string>
-- !query output
default spark_catalog

0 comments on commit 0df8dd6

Please sign in to comment.