Skip to content

Commit

Permalink
[SPARK-38869][SQL] Respect table capability ACCEPT_ANY_SCHEMA in DEFAULT column resolution
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

Respect table capability ACCEPT_ANY_SCHEMA in DEFAULT column resolution by leaving such tables unchanged by any DEFAULT column resolution logic.

### Why are the changes needed?

Tables with ACCEPT_ANY_SCHEMA capability declare themselves to accept inserted rows of any schema, so we should not add extra columns to INSERT VALUES lists or projections corresponding to any DEFAULT columns in the target table schema.

### Does this PR introduce _any_ user-facing change?

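Yes. DEFAULT column resolution now leaves commands that target tables with the ACCEPT_ANY_SCHEMA capability unchanged, so no extra columns are added to their INSERT VALUES lists or projections.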

### How was this patch tested?

This PR introduces new unit test coverage.

Closes #36475 from dtenedor/default-any-schema.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
  • Loading branch information
dtenedor authored and gengliangwang committed May 23, 2022
1 parent 5d5e7f9 commit d27cf97
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -502,12 +502,16 @@ case class ResolveDefaultColumns(
* Returns the schema for the target table of a DML command, looking into the catalog if needed.
*/
private def getSchemaForTargetTable(table: LogicalPlan): Option[StructType] = {
// Check if the target table is already resolved. If so, return the computed schema.
// Note that we use 'collectFirst' to descend past any SubqueryAlias nodes that may be present.
// First find the source relation. Note that we use 'collectFirst' to descend past any
// SubqueryAlias nodes that may be present.
val source: Option[LogicalPlan] = table.collectFirst {
case r: NamedRelation => r
case r: NamedRelation if !r.skipSchemaResolution =>
// Here we only resolve the default columns in the tables that require schema resolution
// during write operations.
r
case r: UnresolvedCatalogRelation => r
}
// Check if the target table is already resolved. If so, return the computed schema.
source.map { r =>
if (r.schema.fields.nonEmpty) {
return Some(r.schema)
Expand All @@ -521,6 +525,7 @@ case class ResolveDefaultColumns(
case Some(r: UnresolvedCatalogRelation) => r.tableMeta.identifier
case _ => return None
}

val lookup: LogicalPlan = try {
catalog.lookupRelation(tableName)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import org.mockito.invocation.InvocationOnMock

import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedDBObjectName, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedDBObjectName, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, Cast, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.connector.FakeV2Provider
Expand Down Expand Up @@ -73,6 +73,7 @@ class PlanResolutionSuite extends AnalysisTest {

private val tableWithAcceptAnySchemaCapability: Table = {
val t = mock(classOf[Table])
when(t.name()).thenReturn("v2TableWithAcceptAnySchemaCapability")
when(t.schema()).thenReturn(new StructType().add("i", "int"))
when(t.capabilities()).thenReturn(Collections.singleton(TableCapability.ACCEPT_ANY_SCHEMA))
t
Expand Down Expand Up @@ -1024,6 +1025,9 @@ class PlanResolutionSuite extends AnalysisTest {
val sql7 = s"UPDATE defaultvalues SET i=DEFAULT, s=DEFAULT"
val sql8 = s"UPDATE $tblName SET name='Robert', age=32 WHERE p=DEFAULT"
val sql9 = s"UPDATE defaultvalues2 SET i=DEFAULT"
// Note: 'i' is the correct column name, but since the table has ACCEPT_ANY_SCHEMA capability,
// DEFAULT column resolution should skip this table.
val sql10 = s"UPDATE v2TableWithAcceptAnySchemaCapability SET i=DEFAULT"

val parsed1 = parseAndResolve(sql1)
val parsed2 = parseAndResolve(sql2)
Expand All @@ -1033,6 +1037,7 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed6 = parseAndResolve(sql6)
val parsed7 = parseAndResolve(sql7, true)
val parsed9 = parseAndResolve(sql9, true)
val parsed10 = parseAndResolve(sql10)

parsed1 match {
case UpdateTable(
Expand Down Expand Up @@ -1146,6 +1151,23 @@ class PlanResolutionSuite extends AnalysisTest {

case _ => fail("Expect UpdateTable, but got:\n" + parsed9.treeString)
}

parsed10 match {
case u: UpdateTable =>
assert(u.assignments.size == 1)
u.assignments(0).key match {
case i: AttributeReference =>
assert(i.name == "i")
}
u.assignments(0).value match {
case d: UnresolvedAttribute =>
assert(d.name == "DEFAULT")
}

case _ =>
fail("Expect UpdateTable, but got:\n" + parsed10.treeString)
}

}

val sql1 = "UPDATE non_existing SET id=1"
Expand Down Expand Up @@ -1178,6 +1200,31 @@ class PlanResolutionSuite extends AnalysisTest {
}
}

test("SPARK-38869 INSERT INTO table with ACCEPT_ANY_SCHEMA capability") {
  // The target table declares the ACCEPT_ANY_SCHEMA capability, so DEFAULT column resolution
  // should skip it: the DEFAULT references below are expected to remain unresolved attributes
  // in the resulting plans.
  val sql1 = "INSERT INTO v2TableWithAcceptAnySchemaCapability VALUES(DEFAULT)"
  val sql2 = "INSERT INTO v2TableWithAcceptAnySchemaCapability SELECT DEFAULT"
  val parsed1 = parseAndResolve(sql1)
  val parsed2 = parseAndResolve(sql2)

  // INSERT ... VALUES: the inline table must still hold the single unresolved DEFAULT reference.
  parsed1 match {
    case InsertIntoStatement(
        _, _, _,
        UnresolvedInlineTable(_, Seq(Seq(UnresolvedAttribute(Seq("DEFAULT"))))),
        _, _) =>
      // Plan has the expected shape; nothing further to check.

    case _ => fail("Expect InsertIntoStatement, but got:\n" + parsed1.treeString)
  }

  // INSERT ... SELECT: the projection must still hold the single unresolved DEFAULT reference.
  parsed2 match {
    case InsertIntoStatement(
        _, _, _,
        Project(Seq(UnresolvedAttribute(Seq("DEFAULT"))), _),
        _, _) =>
      // Plan has the expected shape; nothing further to check.

    // Report the plan that actually failed to match (parsed2, not parsed1).
    case _ => fail("Expect InsertIntoStatement, but got:\n" + parsed2.treeString)
  }
}

test("alter table: alter column") {
Seq("v1Table" -> true, "v2Table" -> false, "testcat.tab" -> false).foreach {
case (tblName, useV1Command) =>
Expand Down Expand Up @@ -1906,9 +1953,9 @@ class PlanResolutionSuite extends AnalysisTest {
|MERGE INTO v2TableWithAcceptAnySchemaCapability AS target
|USING v2Table AS source
|ON target.i = source.i
|WHEN MATCHED AND (target.s='delete') THEN DELETE
|WHEN MATCHED AND (target.s='delete')THEN DELETE
|WHEN MATCHED AND (target.s='update') THEN UPDATE SET target.s = source.s
|WHEN NOT MATCHED AND (target.s='insert')
|WHEN NOT MATCHED AND (target.s=DEFAULT)
| THEN INSERT (target.i, target.s) values (source.i, source.s)
""".stripMargin

Expand All @@ -1924,7 +1971,7 @@ class PlanResolutionSuite extends AnalysisTest {
updateAssigns)),
Seq(
InsertAction(
Some(EqualTo(il: UnresolvedAttribute, StringLiteral("insert"))),
Some(EqualTo(il: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT")))),
insertAssigns))) =>
assert(l.name == "target.i" && r.name == "source.i")
assert(dl.name == "target.s")
Expand Down

0 comments on commit d27cf97

Please sign in to comment.