[SPARK-28635][SQL] create CatalogManager to track registered v2 catalogs #25368

Closed
wants to merge 8 commits into from

@cloud-fan
Contributor

commented Aug 6, 2019

What changes were proposed in this pull request?

This is a pure refactoring PR that creates a new class, CatalogManager, to track the registered v2 catalogs and provide catalog lookup functionality.

CatalogManager also tracks the current catalog/namespace. We will implement the corresponding commands, such as USE CATALOG my_catalog, in other PRs.
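
As a rough illustration of what "tracking registered catalogs" can look like, here is a minimal, self-contained sketch. It is not the code in this PR: `SimplePlugin`, `SimpleCatalogManager`, and the `load` function are hypothetical stand-ins for CatalogPlugin, CatalogManager, and config-driven plugin loading.

```scala
import scala.collection.mutable

// Hypothetical stand-in for CatalogPlugin: just a named catalog.
final case class SimplePlugin(name: String)

// Hypothetical simplified manager. `load` stands in for whatever
// config-driven loading the real class performs.
class SimpleCatalogManager(load: String => SimplePlugin) {
  private val catalogs = mutable.HashMap.empty[String, SimplePlugin]
  private var _currentCatalog: Option[String] = None
  private var _currentNamespace: Array[String] = Array("default")

  // Look up a catalog by name, loading and caching it on first use.
  def catalog(name: String): SimplePlugin = synchronized {
    catalogs.getOrElseUpdate(name, load(name))
  }

  // None is taken to mean "the builtin catalog" in this sketch.
  def currentCatalog: Option[String] = _currentCatalog

  def currentNamespace: Array[String] = _currentNamespace
}
```

The point of the cache is that repeated lookups of the same name return the same instance without reloading it.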

How was this patch tested?

existing tests

* Returns the default catalog specified by config.
*/
def defaultCatalog(): Option[CatalogPlugin] = {
conf.defaultV2Catalog.flatMap { catalogName =>

}

def v2SessionCatalog(): Option[CatalogPlugin] = {
try {

@cloud-fan
Contributor Author

commented Aug 6, 2019

/**
* Returns the default catalog specified by config.
*/
def defaultCatalog(): Option[CatalogPlugin] = {

rdblue Aug 6, 2019

Contributor

Why does this require empty parentheses?
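
For context on this question: the usual Scala convention, as far as I know, is to omit the parentheses on parameterless methods that behave like pure accessors and to keep `()` on methods with side effects. A minimal sketch of the distinction follows; the `Registry` class and its members are hypothetical and not taken from this PR.

```scala
// Hypothetical class used only to illustrate the naming convention.
class Registry {
  private var names: List[String] = List("session")

  // Pure accessor: declared and called without parentheses.
  def defaultName: Option[String] = names.headOption

  // Side-effecting method: the empty parentheses signal mutation.
  def clear(): Unit = {
    names = Nil
  }
}
```

Under that convention, a method like defaultCatalog that only reads config would normally be declared without `()`.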

}

// Clear all the registered catalogs. Only used in tests.
def reset(): Unit = catalogs.clear()

rdblue Aug 6, 2019

Contributor

If this is only used in tests, can we restrict its visibility?
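
One way to restrict visibility in Scala is a qualified `private[...]` modifier. The sketch below is an assumption about what such a restriction could look like, not the change made in this PR: an object named `sql` stands in for a package so the example fits in one file, and `Manager` and `TestHooks` are hypothetical names.

```scala
import scala.collection.mutable

// `sql` is an object standing in for a package in this sketch.
object sql {
  class Manager {
    private val catalogs = mutable.HashMap.empty[String, String]

    def register(name: String, impl: String): Unit = catalogs.update(name, impl)

    def size: Int = catalogs.size

    // Visible only to code enclosed by `sql`; callers outside cannot use it.
    private[sql] def reset(): Unit = catalogs.clear()
  }

  // A helper inside `sql`, such as a test utility, may still call reset().
  object TestHooks {
    def resetManager(m: Manager): Unit = m.reset()
  }
}
```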


override protected def lookupCatalog(name: String): CatalogPlugin =
throw new CatalogNotFoundException("No catalog lookup function")
override val catalogManager: CatalogManager = new CatalogManager(conf)

rdblue Aug 6, 2019

Contributor

I thought that this was going to be part of SessionState and not Analyzer. Why not SessionState?

cloud-fan Aug 7, 2019

Author Contributor

Because it's only needed by the analyzer.

Alternatively, we can put CatalogManager in BaseSessionStateBuilder, and pass it to analyzer when we create analyzer in BaseSessionStateBuilder.

Technically, it's already in SessionState because SessionState holds the analyzer.

cloud-fan Aug 7, 2019

Author Contributor

I can add a SessionState.catalogManager which calls analyzer.catalogManager, to make it easier to access the catalog manager. What do you think?

rdblue Aug 7, 2019

Contributor

Let's put CatalogManager in SessionState and pass it into the analyzer. That's a better place for the catalog manager.

cloud-fan Aug 7, 2019

Author Contributor

"Let's put CatalogManager in SessionState and pass it into the analyzer."

This is not doable. The analyzer is created in BaseSessionStateBuilder. So there are 2 options:

  1. create the CatalogManager in BaseSessionStateBuilder, and pass it to the analyzer.
  2. create the CatalogManager in the analyzer, and put an accessor of it in SessionState.

I think 2 is better.

rdblue Aug 7, 2019

Contributor

Okay, let's do #2 then.
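
Option 2 from the exchange above can be sketched roughly as follows. All three classes are hypothetical, stripped-down stand-ins (the real Analyzer and SessionState take many more arguments); the only point is that the session state exposes an accessor that delegates to the analyzer.

```scala
// Hypothetical stand-in for CatalogManager.
class CatalogManagerSketch

// The analyzer creates and owns the catalog manager.
class AnalyzerSketch {
  val catalogManager: CatalogManagerSketch = new CatalogManagerSketch
}

// The session state holds the analyzer and forwards to its manager.
class SessionStateSketch(val analyzer: AnalyzerSketch) {
  def catalogManager: CatalogManagerSketch = analyzer.catalogManager
}
```

With this shape there is a single CatalogManager instance per analyzer, and callers holding a session state do not need to know where it is created.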

@SparkQA

commented Aug 6, 2019

Test build #108722 has finished for PR 25368 at commit de95b81.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class CatalogManager(conf: SQLConf) extends Logging

@dongjoon-hyun dongjoon-hyun added the SQL label Aug 6, 2019

@SparkQA

commented Aug 7, 2019

Test build #108756 has finished for PR 25368 at commit e85f495.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@rdblue

Contributor

commented Aug 7, 2019

@cloud-fan, how will users set the current catalog using the catalog manager?

We want to support changing the current catalog, right? Like this:

USE CATALOG my_catalog;
@@ -137,7 +116,3 @@ trait LookupCatalog extends Logging {
}
}
}

object LookupCatalog {
val SESSION_CATALOG_NAME: String = "session"

@cloud-fan

Contributor Author

commented Aug 8, 2019

@rdblue we need a separate PR to implement USE CATALOG my_catalog, which requires adding new SQL syntax and the related logical/physical plans. This PR is more of a code refactor, and I've added "currentCatalog" and "currentNamespace" to the CatalogManager.

@SparkQA

commented Aug 8, 2019

Test build #108800 has finished for PR 25368 at commit 73fdb22.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan force-pushed the cloud-fan:refactor branch from 73fdb22 to a440694 Aug 9, 2019

@SparkQA

commented Aug 9, 2019

Test build #108860 has finished for PR 25368 at commit a440694.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.
@cloud-fan

Contributor Author

commented Aug 9, 2019

retest this please

@SparkQA

commented Aug 9, 2019

Test build #108869 has finished for PR 25368 at commit a440694.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
// Returns the name of current catalog. None means the current catalog is the builtin catalog.
def currentCatalog: Option[String] = _currentCatalog

def setCurrentCatalog(catalog: String): Unit = {

brkyvz Aug 9, 2019

Contributor

shouldn't setting the current catalog also reset the current namespace to empty (unless we're back to using the default catalog, then it should be 'default' again)?
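
The behavior suggested here could be sketched as below. This is only an illustration of the suggestion, not what the PR implements: `CurrentState` is a hypothetical class, and it takes an `Option[String]` (with `None` meaning the builtin catalog) rather than the `String` parameter shown in the diff.

```scala
// Hypothetical holder for the current catalog and namespace.
class CurrentState {
  private var _catalog: Option[String] = None
  private var _namespace: Array[String] = Array("default")

  def currentCatalog: Option[String] = _catalog

  def currentNamespace: Array[String] = _namespace

  def setCurrentNamespace(namespace: Array[String]): Unit = {
    _namespace = namespace
  }

  // Switching catalogs discards the old namespace: "default" for the
  // builtin catalog (None), empty for any other catalog.
  def setCurrentCatalog(catalog: Option[String]): Unit = {
    _catalog = catalog
    _namespace = if (catalog.isEmpty) Array("default") else Array.empty[String]
  }
}
```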


def currentNamespace: Array[String] = _currentNamespace

def setCurrentNamespace(namespace: Array[String]): Unit = {

brkyvz Aug 9, 2019

Contributor

Also, do these need to be synchronized as well? They're not thread-safe at the moment, but I can't think of a case where synchronizing actually buys us anything (because one user will already be in a bad state if they share sessions anyway).

sparkSession.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
sparkSession.conf.set("spark.sql.default.catalog", "testcat")
sparkSession.sql(s"CREATE TABLE table_name (id bigint, data string) USING foo")
spark.conf.set("spark.sql.default.catalog", "testcat")

brkyvz Aug 9, 2019

Contributor

withSQLConf? Otherwise you affect all tests below
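
For readers unfamiliar with the pattern: `withSQLConf` in Spark's test utilities scopes config changes to a block. The sketch below shows the general set, run, restore idea over a plain mutable map. `ConfSketch` and `withConf` are hypothetical names for illustration, not Spark APIs.

```scala
import scala.collection.mutable

object ConfSketch {
  // Hypothetical stand-in for a session's runtime config.
  val conf: mutable.HashMap[String, String] = mutable.HashMap.empty[String, String]

  // Set the given pairs, run the body, then restore the previous values,
  // even if the body throws.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val previous = pairs.map { case (key, _) => key -> conf.get(key) }
    pairs.foreach { case (key, value) => conf.update(key, value) }
    try body
    finally {
      previous.foreach {
        case (key, Some(old)) => conf.update(key, old)
        case (key, None) => conf.remove(key)
      }
    }
  }
}
```

Because the previous values are restored in `finally`, a setting made for one test does not leak into the tests that follow.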

cloud-fan Aug 12, 2019

Author Contributor

This test suite will clear all the configs after each test, see https://github.com/apache/spark/pull/25368/files#diff-b49f76fba19ee10a28e0e61c4b44e1a0R62

val sparkSession = spark.newSession()
sparkSession.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
sparkSession.conf.set("spark.sql.default.catalog", "testcat")
spark.conf.set("spark.sql.default.catalog", "testcat")

brkyvz Aug 9, 2019

Contributor

ditto

@SparkQA

commented Aug 12, 2019

Test build #108946 has finished for PR 25368 at commit b07790d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@cloud-fan

Contributor Author

commented Aug 13, 2019

@brkyvz @rdblue any more comments?

@brkyvz
Contributor

left a comment

I think we can get rid of LookupCatalog entirely after such a refactoring. Also, I'm leaning toward leaving the default namespace to the catalog rather than the CatalogManager. Thoughts? cc @rdblue

}

private var _currentNamespace = {
// The builtin catalog use "default" as the default database.

brkyvz Aug 14, 2019

Contributor

I'm thinking more about whether this should be the duty of the CatalogManager or the catalog itself...

My thoughts are:

  1. Once the V2SessionCatalog SupportsNamespaces, it can have the default namespace default set within it
  2. When you run a command like USE foo.bar where foo is the catalog, we should set the default namespace within that catalog to be bar.
  3. What if the default catalog doesn't support namespaces in the first place?

I'm a bit worried that having the default namespace setting in the CatalogManager can lead to a split brain situation.

cloud-fan Aug 14, 2019

Author Contributor

I think the current namespace only makes sense for the current catalog. E.g., in SELECT ... FROM t, t can be a table in the current catalog's current namespace. However, for SELECT ... FROM c1.t, it's confusing to say t is a table in catalog c1's current namespace.

When a table identifier starts with a catalog name, it should be a fully qualified identifier, and we can't apply current namespace here.

A catalog (including V2SessionCatalog) can report its default namespace, which will be used as the current namespace when switching to that catalog for the first time.

brkyvz Aug 14, 2019

Contributor

I think we're saying the same things. I totally agree that, for a catalog c1, the identifier in

SELECT ... FROM c1.t

should be a fully qualified identifier. I'm saying that we should push the namespace configuration into the catalog that supports it. It shouldn't be part of the CatalogManager.

cloud-fan Aug 14, 2019

Author Contributor

I'm not sure we should ask the catalog implementation to do it. This would make the catalog mutable and harder to implement, e.g. implementations would need to take care of the current namespace in many methods like loadTable, createTable, etc. It also brings some risk if the catalog implementation has bugs in tracking the current namespace.

Instead, I think it's better to let Spark track the current namespace, and always extend the table identifier with the current namespace to make it a fully qualified identifier before calling catalog APIs. I'm fine with not tracking it in CatalogManager, but we should track it somewhere in Spark.
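
The idea of qualifying identifiers on the Spark side before calling the catalog can be sketched as follows. This is a simplified illustration under stated assumptions: `Ident` and `Qualifier` are hypothetical names, and the sketch ignores catalog-name resolution by treating any multi-part name as already qualified.

```scala
// Hypothetical fully qualified identifier: namespace parts plus a name.
final case class Ident(namespace: Seq[String], name: String)

object Qualifier {
  // A single-part name is extended with the current namespace.
  // A multi-part name is assumed to be qualified already.
  def qualify(parts: Seq[String], currentNamespace: Seq[String]): Ident = {
    require(parts.nonEmpty, "identifier must have at least one part")
    if (parts.length == 1) Ident(currentNamespace, parts.head)
    else Ident(parts.init, parts.last)
  }
}
```

With this split, a catalog implementation only ever sees fully qualified identifiers and never needs to hold mutable "current namespace" state.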

cloud-fan Aug 14, 2019

Author Contributor

A side note: Hive can track the current database, but Spark tracks the current database itself and always calls Hive APIs with fully qualified table identifiers.

brkyvz Aug 16, 2019

Contributor

Oh, wait, I got the defaultNamespace setting in SupportsNamespaces wrong. I thought it was actually tracked there. This looks good to me. Sorry.

@@ -45,8 +45,8 @@ case class DataSourceResolution(
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
import lookup._

lazy val v2SessionCatalog: CatalogPlugin = lookup.sessionCatalog
.getOrElse(throw new AnalysisException("No v2 session catalog implementation is available"))
def v2SessionCatalog: CatalogPlugin = lookup.sessionCatalog

brkyvz Aug 14, 2019

Contributor

Do we even need LookupCatalog anymore?

brkyvz Aug 14, 2019

Contributor

Shouldn't this rule just get the CatalogManager as an input instead of LookupCatalog?

cloud-fan Aug 14, 2019

Author Contributor

The LookupCatalog has some convenient utils, e.g. CatalogObjectIdentifier, AsTableIdentifier, etc. I think we should still keep it.

BTW good point about making this rule take CatalogManager directly. Will update it soon.

@cloud-fan cloud-fan force-pushed the cloud-fan:refactor branch from b07790d to 30ce779 Aug 14, 2019

@cloud-fan cloud-fan force-pushed the cloud-fan:refactor branch from 30ce779 to 9186620 Aug 14, 2019

@SparkQA

commented Aug 14, 2019

Test build #109099 has finished for PR 25368 at commit 9186620.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

commented Aug 15, 2019

Test build #109144 has finished for PR 25368 at commit 45cbbd0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.
@cloud-fan

Contributor Author

commented Aug 16, 2019

retest this please

@SparkQA

commented Aug 16, 2019

Test build #109168 has finished for PR 25368 at commit 45cbbd0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@brkyvz

Contributor

commented Aug 16, 2019

LGTM pending tests

@brkyvz

Contributor

commented Aug 16, 2019

retest this please

@SparkQA

commented Aug 16, 2019

Test build #109184 has finished for PR 25368 at commit 45cbbd0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.
@cloud-fan

Contributor Author

commented Aug 16, 2019

retest this please

@SparkQA

commented Aug 16, 2019

Test build #109202 has started for PR 25368 at commit 45cbbd0.

@cloud-fan

Contributor Author

commented Aug 16, 2019

retest this please

@SparkQA

commented Aug 16, 2019

Test build #109224 has finished for PR 25368 at commit 45cbbd0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

commented Aug 20, 2019

Test build #109380 has finished for PR 25368 at commit 4895a6e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class FileCommitProtocol extends Logging
  • class BindingParquetOutputCommitter(
  • class PathOutputCommitProtocol(
  • class DecisionTreeParams(Params):
@dilipbiswal

Contributor

commented Aug 20, 2019

retest this please

@SparkQA

commented Aug 20, 2019

Test build #109392 has finished for PR 25368 at commit 4895a6e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class FileCommitProtocol extends Logging
  • class BindingParquetOutputCommitter(
  • class PathOutputCommitProtocol(
  • class DecisionTreeParams(Params):
@cloud-fan

Contributor Author

commented Aug 20, 2019

thanks for the review, merging to master!

@cloud-fan cloud-fan closed this in d045221 Aug 20, 2019

@rdblue

Contributor

commented Aug 20, 2019

@cloud-fan, sorry for the delay reviewing this. +1 from me.


private var _currentNamespace = {
// The builtin catalog use "default" as the default database.
defaultCatalog.map(getDefaultNamespace).getOrElse(Array("default"))

imback82 Aug 21, 2019

Contributor

@cloud-fan is this the right behavior? Since CatalogManager is created in Analyzer, unless you set the default catalog (via spark.sql.default.catalog) when you create the SparkSession, this will be set to "default".

For #25247, you cannot do the following:

// Assume that 'testcat' catalog implements SupportsNamespaces and returns Array() for the defaultNamespace.
spark.conf.set("spark.sql.default.catalog", "testcat")

// Now the following will use `default` as the default namespace instead of ``.
spark.sql("SHOW TABLES")

But, I see SQL queries where "spark.sql.default.catalog" is set/used without creating a new SparkSession here:

spark.conf.set("spark.sql.default.catalog", "testcat")

cloud-fan Aug 21, 2019

Author Contributor

Good catch! The currentNamespace and currentCatalog are not used anywhere and I was planning to add tests when I implement switching current namespace/catalog. Let me add some simple tests to reflect runtime config changes first.
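
The issue raised in this thread comes down to eager initialization: a field computed once at construction cannot see later config changes. The sketch below contrasts that with deriving the value on demand. It is a hypothetical illustration, not the follow-up fix: `NamespaceTracker` is an invented name, and it assumes (as in the example above) that the configured catalog reports an empty default namespace.

```scala
import scala.collection.mutable

// Hypothetical tracker; `conf` stands in for the session's runtime config.
class NamespaceTracker(conf: mutable.Map[String, String]) {
  // Assumption: a configured default catalog reports an empty default
  // namespace; without one, the builtin "default" database is used.
  private def fromConf: Array[String] =
    if (conf.contains("spark.sql.default.catalog")) Array.empty[String]
    else Array("default")

  // Evaluated once at construction: later config changes are not seen.
  private val eager: Array[String] = fromConf
  def eagerNamespace: Array[String] = eager

  // Explicitly set value, if any; otherwise derived from conf on each call.
  private var explicit: Option[Array[String]] = None

  def setCurrentNamespace(namespace: Array[String]): Unit = {
    explicit = Some(namespace)
  }

  def currentNamespace: Array[String] = explicit.getOrElse(fromConf)
}
```

Deriving the value on demand until it is explicitly set is one possible way to make runtime config changes visible; other designs are possible.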

rdblue Aug 21, 2019

Contributor

Thanks for finding this @imback82!
