Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
imback82 committed Dec 3, 2019
1 parent 4933701 commit 985e84d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,8 @@ class Analyzer(
}

/**
* Resolve relations to temp views. This is not an actual rule, and is only called by
* [[ResolveTables]].
* Resolve relations to temp views. This is not an actual rule, and is called by
* [[ResolveTables]] and [[ResolveRelations]].
*/
object ResolveTempViews extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
Expand All @@ -679,8 +679,7 @@ class Analyzer(
}

/**
* Resolve table relations with concrete relations from v2 catalog. This is not an actual rule,
* and is only called by [[ResolveRelations]].
* Resolve table relations with concrete relations from v2 catalog.
*
* [[ResolveRelations]] still resolves v1 tables.
*/
Expand Down Expand Up @@ -747,8 +746,7 @@ class Analyzer(
// Note this is compatible with the views defined by older versions of Spark(before 2.2), which
// have empty defaultDatabase and all the relations in viewText have database part defined.
def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
case u @ UnresolvedRelation(CatalogObjectIdentifier(catalog, ident))
if CatalogV2Util.isSessionCatalog(catalog) =>
case u @ UnresolvedRelation(SessionCatalogAndIdentifier(catalog, ident)) =>
lookupRelation(catalog, ident, recurse = true).getOrElse(u)

// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
Expand All @@ -774,8 +772,8 @@ class Analyzer(

def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
case i @ InsertIntoStatement(
u @ UnresolvedRelation(CatalogObjectIdentifier(catalog, ident)), _, _, _, _)
if i.query.resolved && CatalogV2Util.isSessionCatalog(catalog) =>
u @ UnresolvedRelation(SessionCatalogAndIdentifier(catalog, ident)), _, _, _, _)
if i.query.resolved =>
val relation = ResolveTempViews(u) match {
case unresolved: UnresolvedRelation =>
lookupRelation(catalog, ident, recurse = false).getOrElse(unresolved)
Expand All @@ -793,20 +791,18 @@ class Analyzer(

// Look up a relation from a given session catalog with the following logic:
// 1) If a relation is not found in the catalog, return None.
// 2) If a relation is found and is not a v1 table, create a v2 relation.
// 3) If a relation is found and is a v1 table,
// a) If it has a v2 provider, create a v2 relation.
// b) If it doesn't have a v2 provider and is not running on files, create v1 relation.
// Relation that runs directly on files will be
// c) Otherwise, return None.
// 2) If a relation is found,
// a) if it is a v1 table not running on files, create a v1 relation
// b) otherwise, create a v2 relation.
// 3) Otherwise, return None.
// If recurse is set to true, it will call `resolveRelation` recursively to resolve
// relations with the correct AnalysisContext.defaultDatabase scope.
// relations with the correct database scope.
private def lookupRelation(
catalog: CatalogPlugin,
ident: Identifier,
recurse: Boolean): Option[LogicalPlan] = {
val newIdent = withNewNamespace(ident)
require(newIdent.namespace.size == 1)
assert(newIdent.namespace.size == 1)

CatalogV2Util.loadTable(catalog, newIdent) match {
case Some(v1Table: V1Table) =>
Expand All @@ -831,15 +827,14 @@ class Analyzer(
// 1. Use the existing namespace if it is defined.
// 2. Use defaultDatabase fom AnalysisContext, if it is defined. In this case, no temporary
// objects can be used, and the default database is only used to look up a view.
// 3. Use the current namespace. Note that the catalog will be a session catalog since
// this function gets called only when the catalog is resolved to a session catalog.
// 3. Use the current namespace of the session catalog.
private def withNewNamespace(ident: Identifier): Identifier = {
if (ident.namespace.nonEmpty) {
ident
} else {
val defaultNamespace = AnalysisContext.get.defaultDatabase match {
case Some(db) => Array(db)
case None => catalogManager.currentNamespace
case None => Array(v1SessionCatalog.getCurrentDatabase)
}
Identifier.of(defaultNamespace, ident.name)
}
Expand Down Expand Up @@ -2874,28 +2869,17 @@ class Analyzer(
}

/**
* Performs the lookup of DataSourceV2 Tables. The order of resolution is:
* 1. Check if this relation is a temporary table.
* 2. Check if it has a catalog identifier. Here we try to load the table.
* If we find the table, return the v2 relation and catalog.
* 3. Try resolving the relation using the V2SessionCatalog if that is defined.
* If the V2SessionCatalog returns a V1 table definition,
* return `None` so that we can fallback to the V1 code paths.
* If the V2SessionCatalog returns a V2 table, return the v2 relation and V2SessionCatalog.
* Performs the lookup of DataSourceV2 Tables from v2 catalog.
*/
private def lookupV2RelationAndCatalog(
identifier: Seq[String]): Option[(DataSourceV2Relation, CatalogPlugin, Identifier)] =
private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
identifier match {
case CatalogObjectIdentifier(catalog, ident) if !CatalogV2Util.isSessionCatalog(catalog) =>
case NonSessionCatalogAndIdentifier(catalog, ident) =>
CatalogV2Util.loadTable(catalog, ident) match {
case Some(table) => Some((DataSourceV2Relation.create(table), catalog, ident))
case Some(table) => Some(DataSourceV2Relation.create(table))
case None => None
}
case _ => None
}

private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
lookupV2RelationAndCatalog(identifier).map(_._1)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,28 @@ private[sql] trait LookupCatalog extends Logging {
}
}

/**
* Extract session catalog and identifier from a multi-part identifier.
*/
object SessionCatalogAndIdentifier {
def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = parts match {
case CatalogObjectIdentifier(catalog, ident) if CatalogV2Util.isSessionCatalog(catalog) =>
Some(catalog, ident)
case _ => None
}
}

/**
* Extract non-session catalog and identifier from a multi-part identifier.
*/
object NonSessionCatalogAndIdentifier {
def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = parts match {
case CatalogObjectIdentifier(catalog, ident) if !CatalogV2Util.isSessionCatalog(catalog) =>
Some(catalog, ident)
case _ => None
}
}

/**
* Extract catalog and namespace from a multi-part identifier with the current catalog if needed.
* Catalog name takes precedence over namespaces.
Expand Down

0 comments on commit 985e84d

Please sign in to comment.