Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19598][SQL]Remove the alias parameter in UnresolvedRelation #16956

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -180,12 +180,8 @@ class Analyzer(
def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = {
plan transformDown {
case u : UnresolvedRelation =>
val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
.map(_._2).map { relation =>
val withAlias = u.alias.map(SubqueryAlias(_, relation, None))
withAlias.getOrElse(relation)
}
substituted.getOrElse(u)
cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
.map(_._2).getOrElse(u)
case other =>
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
other transformExpressions {
Expand Down Expand Up @@ -623,7 +619,7 @@ class Analyzer(
val tableIdentWithDb = u.tableIdentifier.copy(
database = u.tableIdentifier.database.orElse(defaultDatabase))
try {
catalog.lookupRelation(tableIdentWithDb, u.alias)
catalog.lookupRelation(tableIdentWithDb)
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}")
Expand Down
Expand Up @@ -54,10 +54,8 @@ object ResolveHints {

val newNode = CurrentOrigin.withOrigin(plan.origin) {
plan match {
case r: UnresolvedRelation =>
val alias = r.alias.getOrElse(r.tableIdentifier.table)
if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan

case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually you can't remove this case entirely. you still need to match on UnresolvedRelation, but just use r.tableIdentifier.table

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, yes, remove the case will miss the hint on table without alias

BroadcastHint(plan)
case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
BroadcastHint(plan)

Expand Down
Expand Up @@ -37,10 +37,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
/**
* Holds the name of a relation that has yet to be looked up in a catalog.
*/
case class UnresolvedRelation(
tableIdentifier: TableIdentifier,
alias: Option[String] = None) extends LeafNode {

case class UnresolvedRelation(tableIdentifier: TableIdentifier) extends LeafNode {
/** Returns a `.` separated name for this relation. */
def tableName: String = tableIdentifier.unquotedString

Expand Down
Expand Up @@ -572,16 +572,14 @@ class SessionCatalog(
* wrap the logical plan in a [[SubqueryAlias]] which will track the name of the view.
*
* @param name The name of the table/view that we look up.
* @param alias The alias name of the table/view that we look up.
*/
def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
def lookupRelation(name: TableIdentifier): LogicalPlan = {
synchronized {
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
val relationAlias = alias.getOrElse(table)
if (db == globalTempViewManager.database) {
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, None)
SubqueryAlias(table, viewDef, None)
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
Expand All @@ -594,12 +592,12 @@ class SessionCatalog(
desc = metadata,
output = metadata.schema.toAttributes,
child = parser.parsePlan(viewText))
SubqueryAlias(relationAlias, child, Some(name.copy(table = table, database = Some(db))))
SubqueryAlias(table, child, Some(name.copy(table = table, database = Some(db))))
} else {
SubqueryAlias(relationAlias, SimpleCatalogRelation(metadata), None)
SubqueryAlias(table, SimpleCatalogRelation(metadata), None)
}
} else {
SubqueryAlias(relationAlias, tempTables(table), None)
SubqueryAlias(table, tempTables(table), None)
}
}
}
Expand Down
Expand Up @@ -280,11 +280,10 @@ package object dsl {
object expressions extends ExpressionConversions // scalastyle:ignore

object plans { // scalastyle:ignore
def table(ref: String): LogicalPlan =
UnresolvedRelation(TableIdentifier(ref), None)
def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref))

def table(db: String, ref: String): LogicalPlan =
UnresolvedRelation(TableIdentifier(ref, Option(db)), None)
UnresolvedRelation(TableIdentifier(ref, Option(db)))

implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) {
def select(exprs: Expression*): LogicalPlan = {
Expand Down Expand Up @@ -369,10 +368,7 @@ package object dsl {
analysis.UnresolvedRelation(TableIdentifier(tableName)),
Map.empty, logicalPlan, overwrite, false)

def as(alias: String): LogicalPlan = logicalPlan match {
case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
case plan => SubqueryAlias(alias, plan, None)
}
def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan, None)

def repartition(num: Integer): LogicalPlan =
Repartition(num, shuffle = true, logicalPlan)
Expand Down
Expand Up @@ -179,7 +179,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}

InsertIntoTable(
UnresolvedRelation(tableIdent, None),
UnresolvedRelation(tableIdent),
partitionKeys,
query,
ctx.OVERWRITE != null,
Expand Down Expand Up @@ -645,17 +645,21 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
* }}}
*/
override def visitTable(ctx: TableContext): LogicalPlan = withOrigin(ctx) {
UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier), None)
UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier))
}

/**
* Create an aliased table reference. This is typically used in FROM clauses.
*/
override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
val table = UnresolvedRelation(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.strictIdentifier).map(_.getText))
table.optionalMap(ctx.sample)(withSample)
val table = UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier))

val tableWithAlias = Option(ctx.strictIdentifier).map(_.getText) match {
case Some(strictIdentifier) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be kept in the last line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I can not modify the original default expression order between alias and and sample.

SubqueryAlias(strictIdentifier, table, None)
case _ => table
}
tableWithAlias.optionalMap(ctx.sample)(withSample)
}

