Skip to content

Commit

Permalink
[SPARK-34561][SQL] Fix drop/add columns from/to a dataset of v2 `DESC…
Browse files Browse the repository at this point in the history
…RIBE TABLE`

### What changes were proposed in this pull request?
In the PR, I propose to generate "stable" output attributes per the logical node of the `DESCRIBE TABLE` command.

### Why are the changes needed?
This fixes the issue demonstrated by the example:
```scala
val tbl = "testcat.ns1.ns2.tbl"
sql(s"CREATE TABLE $tbl (c0 INT) USING _")
val description = sql(s"DESCRIBE TABLE $tbl")
description.drop("comment")
```
The `drop()` method fails with the error:
```
org.apache.spark.sql.AnalysisException: Resolved attribute(s) col_name#102,data_type#103 missing from col_name#29,data_type#30,comment#31 in operator !Project [col_name#102, data_type#103]. Attribute(s) with the same name appear in the operation: col_name,data_type. Please check if the right attribute(s) are used.;
!Project [col_name#102, data_type#103]
+- LocalRelation [col_name#29, data_type#30, comment#31]

	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:51)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:50)
```

### Does this PR introduce _any_ user-facing change?
Yes. After the changes, `drop()`/`add()` works as expected:
```scala
description.drop("comment").show()
+---------------+---------+
|       col_name|data_type|
+---------------+---------+
|             c0|      int|
|               |         |
| # Partitioning|         |
|Not partitioned|         |
+---------------+---------+
```

### How was this patch tested?
1. Run new test:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2SQLSuite"
```
2. Run existing test suite:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CatalogedDDLSuite"
```

Closes #31676 from MaxGekk/describe-table-drop-column.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
MaxGekk authored and cloud-fan committed Mar 1, 2021
1 parent a6cc5e6 commit 984ff39
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 11 deletions.
Expand Up @@ -345,9 +345,13 @@ object ShowNamespaces {
case class DescribeRelation(
relation: LogicalPlan,
partitionSpec: TablePartitionSpec,
isExtended: Boolean) extends Command {
isExtended: Boolean,
override val output: Seq[Attribute] = DescribeRelation.getOutputAttrs) extends Command {
override def children: Seq[LogicalPlan] = Seq(relation)
override def output: Seq[Attribute] = DescribeCommandSchema.describeTableAttributes()
}

object DescribeRelation {
def getOutputAttrs: Seq[Attribute] = DescribeCommandSchema.describeTableAttributes()
}

/**
Expand Down
Expand Up @@ -200,7 +200,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AlterTableRenameCommand(oldName.asTableIdentifier, newName.asTableIdentifier, isView)

// Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet.
case DescribeRelation(ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended) =>
case DescribeRelation(ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended, _) =>
DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended)

case DescribeColumn(ResolvedViewIdentifier(ident), column: UnresolvedAttribute, isExtended) =>
Expand Down
Expand Up @@ -270,11 +270,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case desc @ DescribeNamespace(ResolvedNamespace(catalog, ns), extended) =>
DescribeNamespaceExec(desc.output, catalog.asNamespaceCatalog, ns, extended) :: Nil

case desc @ DescribeRelation(r: ResolvedTable, partitionSpec, isExtended) =>
case DescribeRelation(r: ResolvedTable, partitionSpec, isExtended, output) =>
if (partitionSpec.nonEmpty) {
throw new AnalysisException("DESCRIBE does not support partition for v2 tables.")
}
DescribeTableExec(desc.output, r.table, isExtended) :: Nil
DescribeTableExec(output, r.table, isExtended) :: Nil

case desc @ DescribeColumn(_: ResolvedTable, column, isExtended) =>
column match {
Expand Down
Expand Up @@ -539,7 +539,7 @@ EXPLAIN EXTENDED DESC t
struct<plan:string>
-- !query output
== Parsed Logical Plan ==
'DescribeRelation false
'DescribeRelation false, [col_name#x, data_type#x, comment#x]
+- 'UnresolvedTableOrView [t], DESCRIBE TABLE, true

== Analyzed Logical Plan ==
Expand Down
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources.SimpleScanSource
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{BooleanType, LongType, MetadataBuilder, StringType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -2576,6 +2576,30 @@ class DataSourceV2SQLSuite
}
}

test("SPARK-34561: drop/add columns to a dataset of `DESCRIBE TABLE`") {
val tbl = s"${catalogAndNamespace}tbl"
withTable(tbl) {
sql(s"CREATE TABLE $tbl (c0 INT) USING $v2Format")
val description = sql(s"DESCRIBE TABLE $tbl")
val noCommentDataset = description.drop("comment")
val expectedSchema = new StructType()
.add(
name = "col_name",
dataType = StringType,
nullable = false,
metadata = new MetadataBuilder().putString("comment", "name of the column").build())
.add(
name = "data_type",
dataType = StringType,
nullable = false,
metadata = new MetadataBuilder().putString("comment", "data type of the column").build())
assert(noCommentDataset.schema === expectedSchema)
val isNullDataset = noCommentDataset
.withColumn("is_null", noCommentDataset("col_name").isNull)
assert(isNullDataset.schema === expectedSchema.add("is_null", BooleanType, false))
}
}

private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
Expand Down
Expand Up @@ -875,13 +875,13 @@ class PlanResolutionSuite extends AnalysisTest {
comparePlans(parsed2, expected2)
} else {
parsed1 match {
case DescribeRelation(_: ResolvedTable, _, isExtended) =>
case DescribeRelation(_: ResolvedTable, _, isExtended, _) =>
assert(!isExtended)
case _ => fail("Expect DescribeTable, but got:\n" + parsed1.treeString)
}

parsed2 match {
case DescribeRelation(_: ResolvedTable, _, isExtended) =>
case DescribeRelation(_: ResolvedTable, _, isExtended, _) =>
assert(isExtended)
case _ => fail("Expect DescribeTable, but got:\n" + parsed2.treeString)
}
Expand All @@ -895,7 +895,7 @@ class PlanResolutionSuite extends AnalysisTest {
comparePlans(parsed3, expected3)
} else {
parsed3 match {
case DescribeRelation(_: ResolvedTable, partitionSpec, isExtended) =>
case DescribeRelation(_: ResolvedTable, partitionSpec, isExtended, _) =>
assert(!isExtended)
assert(partitionSpec == Map("a" -> "1"))
case _ => fail("Expect DescribeTable, but got:\n" + parsed2.treeString)
Expand Down Expand Up @@ -1198,7 +1198,7 @@ class PlanResolutionSuite extends AnalysisTest {
case AppendData(r: DataSourceV2Relation, _, _, _, _) =>
assert(r.catalog.exists(_ == catalogIdent))
assert(r.identifier.exists(_.name() == tableIdent))
case DescribeRelation(r: ResolvedTable, _, _) =>
case DescribeRelation(r: ResolvedTable, _, _, _) =>
assert(r.catalog == catalogIdent)
assert(r.identifier.name() == tableIdent)
case ShowTableProperties(r: ResolvedTable, _, _) =>
Expand Down

0 comments on commit 984ff39

Please sign in to comment.