[Spark] Rewrite DESCRIBE HISTORY to use Spark Table Resolution
This PR rewrites the Delta DESCRIBE HISTORY command to use Spark's table resolution logic instead of resolving the target table manually at command execution time. To do so, it changes DescribeDeltaHistory into a UnaryNode that takes either an UnresolvedTable or an UnresolvedPathBasedDeltaTable as its child plan node, which Spark then resolves. Once the child is resolved, DeltaAnalysis transforms the DescribeDeltaHistory node into the actual runnable command (DescribeDeltaHistoryCommand); the resolved table is passed to the command as a DeltaTableV2.
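
For illustration, both ways of naming the target now go through the same analyzer path; a minimal sketch, where the catalog, table, and path names are hypothetical:

// Parsed into DescribeDeltaHistory with an UnresolvedTable child, resolved by Spark's analyzer:
spark.sql("DESCRIBE HISTORY catalog_foo.db.tbl LIMIT 10").show()
// Parsed into DescribeDeltaHistory with an UnresolvedPathBasedDeltaTable child:
spark.sql("DESCRIBE HISTORY '/path/to/tbl'").show()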

This is mainly a refactor, and the existing DescribeDeltaHistory suite already contains a large set of tests, which this PR relies on. The PR also updates DeltaSqlParserSuite to check that DESCRIBE HISTORY statements are correctly parsed into a DescribeDeltaHistory node.
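
As a sketch, the touched suites could be run with the repository's sbt launcher; the module name and wildcard form below are assumptions, not taken from this PR:

build/sbt 'spark/testOnly *DeltaSqlParserSuite'
build/sbt 'spark/testOnly *DescribeDeltaHistorySuite'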

No user-facing changes.

Closes #2090

GitOrigin-RevId: 75eb8c8ea06350612b8b51fc6a88e11845e21b92
LukasRupprecht authored and vkorukanti committed Sep 25, 2023
1 parent 6f08187 commit 4eb177e
Showing 5 changed files with 85 additions and 56 deletions.
spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala (5 changes: 2 additions & 3 deletions)
@@ -380,11 +380,10 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
 
   override def visitDescribeDeltaHistory(
       ctx: DescribeDeltaHistoryContext): LogicalPlan = withOrigin(ctx) {
-    DescribeDeltaHistoryCommand(
+    DescribeDeltaHistory(
       Option(ctx.path).map(string),
       Option(ctx.table).map(visitTableIdentifier),
-      Option(ctx.limit).map(_.getText.toInt),
-      Map.empty)
+      Option(ctx.limit).map(_.getText.toInt))
   }
 
   override def visitGenerate(ctx: GenerateContext): LogicalPlan = withOrigin(ctx) {
spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -434,6 +434,7 @@ class DeltaAnalysis(session: SparkSession)
       }
       DataSourceV2Relation.create(table, None, Some(u.identifier), u.options)
 
+    case d: DescribeDeltaHistory if d.childrenResolved => d.toCommand
 
     // This rule falls back to V1 nodes, since we don't have a V2 reader for Delta right now
     case dsv2 @ DataSourceV2Relation(d: DeltaTableV2, _, _, _, options) =>
spark/src/main/scala/org/apache/spark/sql/delta/commands/DescribeDeltaHistoryCommand.scala
@@ -17,85 +17,101 @@
 package org.apache.spark.sql.delta.commands
 
 // scalastyle:off import.ordering.noEmptyLine
-import org.apache.spark.sql.delta.{DeltaErrors, DeltaHistory, DeltaLog, DeltaTableIdentifier}
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaHistory, DeltaLog, DeltaTableIdentifier, UnresolvedDeltaPathOrIdentifier, UnresolvedPathBasedDeltaTable}
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedTable}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, UnaryNode}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 
+object DescribeDeltaHistory {
+  /**
+   * Alternate constructor that converts a provided path or table identifier into the
+   * correct child LogicalPlan node. If both path and tableIdentifier are specified (or
+   * if both are None), this method will throw an exception. If a table identifier is
+   * specified, the child LogicalPlan will be an [[UnresolvedTable]] whereas if a path
+   * is specified, it will be an [[UnresolvedPathBasedDeltaTable]].
+   *
+   * Note that the returned command will have an *unresolved* child table and hence, the command
+   * needs to be analyzed before it can be executed.
+   */
+  def apply(
+      path: Option[String],
+      tableIdentifier: Option[TableIdentifier],
+      limit: Option[Int]): DescribeDeltaHistory = {
+    val plan = UnresolvedDeltaPathOrIdentifier(path, tableIdentifier, COMMAND_NAME)
+    DescribeDeltaHistory(plan, limit)
+  }
+
+  val COMMAND_NAME = "DESCRIBE HISTORY"
+}
+
 /**
  * A logical placeholder for describing a Delta table's history, so that the history can be
  * leveraged in subqueries. Replaced with `DescribeDeltaHistoryCommand` during planning.
- *
- * @param options: Hadoop file system options used for read and write.
  */
 case class DescribeDeltaHistory(
-    path: Option[String],
-    tableIdentifier: Option[TableIdentifier],
+    override val child: LogicalPlan,
     limit: Option[Int],
-    options: Map[String, String],
-    output: Seq[Attribute] = ExpressionEncoder[DeltaHistory]().schema.toAttributes)
-  extends LeafNode with MultiInstanceRelation {
-  override def computeStats(): Statistics = Statistics(sizeInBytes = conf.defaultSizeInBytes)
+    override val output: Seq[Attribute] = ExpressionEncoder[DeltaHistory]().schema.toAttributes)
+  extends UnaryNode
+    with MultiInstanceRelation
+    with DeltaCommand {
+
   override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
+
+  override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(child = newChild)
+
+  /**
+   * Define this operator as having no attributes provided by children in order to prevent column
+   * pruning from trying to insert projections above the source relation.
+   */
+  override lazy val references: AttributeSet = AttributeSet.empty
+  override def inputSet: AttributeSet = AttributeSet.empty
+  assert(!child.isInstanceOf[Project],
+    s"The child operator of DescribeDeltaHistory must not contain any projection: $child")
+
+  /** Converts this operator into an executable command. */
+  def toCommand: DescribeDeltaHistoryCommand = {
+    // Max array size
+    if (limit.exists(_ > Int.MaxValue - 8)) {
+      throw DeltaErrors.maxArraySizeExceeded()
+    }
+    val deltaTableV2: DeltaTableV2 = getDeltaTable(child, DescribeDeltaHistory.COMMAND_NAME)
+    DescribeDeltaHistoryCommand(table = deltaTableV2, limit = limit, output = output)
+  }
 }
 
 /**
  * A command for describing the history of a Delta table.
- *
- * @param options: Hadoop file system options used for read and write.
 */
 case class DescribeDeltaHistoryCommand(
-    path: Option[String],
-    tableIdentifier: Option[TableIdentifier],
+    @transient table: DeltaTableV2,
     limit: Option[Int],
-    options: Map[String, String],
     override val output: Seq[Attribute] = ExpressionEncoder[DeltaHistory]().schema.toAttributes)
-  extends LeafRunnableCommand with DeltaLogging {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val basePath =
-      if (path.nonEmpty) {
-        new Path(path.get)
-      } else if (tableIdentifier.nonEmpty) {
-        val sessionCatalog = sparkSession.sessionState.catalog
-        lazy val metadata = sessionCatalog.getTableMetadata(tableIdentifier.get)
+  extends LeafRunnableCommand
+    with MultiInstanceRelation
+    with DeltaLogging {
 
-        DeltaTableIdentifier(sparkSession, tableIdentifier.get) match {
-          case Some(id) if id.path.nonEmpty =>
-            new Path(id.path.get)
-          case Some(id) if id.table.nonEmpty =>
-            new Path(metadata.location)
-          case _ =>
-            val isView = metadata.tableType == CatalogTableType.VIEW
-            if (isView) {
-              throw DeltaErrors.describeViewHistory
-            }
-            throw DeltaErrors.notADeltaTableException("DESCRIBE HISTORY")
-        }
-      } else {
-        throw DeltaErrors.missingTableIdentifierException("DESCRIBE HISTORY")
-      }
-
-    // Max array size
-    if (limit.exists(_ > Int.MaxValue - 8)) {
-      throw DeltaErrors.maxArraySizeExceeded()
-    }
+  override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
 
-    val deltaLog = DeltaLog.forTable(sparkSession, basePath, options)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val deltaLog = table.deltaLog
     recordDeltaOperation(deltaLog, "delta.ddl.describeHistory") {
       if (!deltaLog.tableExists) {
-        throw DeltaErrors.notADeltaTableException("DESCRIBE HISTORY")
+        throw DeltaErrors.notADeltaTableException(
+          DescribeDeltaHistory.COMMAND_NAME,
+          DeltaTableIdentifier(path = Some(table.path.toString))
+        )
       }
 
       import org.apache.spark.sql.delta.implicits._
       val commits = deltaLog.history.getHistory(limit)
       sparkSession.implicits.localSeqToDatasetHolder(commits).toDF().collect().toSeq
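For comparison, the history read performed by the reworked run method (deltaLog.history.getHistory(limit)) is also reachable through Delta's public Scala API; a minimal sketch, with a hypothetical table path:

import io.delta.tables.DeltaTable

// history(n) returns a DataFrame with the same columns that DESCRIBE HISTORY reports.
val history = DeltaTable.forPath(spark, "/path/to/tbl").history(10)
history.show()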
spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -21,7 +21,7 @@ import io.delta.tables.execution.VacuumTableCommand
 import org.apache.spark.sql.delta.CloneTableSQLTestUtils
 import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
 import org.apache.spark.sql.delta.{UnresolvedPathBasedDeltaTable, UnresolvedPathBasedTable}
-import org.apache.spark.sql.delta.commands.{DescribeDeltaDetailCommand, OptimizeTableCommand, DeltaReorgTable}
+import org.apache.spark.sql.delta.commands.{DescribeDeltaDetailCommand, DescribeDeltaHistory, OptimizeTableCommand, DeltaReorgTable}
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable}
@@ -189,6 +189,19 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
       Map.empty))
   }
 
+  test("DESCRIBE HISTORY command is parsed as expected") {
+    val parser = new DeltaSqlParser(null)
+    var parsedCmd = parser.parsePlan("DESCRIBE HISTORY catalog_foo.db.tbl")
+    assert(parsedCmd.asInstanceOf[DescribeDeltaHistory].child ===
+      UnresolvedTable(Seq("catalog_foo", "db", "tbl"), DescribeDeltaHistory.COMMAND_NAME, None))
+    parsedCmd = parser.parsePlan("DESCRIBE HISTORY delta.`/path/to/tbl`")
+    assert(parsedCmd.asInstanceOf[DescribeDeltaHistory].child ===
+      UnresolvedTable(Seq("delta", "/path/to/tbl"), DescribeDeltaHistory.COMMAND_NAME, None))
+    parsedCmd = parser.parsePlan("DESCRIBE HISTORY '/path/to/tbl'")
+    assert(parsedCmd.asInstanceOf[DescribeDeltaHistory].child ===
+      UnresolvedPathBasedDeltaTable("/path/to/tbl", DescribeDeltaHistory.COMMAND_NAME))
+  }
+
   private def targetPlanForTable(tableParts: String*): UnresolvedTable =
     UnresolvedTable(tableParts.toSeq, "REORG", relationTypeMismatchHint = None)

spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
@@ -224,7 +224,8 @@ trait DescribeDeltaHistorySuiteBase
       val e = intercept[AnalysisException] {
         sql(s"DESCRIBE HISTORY $viewName").collect()
       }
-      assert(e.getMessage.contains("history of a view"))
+      assert(e.getMessage.contains("spark_catalog.default.delta_view is a view. " +
+        "'DESCRIBE HISTORY' expects a table"))
     }
   }

@@ -237,8 +238,7 @@ trait DescribeDeltaHistorySuiteBase
       val e = intercept[AnalysisException] {
         sql(s"DESCRIBE HISTORY $viewName").collect()
       }
-      assert(e.getMessage.contains("not found") ||
-        e.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND"))
+      assert(e.getMessage.contains("v is a temp view. 'DESCRIBE HISTORY' expects a table"))
     }
   }

@@ -579,7 +579,7 @@ trait DescribeDeltaHistorySuiteBase
       val e = intercept[AnalysisException] {
         sql(s"describe history $table").show()
       }
-      Seq("DESCRIBE HISTORY", "only supported for Delta tables").foreach { msg =>
+      Seq("is not a Delta table").foreach { msg =>
         assert(e.getMessage.contains(msg))
       }
     }
