@@ -5366,7 +5366,7 @@
},
"REQUIRES_SINGLE_PART_NAMESPACE" : {
"message" : [
"<sessionCatalog> requires a single-part namespace, but got <namespace>."
"<sessionCatalog> requires a single-part namespace, but got identifier <identifier>."
],
"sqlState" : "42K05"
},
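As a sanity check on the reworded template: substituting the parameters from the DataSourceV2FunctionSuite expectations further down gives roughly this rendered message:

```
[REQUIRES_SINGLE_PART_NAMESPACE] spark_catalog requires a single-part namespace, but got identifier `default`.`ns1`.`ns2`.`fun`.
```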
@@ -166,9 +166,9 @@ private[sql] object CatalogV2Implicits {
def asMultipartIdentifier: Seq[String] = (ident.namespace :+ ident.name).toImmutableArraySeq

def asTableIdentifier: TableIdentifier = ident.namespace match {
- case ns if ns.isEmpty => TableIdentifier(ident.name)

Member: curious here — is it necessary to remove these `ns.isEmpty` cases? I see that asTableIdentifierOpt still handles the empty ns case.

I assume in the V1SessionCatalog case the namespace is never empty, but just checking whether it's necessary to remove, in case some v2 connector is using the method.

case Array(dbName) => TableIdentifier(ident.name, Some(dbName))
- case _ => throw QueryCompilationErrors.identifierTooManyNamePartsError(original)
+ case _ =>
+   throw QueryCompilationErrors.requiresSinglePartNamespaceError(asMultipartIdentifier)
}

/**
@@ -192,9 +192,9 @@
}

def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match {
- case ns if ns.isEmpty => FunctionIdentifier(ident.name())
case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
- case _ => throw QueryCompilationErrors.identifierTooManyNamePartsError(original)
+ case _ =>
+   throw QueryCompilationErrors.requiresSinglePartNamespaceError(asMultipartIdentifier)
}

def toQualifiedNameParts(catalog: CatalogPlugin): Seq[String] = {
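A quick sketch of the behavior change in these helpers; REPL-style, with hypothetical identifiers (the multi-part expectation matches the V2SessionCatalogNamespaceSuite update further down):

```scala
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier

// Single-part namespaces convert as before:
Identifier.of(Array("db"), "tbl").asTableIdentifier
// => TableIdentifier("tbl", Some("db"))

// Multi-part namespaces now fail with REQUIRES_SINGLE_PART_NAMESPACE instead of
// IDENTIFIER_TOO_MANY_NAME_PARTS, and the message carries the full identifier:
Identifier.of(Array("a", "b"), "c").asTableIdentifier
// => AnalysisException: ... got identifier `a`.`b`.`c`.

// With the ns.isEmpty case gone, an empty namespace also lands in the throwing
// branch now; asTableIdentifierOpt (mentioned in the review comment above) is
// presumably the non-throwing alternative for that case.
```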
@@ -19,9 +19,7 @@ package org.apache.spark.sql.connector.catalog

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.TableIdentifier
- import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
- import org.apache.spark.util.ArrayImplicits._

/**
* A trait to encapsulate catalog lookup function and helpful extractors.
@@ -109,29 +107,24 @@ private[sql] trait LookupCatalog extends Logging {

def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
assert(nameParts.nonEmpty)
- val (catalog, ident) = if (nameParts.length == 1) {
-   (currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head))
+ if (nameParts.length == 1) {
+   Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
} else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
// Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
// API does not support view yet, and we have to use v1 commands to deal with global temp
// views. To simplify the implementation, we put global temp views in a special namespace
// in the session catalog. The special namespace has higher priority during name resolution.
// For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
// this custom catalog can't be accessed.
- (catalogManager.v2SessionCatalog, nameParts.asIdentifier)
+ Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier))
} else {
try {
- (catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier)
+ Some((catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier))
} catch {
case _: CatalogNotFoundException =>
- (currentCatalog, nameParts.asIdentifier)
+ Some((currentCatalog, nameParts.asIdentifier))
}
}
- if (CatalogV2Util.isSessionCatalog(catalog) && ident.namespace().length != 1) {
-   throw QueryCompilationErrors.requiresSinglePartNamespaceError(
-     ident.namespace().toImmutableArraySeq)
- }
- Some((catalog, ident))
}
}

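Net effect of this hunk: the extractor no longer rejects multi-part namespaces for the session catalog at lookup time; it always returns Some, and the single-part check fires later, when something converts the identifier to a v1 form. A rough sketch with hypothetical names:

```scala
// Assuming spark_catalog is current and "ns1" is not a registered catalog:
Seq("ns1", "ns2", "tbl") match {
  case CatalogAndIdentifier(catalog, ident) =>
    // Previously this threw REQUIRES_SINGLE_PART_NAMESPACE; now it resolves:
    assert(catalog.name == "spark_catalog")
    assert(ident.namespace.sameElements(Array("ns1", "ns2")) && ident.name == "tbl")
    // The error surfaces only if a v1 code path later calls ident.asTableIdentifier.
}
```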
@@ -1542,12 +1542,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
new TableAlreadyExistsException(ident.asMultipartIdentifier)
}

- def requiresSinglePartNamespaceError(namespace: Seq[String]): Throwable = {
+ def requiresSinglePartNamespaceError(identifier: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "REQUIRES_SINGLE_PART_NAMESPACE",
messageParameters = Map(
"sessionCatalog" -> CatalogManager.SESSION_CATALOG_NAME,
"namespace" -> toSQLId(namespace)))
"identifier" -> toSQLId(identifier)))
}

def namespaceAlreadyExistsError(namespace: Array[String]): Throwable = {
@@ -57,15 +57,15 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {

test("catalog and identifier") {
Seq(
+ // Session catalog with single-part namespace
("tbl", sessionCatalog, Seq("default"), "tbl"),
("db.tbl", sessionCatalog, Seq("db"), "tbl"),
(s"$globalTempDB.tbl", sessionCatalog, Seq(globalTempDB), "tbl"),
(s"$globalTempDB.ns1.ns2.tbl", sessionCatalog, Seq(globalTempDB, "ns1", "ns2"), "tbl"),
("ns1.ns2.tbl", sessionCatalog, Seq("ns1", "ns2"), "tbl"),
("`db.tbl`", sessionCatalog, Seq("default"), "db.tbl"),
("parquet.`file:/tmp/db.tbl`", sessionCatalog, Seq("parquet"), "file:/tmp/db.tbl"),
("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", sessionCatalog,
Seq("org.apache.spark.sql.json"), "s3://buck/tmp/abc.json"),
+ // Non-session catalogs (no namespace restriction)
("prod.func", catalogs("prod"), Seq.empty, "func"),
("prod.db.tbl", catalogs("prod"), Seq("db"), "tbl"),
("test.db.tbl", catalogs("test"), Seq("db"), "tbl"),
@@ -79,21 +79,6 @@
}
}

test("session catalog requires single-part namespace") {
// Multi-part namespaces are not allowed for session catalog
Seq(
"ns1.ns2.tbl", // two-part namespace
s"$globalTempDB.ns1.ns2.tbl" // three-part namespace
).foreach { sql =>
val e = intercept[org.apache.spark.sql.AnalysisException] {
parseMultipartIdentifier(sql) match {
case CatalogAndIdentifier(_, _) =>
}
}
assert(e.getCondition === "REQUIRES_SINGLE_PART_NAMESPACE")
}
}

test("table identifier") {
Seq(
("tbl", "tbl", None),
@@ -37,7 +37,6 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructField, StructType}
- import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.SparkStringUtils

/**
@@ -727,10 +726,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
case ResolvedPersistentView(catalog, ident, _) =>
assert(isSessionCatalog(catalog))
- assert(ident.namespace().length == 1)
- Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+ Some(ident.asTableIdentifier.copy(catalog = Some(catalog.name)))

- case ResolvedTempView(ident, _) => Some(ident.asTableIdentifier)
+ case ResolvedTempView(ident, _) =>
+   Some(TableIdentifier(ident.name(), ident.namespace().headOption))

case _ => None
}
@@ -763,24 +762,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
object ResolvedV1Identifier {
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
case ResolvedIdentifier(catalog, ident) if supportsV1Command(catalog) =>
- if (ident.namespace().length != 1) {
-   throw QueryCompilationErrors
-     .requiresSinglePartNamespaceError(ident.namespace().toImmutableArraySeq)
- }
- Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+ Some(ident.asTableIdentifier.copy(catalog = Some(catalog.name)))
case _ => None
}
}

// Use this object to help match commands that do not have a v2 implementation.
- object ResolvedIdentifierInSessionCatalog{
+ object ResolvedIdentifierInSessionCatalog {
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) =>
- if (ident.namespace().length != 1) {
-   throw QueryCompilationErrors
-     .requiresSinglePartNamespaceError(ident.namespace().toImmutableArraySeq)
- }
- Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+ Some(ident.asTableIdentifier.copy(catalog = Some(catalog.name)))
case _ => None
}
}
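Two things are easy to miss in this rewrite. For the persistent cases, ident.asTableIdentifier.copy(catalog = Some(catalog.name)) reproduces the removed inline check, since the helper itself now throws REQUIRES_SINGLE_PART_NAMESPACE (with the full identifier in the message). For ResolvedTempView the direction is reversed: with the ns.isEmpty case removed from asTableIdentifier, a local temp view (empty namespace) would now throw, so the v1 identifier is built by hand. A sketch of the temp-view distinction, with hypothetical view names:

```scala
// Hypothetical identifiers for a local and a global temp view:
val local  = Identifier.of(Array.empty[String], "v")   // CREATE TEMP VIEW v
val global = Identifier.of(Array("global_temp"), "v")  // global temp view

// local.asTableIdentifier would now fail with REQUIRES_SINGLE_PART_NAMESPACE,
// so the rule constructs the TableIdentifier directly:
TableIdentifier(local.name(), local.namespace().headOption)   // TableIdentifier("v", None)
TableIdentifier(global.name(), global.namespace().headOption) // TableIdentifier("v", Some("global_temp"))
```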
@@ -698,12 +698,12 @@ class SparkSqlAstBuilder extends AstBuilder {
}

withIdentClause(ctx.identifierReference(), Seq(qPlan), (ident, otherPlans) => {
- val tableIdentifier = ident.asTableIdentifier
- if (tableIdentifier.database.isDefined) {
+ if (ident.length > 1) {

Member: is this case covered by some test? (it's easy to understand though, so not blocking)

// Temporary view names should NOT contain database prefix like "database.table"
throw QueryParsingErrors
-   .notAllowedToAddDBPrefixForTempViewError(tableIdentifier.nameParts, ctx)
+   .notAllowedToAddDBPrefixForTempViewError(ident, ctx)
}
+ val tableIdentifier = TableIdentifier(ident.head)

CreateViewCommand(
tableIdentifier,
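The parser-level rejection this replaces is exercised by the QueryCompilationErrorsSuite change further down; a minimal repro, assuming a SparkSession in scope as spark:

```scala
// Fails at parse time with TEMP_VIEW_NAME_TOO_MANY_NAME_PARTS, since a temporary
// view name must now be a single-part identifier (any qualifier is rejected):
spark.sql("CREATE TEMPORARY VIEW db_name.schema_name.view_name AS SELECT '1' AS test_column")
```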
@@ -24,7 +24,7 @@ import scala.collection.mutable
import scala.jdk.CollectionConverters._

import org.apache.spark.SparkUnsupportedOperationException
- import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, SQLConfHelper, TableIdentifier}
+ import org.apache.spark.sql.catalyst.{QualifiedTableName, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
import org.apache.spark.sql.catalyst.util.TypeUtils._
@@ -45,6 +45,7 @@ import org.apache.spark.util.Utils
*/
class V2SessionCatalog(catalog: SessionCatalog)
extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import V2SessionCatalog._

override val defaultNamespace: Array[String] = Array(conf.defaultDatabase)
@@ -367,26 +368,6 @@
catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
}

- implicit class TableIdentifierHelper(ident: Identifier) {
-   def asTableIdentifier: TableIdentifier = {
-     ident.namespace match {
-       case Array(db) =>
-         TableIdentifier(ident.name, Some(db))
-       case other =>
-         throw QueryCompilationErrors.requiresSinglePartNamespaceError(other.toImmutableArraySeq)
-     }
-   }
-
-   def asFunctionIdentifier: FunctionIdentifier = {
-     ident.namespace match {
-       case Array(db) =>
-         FunctionIdentifier(ident.name, Some(db))
-       case other =>
-         throw QueryCompilationErrors.requiresSinglePartNamespaceError(other.toImmutableArraySeq)
-     }
-   }
- }

override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
case Array(db) =>
catalog.databaseExists(db)
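With the CatalogV2Implicits._ import added above, the asTableIdentifier / asFunctionIdentifier calls in this file now resolve to the shared helpers, which raise the same REQUIRES_SINGLE_PART_NAMESPACE error this local implicit class produced, so the duplicate can go.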
@@ -40,7 +40,6 @@ class JDBCTableCatalog extends TableCatalog
with FunctionCatalog
with DataTypeErrorsBase
with Logging {
- import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

private var catalogName: String = null
private var options: JDBCOptions = _
@@ -434,7 +433,8 @@

override def loadFunction(ident: Identifier): UnboundFunction = {
if (ident.namespace().nonEmpty) {
- throw QueryCompilationErrors.noSuchFunctionError(ident.asFunctionIdentifier)
+ throw QueryCompilationErrors.identifierTooManyNamePartsError(
+   (ident.namespace().toSeq :+ ident.name()))
}
functions.get(ident.name()) match {
case Some(func) =>
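The swap away from asFunctionIdentifier looks deliberate: that helper now raises REQUIRES_SINGLE_PART_NAMESPACE, whose message names the session catalog, which would be misleading coming from a JDBC catalog. A sketch of the new behavior, with a hypothetical catalog instance:

```scala
// Hypothetical: jdbcCatalog is an initialized JDBCTableCatalog instance.
val jdbcCatalog: JDBCTableCatalog = ???

// A namespaced function identifier is rejected up front:
jdbcCatalog.loadFunction(Identifier.of(Array("db"), "strlen"))
// Before: reported via noSuchFunctionError
// After:  IDENTIFIER_TOO_MANY_NAME_PARTS, naming `db`.`strlen`
```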
@@ -185,7 +185,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
condition = "REQUIRES_SINGLE_PART_NAMESPACE",
parameters = Map(
"sessionCatalog" -> "spark_catalog",
"namespace" -> "`default`.`ns1`.`ns2`")
"identifier" -> "`default`.`ns1`.`ns2`.`fun`")
)
}

@@ -204,7 +204,7 @@
condition = "REQUIRES_SINGLE_PART_NAMESPACE",
parameters = Map(
"sessionCatalog" -> "spark_catalog",
"namespace" -> "`default`.`ns1`.`ns2`")
"identifier" -> "`default`.`ns1`.`ns2`.`fun`")
)
}

@@ -240,7 +240,7 @@
condition = "REQUIRES_SINGLE_PART_NAMESPACE",
parameters = Map(
"sessionCatalog" -> "spark_catalog",
"namespace" -> "`default`.`ns1`.`ns2`")
"identifier" -> "`default`.`ns1`.`ns2`.`fun`")
)
}

@@ -2582,7 +2582,7 @@ class DataSourceV2SQLSuiteV1Filter
condition = "REQUIRES_SINGLE_PART_NAMESPACE",
parameters = Map(
"sessionCatalog" -> "spark_catalog",
"namespace" -> "`global_temp`.`ns1`.`ns2`"))
"identifier" -> "`global_temp`.`ns1`.`ns2`.`tbl`"))
}

test("table name same as catalog can be used") {
@@ -2616,7 +2616,9 @@
checkError(
exception = analysisException(sql),
condition = "REQUIRES_SINGLE_PART_NAMESPACE",
parameters = Map("sessionCatalog" -> "spark_catalog", "namespace" -> ""))
parameters = Map(
"sessionCatalog" -> "spark_catalog",
"identifier" -> "`t`"))
}

verify(s"select * from $t")
@@ -712,15 +712,21 @@ class QueryCompilationErrorsSuite
)
}

test("IDENTIFIER_TOO_MANY_NAME_PARTS: " +
test("TEMP_VIEW_NAME_TOO_MANY_NAME_PARTS: " +
"create temp view doesn't support identifiers consisting of more than 2 parts") {
+ val sqlText =
+   "CREATE TEMPORARY VIEW db_name.schema_name.view_name AS SELECT '1' as test_column"
checkError(
exception = intercept[ParseException] {
sql("CREATE TEMPORARY VIEW db_name.schema_name.view_name AS SELECT '1' as test_column")
sql(sqlText)
},
condition = "IDENTIFIER_TOO_MANY_NAME_PARTS",
sqlState = "42601",
parameters = Map("identifier" -> "`db_name`.`schema_name`.`view_name`", "limit" -> "2")
condition = "TEMP_VIEW_NAME_TOO_MANY_NAME_PARTS",
sqlState = "428EK",
parameters = Map("actualName" -> "`db_name`.`schema_name`.`view_name`"),
context = ExpectedContext(
fragment = sqlText,
start = 0,
stop = sqlText.length - 1)
)
}

@@ -41,7 +41,7 @@ trait ShowColumnsSuiteBase extends command.ShowColumnsSuiteBase {
condition = "REQUIRES_SINGLE_PART_NAMESPACE",
parameters = Map(
"sessionCatalog" -> catalog,
"namespace" -> "`a`.`b`.`c`"
"identifier" -> "`a`.`b`.`c`.`tbl`"
)
)
}
@@ -1162,8 +1162,10 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
val testIdent: IdentifierHelper = Identifier.of(Array("a", "b"), "c")
checkError(
exception = intercept[AnalysisException](testIdent.asTableIdentifier),
condition = "IDENTIFIER_TOO_MANY_NAME_PARTS",
parameters = Map("identifier" -> "`a`.`b`.`c`", "limit" -> "2")
condition = "REQUIRES_SINGLE_PART_NAMESPACE",

Member: just an opinion, but previously having the identifier in the message was a bit easier to debug. I wonder if we can extend the error message to have a version with the identifier?

Contributor Author: good point, working on it.

+ parameters = Map(
+   "sessionCatalog" -> "spark_catalog",
+   "identifier" -> "`a`.`b`.`c`")
)
}

@@ -37,7 +37,7 @@ class CreateNamespaceSuite extends v1.CreateNamespaceSuiteBase with CommandSuite
condition = "REQUIRES_SINGLE_PART_NAMESPACE",
parameters = Map(
"sessionCatalog" -> catalog,
"namespace" -> "`ns1`.`ns2`"
"identifier" -> "`ns1`.`ns2`"
)
)
}