refactor view canonicalization
jiangxb1987 committed Dec 6, 2016
1 parent 7863c62 commit 434009e
Showing 9 changed files with 368 additions and 95 deletions.
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -509,32 +509,42 @@ class Analyzer(
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {
-    private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = {
+    private def lookupTableFromCatalog(
+        u: UnresolvedRelation, db: Option[String] = None): LogicalPlan = {
       try {
-        catalog.lookupRelation(u.tableIdentifier, u.alias)
+        catalog.lookupRelation(u.tableIdentifier, u.alias, db)
       } catch {
         case _: NoSuchTableException =>
           u.failAnalysis(s"Table or view not found: ${u.tableName}")
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
-        i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
-      case u: UnresolvedRelation =>
-        val table = u.tableIdentifier
-        if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) &&
-            (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) {
-          // If the database part is specified, and we support running SQL directly on files, and
-          // it's not a temporary view, and the table does not exist, then let's just return the
-          // original UnresolvedRelation. It is possible we are matching a query like "select *
-          // from parquet.`/path/to/query`". The plan will get resolved later.
-          // Note that we are testing (!db_exists || !table_exists) because the catalog throws
-          // an exception from tableExists if the database does not exist.
-          u
-        } else {
-          lookupTableFromCatalog(u)
-        }
-    }
+    def apply(plan: LogicalPlan): LogicalPlan = {
+      var currentDatabase = catalog.getCurrentDatabase
+      plan resolveOperators {
+        case i@InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
+          i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+        case u: UnresolvedRelation =>
+          val table = u.tableIdentifier
+          if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) &&
+              (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) {
+            // If the database part is specified, and we support running SQL directly on files, and
+            // it's not a temporary view, and the table does not exist, then let's just return the
+            // original UnresolvedRelation. It is possible we are matching a query like "select *
+            // from parquet.`/path/to/query`". The plan will get resolved later.
+            // Note that we are testing (!db_exists || !table_exists) because the catalog throws
+            // an exception from tableExists if the database does not exist.
+            u
+          } else {
+            val logicalPlan = lookupTableFromCatalog(u, Some(currentDatabase))
+            currentDatabase = logicalPlan.collectFirst {
+              case relation: CatalogRelation if relation.catalogTable.currentDatabase.isDefined =>
+                relation.catalogTable.currentDatabase.get
+            }.getOrElse(currentDatabase)
+
+            logicalPlan
+          }
+      }
+    }
   }
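The rewritten ResolveRelations threads the current database through relation lookup so that a view defined in one database keeps resolving its unqualified references there, even when it is queried from another database. A minimal end-to-end sketch of the behavior this enables (assuming a local SparkSession built with Hive support; all database, table, and view names are illustrative):

import org.apache.spark.sql.SparkSession

object ViewDatabaseHintDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE DATABASE IF NOT EXISTS db1")
    spark.sql("CREATE TABLE IF NOT EXISTS db1.src(id INT)")
    spark.sql("USE db1")
    // `src` is unqualified, so the view must remember it was defined in db1.
    spark.sql("CREATE OR REPLACE VIEW v AS SELECT id FROM src")

    spark.sql("CREATE DATABASE IF NOT EXISTS db2")
    spark.sql("USE db2")
    // With the database hint stored in the view metadata, `src` inside the
    // view still resolves as db1.src instead of failing to find db2.src.
    spark.sql("SELECT * FROM db1.v").show()

    spark.stop()
  }
}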
@@ -557,9 +557,12 @@ class SessionCatalog(
    * If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
    * track the name of the view.
    */
-  def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
+  def lookupRelation(
+      name: TableIdentifier,
+      alias: Option[String] = None,
+      databaseHint: Option[String] = None): LogicalPlan = {
     synchronized {
-      val db = formatDatabaseName(name.database.getOrElse(currentDb))
+      val db = formatDatabaseName(name.database.getOrElse(databaseHint.getOrElse(currentDb)))
       val table = formatTableName(name.table)
       val relationAlias = alias.getOrElse(table)
       if (db == globalTempViewManager.database) {
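The resolution order introduced by the new databaseHint parameter is: the explicit database on the identifier, then the caller's hint, then the session's current database. A self-contained sketch of that precedence (the function name is illustrative, not the actual SessionCatalog internals):

def resolveDatabase(
    explicitDb: Option[String],
    databaseHint: Option[String],
    currentDb: String): String =
  // Explicit database on the identifier wins, then the caller's hint,
  // then the session's current database.
  explicitDb.getOrElse(databaseHint.getOrElse(currentDb))

assert(resolveDatabase(Some("db1"), Some("db2"), "default") == "db1")
assert(resolveDatabase(None, Some("db2"), "default") == "db2")
assert(resolveDatabase(None, None, "default") == "default")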
@@ -154,6 +154,7 @@ case class CatalogTable(
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: StructType,
+    originalSchema: Option[StructType] = None,
     provider: Option[String] = None,
     partitionColumnNames: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
@@ -164,6 +165,7 @@
     stats: Option[Statistics] = None,
     viewOriginalText: Option[String] = None,
     viewText: Option[String] = None,
+    currentDatabase: Option[String] = None,
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
     tracksPartitionsInCatalog: Boolean = false) {
@@ -221,11 +223,14 @@
       s"Last Access: ${new Date(lastAccessTime).toString}",
       s"Type: ${tableType.name}",
       if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "",
+      if (originalSchema.isDefined) s"Original Schema: ${originalSchema.mkString("[", ", ", "]")}"
+      else "",
       if (provider.isDefined) s"Provider: ${provider.get}" else "",
       if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else ""
     ) ++ bucketStrings ++ Seq(
       viewOriginalText.map("Original View: " + _).getOrElse(""),
       viewText.map("View: " + _).getOrElse(""),
+      currentDatabase.map("Database Hint: " + _).getOrElse(""),
       comment.map("Comment: " + _).getOrElse(""),
       if (properties.nonEmpty) s"Properties: $tableProperties" else "",
       if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "",
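With the two new fields, view metadata might be populated as below — a hedged sketch using the CatalogTable signature from this diff (names and schema are illustrative; the remaining arguments keep their defaults):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val idOnly = StructType(StructField("id", IntegerType) :: Nil)
val viewMeta = CatalogTable(
  identifier = TableIdentifier("v", Some("db1")),
  tableType = CatalogTableType.VIEW,
  storage = CatalogStorageFormat.empty,
  schema = idOnly,
  // Captured at creation time; later compared against the freshly resolved
  // plan to detect schema drift in the underlying tables.
  originalSchema = Some(idOnly),
  viewOriginalText = Some("SELECT id FROM src"),
  viewText = Some("SELECT id FROM src"),
  // The database that was current when the view was created; used to qualify
  // unqualified relations when the view text is re-resolved.
  currentDatabase = Some("db1"))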
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.command
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
 
 
 /**
@@ -64,9 +64,8 @@ object PersistedView extends ViewType
 
 
 /**
- * Create or replace a view with given query plan. This command will convert the query plan to
- * canonicalized SQL string, and store it as view text in metastore, if we need to create a
- * permanent view.
+ * Create or replace a view with given query plan. This command will store the originalText as
+ * view text in metastore, if we need to create a permanent view.
  *
  * @param name the name of this view.
  * @param userSpecifiedColumns the output column names and optional comments specified by users,
@@ -75,8 +74,7 @@ object PersistedView extends ViewType
 * @param properties the properties of this view.
 * @param originalText the original SQL text of this view, can be None if this view is created via
 *                     Dataset API.
- * @param child the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param child the logical plan that represents the view.
 * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
 *                      already exists, throws analysis exception.
 * @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -207,31 +205,56 @@ case class CreateViewCommand(
   }
 
   /**
-   * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
-   * SQL based on the analyzed plan, and also creates the proper schema for the view.
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. This stores the following
+   * properties for a view:
+   * 1. The `viewText` which is used to generate a logical plan when we resolve a view;
+   * 2. The `currentDatabase` which sets the current database during the analysis stage;
+   * 3. The `schema` which ensures we generate the correct output.
    */
   private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
-    val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
+    val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
 
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
-    }
+    if (originalText.isDefined) {
+      val viewSQL = originalText.get
+
+      // Validate the view SQL - make sure we can resolve it with currentDatabase.
+      val originalSchema = try {
+        val unresolvedPlan = sparkSession.sessionState.sqlParser.parsePlan(viewSQL)
+        val resolvedPlan = sparkSession.sessionState.analyzer.execute(unresolvedPlan)
+        sparkSession.sessionState.analyzer.checkAnalysis(resolvedPlan)
+
+        resolvedPlan.schema
+      } catch {
+        case NonFatal(e) =>
+          throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
+      }
 
-    CatalogTable(
-      identifier = name,
-      tableType = CatalogTableType.VIEW,
-      storage = CatalogStorageFormat.empty,
-      schema = aliasedPlan.schema,
-      properties = properties,
-      viewOriginalText = originalText,
-      viewText = Some(viewSQL),
-      comment = comment
-    )
+      CatalogTable(
+        identifier = name,
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = aliasedPlan.schema,
+        originalSchema = Some(originalSchema),
+        properties = properties,
+        viewOriginalText = originalText,
+        viewText = Some(viewSQL),
+        currentDatabase = Some(currentDatabase),
+        comment = comment
+      )
+    } else {
+      CatalogTable(
+        identifier = name,
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = aliasedPlan.schema,
+        originalSchema = None,
+        properties = properties,
+        viewOriginalText = None,
+        viewText = None,
+        currentDatabase = Some(currentDatabase),
+        comment = comment
+      )
+    }
   }
 }

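The practical effect on what gets persisted: previously viewText held SQL regenerated from the analyzed plan by SQLBuilder; now it holds the user's text verbatim. A representative illustration (the "before" string is typical of SQLBuilder output, not an exact reproduction):

spark.sql("CREATE VIEW v AS SELECT id FROM src")
// Before: viewText held SQL regenerated from the analyzed plan, e.g.
//   SELECT `gen_attr_0` AS `id` FROM (SELECT `id` AS `gen_attr_0`
//   FROM `db1`.`src`) AS gen_subquery_0
// After: viewText holds the original text verbatim:
//   SELECT id FROM src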
@@ -244,8 +267,7 @@ case class CreateViewCommand(
 * @param name the name of this view.
 * @param originalText the original SQL text of this view. Note that we can only alter a view by
 *                     SQL API, which means we always have originalText.
- * @param query the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param query the logical plan that represents the view.
 */
case class AlterViewAsCommand(
    name: TableIdentifier,
@@ -275,20 +297,26 @@ case class AlterViewAsCommand(
       throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
     }
 
-    val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      session.sql(viewSQL).queryExecution.assertAnalyzed()
+    val currentDatabase = session.sessionState.catalog.getCurrentDatabase
+
+    // Validate the view SQL - make sure we can resolve it with currentDatabase.
+    val originalSchema = try {
+      val unresolvedPlan = session.sessionState.sqlParser.parsePlan(originalText)
+      val resolvedPlan = session.sessionState.analyzer.execute(unresolvedPlan)
+      session.sessionState.analyzer.checkAnalysis(resolvedPlan)
+
+      resolvedPlan.schema
     } catch {
       case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
+        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $originalText", e)
     }
 
     val updatedViewMeta = viewMeta.copy(
       schema = analyzedPlan.schema,
+      originalSchema = Some(originalSchema),
       viewOriginalText = Some(originalText),
-      viewText = Some(viewSQL))
+      viewText = Some(originalText),
+      currentDatabase = Some(currentDatabase))
 
     session.sessionState.catalog.alterTable(updatedViewMeta)
   }
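Usage sketch for the ALTER VIEW path (names illustrative, continuing the earlier demo): the new text is validated against the database current at alter time, and that database replaces the stored hint.

// Assumes the databases and view from the earlier sketch, plus a src table in db2.
spark.sql("CREATE TABLE IF NOT EXISTS db2.src(id INT)")
spark.sql("USE db2")
// The new text is resolved against db2 (the current database), and db2
// replaces db1 as the view's stored database hint.
spark.sql("ALTER VIEW db1.v AS SELECT id FROM src")
spark.sql("USE db1")
spark.sql("SELECT * FROM v").show()  // unqualified `src` now resolves to db2.src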
@@ -17,12 +17,16 @@
 
 package org.apache.spark.sql.hive
 
+import scala.util.control.NonFatal
+
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -32,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.types._
 
-
 /**
  * Legacy catalog for interacting with the Hive metastore.
  *
@@ -114,9 +117,26 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
     } else if (table.tableType == CatalogTableType.VIEW) {
       val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
+      val unresolvedPlan = sparkSession.sessionState.sqlParser.parsePlan(viewText).transform {
+        case u: UnresolvedRelation if u.tableIdentifier.database.isEmpty =>
+          u.copy(tableIdentifier = TableIdentifier(u.tableIdentifier.table, table.currentDatabase))
+      }
+      // Resolve the plan and check whether the analyzed plan is valid.
+      val resolvedPlan = try {
+        val resolvedPlan = sparkSession.sessionState.analyzer.execute(unresolvedPlan)
+        sparkSession.sessionState.analyzer.checkAnalysis(resolvedPlan)
+
+        resolvedPlan
+      } catch {
+        case NonFatal(e) =>
+          throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewText", e)
+      }
+      val planWithProjection = table.originalSchema.map(withProjection(resolvedPlan, _))
+        .getOrElse(resolvedPlan)
+
       SubqueryAlias(
         alias.getOrElse(table.identifier.table),
-        sparkSession.sessionState.sqlParser.parsePlan(viewText),
+        aliasColumns(planWithProjection, table.schema.fields),
         Option(table.identifier))
     } else {
       val qualifiedTable =
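The qualification step above, in isolation: every relation the view's author left unqualified is pinned to the database recorded at view creation, while explicitly qualified relations pass through untouched. A sketch assuming code that can reach Spark's internal sessionState (e.g. living in an org.apache.spark.sql package); viewDb stands in for table.currentDatabase:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

val viewDb: Option[String] = Some("db1")
val parsed = sparkSession.sessionState.sqlParser.parsePlan("SELECT id FROM src")
// Pin every unqualified relation to the view's recorded database; relations
// the user qualified explicitly are left untouched.
val qualified = parsed.transform {
  case u: UnresolvedRelation if u.tableIdentifier.database.isEmpty =>
    u.copy(tableIdentifier = TableIdentifier(u.tableIdentifier.table, viewDb))
}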
@@ -126,6 +146,55 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
+  /**
+   * Applies a Projection on the resolved logical plan to:
+   * 1. Omit the columns which are not referenced by the view;
+   * 2. Reorder the columns to match the column order of the view.
+   */
+  private def withProjection(plan: LogicalPlan, schema: StructType): LogicalPlan = {
+    // All fields in schema should exist in plan.schema, or we should throw an AnalysisException
+    // to notify that the underlying schema has been changed.
+    if (schema.fields.forall { field =>
+        plan.schema.fields.exists(other => compareStructField(field, other))}) {
+      val output = schema.fields.map { field =>
+        plan.output.find { expr =>
+          expr.name == field.name && expr.dataType == field.dataType}.getOrElse(
+          throw new AnalysisException("The underlying schema doesn't match the original " +
+            s"schema, expected ${schema.sql} but got ${plan.schema.sql}")
+        )}
+      Project(output, plan)
+    } else {
+      throw new AnalysisException("The underlying schema doesn't match the original schema, " +
+        s"expected ${schema.sql} but got ${plan.schema.sql}")
+    }
+  }
+
+  /**
+   * Compares two [[StructField]]s to verify that they have the same name and dataType.
+   */
+  private def compareStructField(field: StructField, other: StructField): Boolean = {
+    field.name == other.name && field.dataType == other.dataType
+  }
+
+  /**
+   * Aliases the schema of the LogicalPlan to the view attribute names.
+   */
+  private def aliasColumns(plan: LogicalPlan, fields: Seq[StructField]): LogicalPlan = {
+    val output = fields.map(field => (field.name, field.getComment))
+    if (plan.output.size != output.size) {
+      throw new AnalysisException("The output of the plan does not have the same size as the " +
+        s"view schema, expected ${plan.output.size} but got ${output.mkString("[", ",", "]")}")
+    } else {
+      val aliasedOutput = plan.output.zip(output).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      Project(aliasedOutput, plan)
+    }
+  }
+
   private def getCached(
       tableIdentifier: QualifiedTableName,
       pathsInMetastore: Seq[Path],
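The schema check in withProjection reduces to: every field of the view's original schema must still exist, with the same name and dataType, in the freshly resolved plan. A self-contained sketch of that rule (the helper name is illustrative):

import org.apache.spark.sql.types._

def underlyingSchemaMatches(original: StructType, resolved: StructType): Boolean =
  original.fields.forall { field =>
    resolved.fields.exists(o => o.name == field.name && o.dataType == field.dataType)
  }

val original = StructType(StructField("id", IntegerType) :: Nil)
val widened  = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val changed  = StructType(StructField("id", LongType) :: Nil)

assert(underlyingSchemaMatches(original, widened))   // extra columns in the table are tolerated
assert(!underlyingSchemaMatches(original, changed))  // a changed dataType is a mismatch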