/**
Expand Down
Expand Up @@ -61,23 +61,23 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers {

checkAnalysis(
Project(Seq(UnresolvedAttribute("TbL.a")),
UnresolvedRelation(TableIdentifier("TaBlE"), Some("TbL"))),
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")), None)),
Project(testRelation.output, testRelation))

assertAnalysisError(
Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(
TableIdentifier("TaBlE"), Some("TbL"))),
Project(Seq(UnresolvedAttribute("tBl.a")),
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")), None)),
Seq("cannot resolve"))

checkAnalysis(
Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(
TableIdentifier("TaBlE"), Some("TbL"))),
Project(Seq(UnresolvedAttribute("TbL.a")),
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")), None)),
Project(testRelation.output, testRelation),
caseSensitive = false)

checkAnalysis(
Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(
TableIdentifier("TaBlE"), Some("TbL"))),
Project(Seq(UnresolvedAttribute("tBl.a")),
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")), None)),
Project(testRelation.output, testRelation),
caseSensitive = false)
}
Expand Down Expand Up @@ -166,12 +166,12 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers {
}

test("resolve relations") {
assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq())
checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe")), Seq())
checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE")), testRelation)
checkAnalysis(
UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
UnresolvedRelation(TableIdentifier("tAbLe")), testRelation, caseSensitive = false)
checkAnalysis(
UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
UnresolvedRelation(TableIdentifier("TaBlE")), testRelation, caseSensitive = false)
}

test("divide should be casted into fractional types") {
Expand Down Expand Up @@ -429,4 +429,14 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers {
assertAnalysisSuccess(r1)
assertAnalysisSuccess(r2)
}

test("resolve as with an already existed alias") {
checkAnalysis(
Project(Seq(UnresolvedAttribute("tbl2.a")),
SubqueryAlias("tbl", testRelation, None).as("tbl2")),
Project(testRelation.output, testRelation),
caseSensitive = false)

checkAnalysis(SubqueryAlias("tbl", testRelation, None).as("tbl2"), testRelation)
}
}
Expand Up @@ -444,28 +444,6 @@ class SessionCatalogSuite extends PlanTest {
== SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
}

test("lookup table relation with alias") {
val catalog = new SessionCatalog(newBasicCatalog())
val alias = "monster"
val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
val relation = SubqueryAlias("tbl1", SimpleCatalogRelation(tableMetadata), None)
val relationWithAlias =
SubqueryAlias(alias,
SimpleCatalogRelation(tableMetadata), None)
assert(catalog.lookupRelation(
TableIdentifier("tbl1", Some("db2")), alias = None) == relation)
assert(catalog.lookupRelation(
TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias)
}

test("lookup view with view name in alias") {
val catalog = new SessionCatalog(newBasicCatalog())
val tmpView = Range(1, 10, 2, 10)
catalog.createTempView("vw1", tmpView, overrideIfExists = false)
val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
assert(plan == SubqueryAlias("range", tmpView, None))
}

test("look up view relation") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
Expand Down
Expand Up @@ -52,8 +52,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
throw new AnalysisException("Unsupported data source type for direct query on files: " +
s"${u.tableIdentifier.database.get}")
}
val plan = LogicalRelation(dataSource.resolveRelation())
u.alias.map(a => SubqueryAlias(a, plan, None)).getOrElse(plan)
LogicalRelation(dataSource.resolveRelation())
} catch {
case _: ClassNotFoundException => u
case e: Exception =>
Expand Down
4 changes: 2 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Expand Up @@ -364,8 +364,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
upperCaseData.where('N <= 4).createOrReplaceTempView("`left`")
upperCaseData.where('N >= 3).createOrReplaceTempView("`right`")

val left = UnresolvedRelation(TableIdentifier("left"), None)
val right = UnresolvedRelation(TableIdentifier("right"), None)
val left = UnresolvedRelation(TableIdentifier("left"))
val right = UnresolvedRelation(TableIdentifier("right"))

checkAnswer(
left.join(right, $"left.N" === $"right.N", "full"),
Expand Down
Expand Up @@ -73,13 +73,13 @@ object TPCDSQueryBenchmark {
// per-row processing time for those cases.
val queryRelations = scala.collection.mutable.HashSet[String]()
spark.sql(queryString).queryExecution.logical.map {
case ur @ UnresolvedRelation(t: TableIdentifier, _) =>
case ur @ UnresolvedRelation(t: TableIdentifier) =>
queryRelations.add(t.table)
case lp: LogicalPlan =>
lp.expressions.foreach { _ foreach {
case subquery: SubqueryExpression =>
subquery.plan.foreach {
case ur @ UnresolvedRelation(t: TableIdentifier, _) =>
case ur @ UnresolvedRelation(t: TableIdentifier) =>
queryRelations.add(t.table)
case _ =>
}
Expand Down
Expand Up @@ -483,7 +483,7 @@ private[hive] class TestHiveQueryExecution(
// Make sure any test tables referenced are loaded.
val referencedTables =
describedTables ++
logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
logical.collect { case UnresolvedRelation(tableIdent) => tableIdent.table }
val referencedTestTables = referencedTables.filter(sparkSession.testTables.contains)
logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
referencedTestTables.foreach(sparkSession.loadTestTable)
Expand Down