[SPARK-37496][SQL] Migrate ReplaceTableAsSelectStatement to v2 command #34754

Closed
wants to merge 2 commits
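For context, a minimal sketch of the plan shape this PR migrates to, put together from the signatures visible in the diff below. The catalog/table names, the provider, and the OneRowRelation placeholder query are illustrative, not taken from the PR.

import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, ReplaceTableAsSelect, TableSpec}

// Before: ReplaceTableAsSelectStatement carried a dozen flat fields (name, query,
// partitioning, bucketSpec, properties, provider, ...).
// After: the identifier travels as an UnresolvedDBObjectName child and the table
// metadata is grouped into a TableSpec, mirroring the already-migrated v2 CreateTable.
val tableSpec = TableSpec(
  bucketSpec = None,
  properties = Map.empty,
  provider = Some("parquet"),
  options = Map.empty,
  location = None,
  comment = None,
  serde = None,
  external = false)

val rtas = ReplaceTableAsSelect(
  UnresolvedDBObjectName(Seq("testcat", "ns", "t"), isNamespace = false),
  partitioning = Seq.empty,
  query = OneRowRelation(),
  tableSpec = tableSpec,
  writeOptions = Map.empty,
  orCreate = true)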
@@ -59,18 +59,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
c.partitioning ++ c.bucketSpec.map(_.asTransform),
convertTableProperties(c),
orCreate = c.orCreate)

case c @ ReplaceTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
ReplaceTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c),
writeOptions = c.writeOptions,
orCreate = c.orCreate)
}

object NonSessionCatalogAndTable {
@@ -3487,7 +3487,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

/**
* Replace a table, returning a [[ReplaceTableStatement]] logical plan.
* Replace a table, returning a [[ReplaceTableStatement]] or [[ReplaceTableAsSelect]]
* logical plan.
*
* Expected format:
* {{{
@@ -3553,9 +3554,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
ctx)

case Some(query) =>
ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
provider, options, location, comment, writeOptions = Map.empty, serdeInfo,
orCreate = orCreate)
val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
serdeInfo, false)
ReplaceTableAsSelect(
UnresolvedDBObjectName(table, isNamespace = false),
partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)

case _ =>
// Note: table schema includes both the table columns list and the partition columns
@@ -165,29 +165,6 @@ case class ReplaceTableStatement(
serde: Option[SerdeInfo],
orCreate: Boolean) extends LeafParsedStatement

/**
* A REPLACE TABLE AS SELECT command, as parsed from SQL.
*/
case class ReplaceTableAsSelectStatement(
tableName: Seq[String],
asSelect: LogicalPlan,
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
writeOptions: Map[String, String],
serde: Option[SerdeInfo],
orCreate: Boolean) extends UnaryParsedStatement {

override def child: LogicalPlan = asSelect
override protected def withNewChildInternal(
newChild: LogicalPlan): ReplaceTableAsSelectStatement = copy(asSelect = newChild)
}


/**
* Column data as parsed by ALTER TABLE ... (ADD|REPLACE) COLUMNS.
*/
@@ -18,9 +18,8 @@
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.trees.BinaryLike
@@ -274,16 +273,17 @@ case class ReplaceTable(
* If the table does not exist, and orCreate is false, then an exception will be thrown.
*/
case class ReplaceTableAsSelect(
catalog: TableCatalog,
tableName: Identifier,
name: LogicalPlan,
partitioning: Seq[Transform],
query: LogicalPlan,
properties: Map[String, String],
tableSpec: TableSpec,
writeOptions: Map[String, String],
orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan {
orCreate: Boolean) extends BinaryCommand with V2CreateTablePlan {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

override def tableSchema: StructType = query.schema
override def child: LogicalPlan = query
override def left: LogicalPlan = name
override def right: LogicalPlan = query

override lazy val resolved: Boolean = childrenResolved && {
// the table schema is created from the query schema, so the only resolution needed is to check
@@ -292,12 +292,19 @@ case class ReplaceTableAsSelect(
references.map(_.fieldNames).forall(query.schema.findNestedField(_).isDefined)
}

override def tableName: Identifier = {
assert(name.resolved)
name.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
}

override protected def withNewChildrenInternal(
newLeft: LogicalPlan,
newRight: LogicalPlan): LogicalPlan =
copy(name = newLeft, query = newRight)

override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
this.copy(partitioning = rewritten)
}

override protected def withNewChildInternal(newChild: LogicalPlan): ReplaceTableAsSelect =
copy(query = newChild)
}
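A hedged note on the reshaped node above: the identifier now travels as a child plan, so tableName is only valid once analysis has replaced the UnresolvedDBObjectName child with a ResolvedDBObjectName. A small sketch of that contract, using only the signatures shown in this diff (the helper name is illustrative):

import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
import org.apache.spark.sql.connector.catalog.Identifier

// After analysis, the name child is a ResolvedDBObjectName; the Identifier is
// derived from its name parts, as in the tableName override above.
def resolvedTableName(name: ResolvedDBObjectName): Identifier =
  name.nameParts.asIdentifier  // e.g. Seq("ns", "t") becomes namespace ["ns"], name "t"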

/**
@@ -23,7 +23,7 @@ import java.util.Collections
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableStatement, SerdeInfo, TableSpec}
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -314,11 +314,13 @@ private[sql] object CatalogV2Util {
convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
}

def convertTableProperties(r: ReplaceTableAsSelectStatement): Map[String, String] = {
convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
def convertTableProperties(t: TableSpec): Map[String, String] = {
val props = convertTableProperties(
t.properties, t.options, t.serde, t.location, t.comment, t.provider, t.external)
withDefaultOwnership(props)
}

def convertTableProperties(
private def convertTableProperties(
properties: Map[String, String],
options: Map[String, String],
serdeInfo: Option[SerdeInfo],
@@ -722,7 +722,7 @@ class DDLParserSuite extends AnalysisTest {
case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" =>
assert(ctas.ifNotExists == expectedIfNotExists)
case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
case replace: ReplaceTableAsSelectStatement if newTableToken == "REPLACE" =>
case replace: ReplaceTableAsSelect if newTableToken == "REPLACE" =>
case other =>
fail("First token in statement does not match the expected parsed plan; CREATE TABLE" +
" should create a CreateTableStatement, and REPLACE TABLE should create a" +
@@ -2323,18 +2323,18 @@ class DDLParserSuite extends AnalysisTest {
ctas.comment,
ctas.serde,
ctas.external)
case rtas: ReplaceTableAsSelectStatement =>
case rtas: ReplaceTableAsSelect =>
TableSpec(
rtas.tableName,
Some(rtas.asSelect).filter(_.resolved).map(_.schema),
rtas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
Some(rtas.query).filter(_.resolved).map(_.schema),
rtas.partitioning,
rtas.bucketSpec,
rtas.properties,
rtas.provider,
rtas.options,
rtas.location,
rtas.comment,
rtas.serde)
rtas.tableSpec.bucketSpec,
rtas.tableSpec.properties,
rtas.tableSpec.provider,
rtas.tableSpec.options,
rtas.tableSpec.location,
rtas.tableSpec.comment,
rtas.tableSpec.serde)
case other =>
fail(s"Expected to parse Create, CTAS, Replace, or RTAS plan" +
s" from query, got ${other.getClass.getName}.")
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (31 changes: 17 additions & 14 deletions)
@@ -23,10 +23,10 @@ import scala.collection.JavaConverters._

import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -586,19 +586,22 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
AppendData.byName(v2Relation, df.logicalPlan, extraOptions.toMap)

case (SaveMode.Overwrite, _) =>
ReplaceTableAsSelectStatement(
nameParts,
df.queryExecution.analyzed,
val tableSpec = TableSpec(
bucketSpec = None,
properties = Map.empty,
provider = Some(source),
options = Map.empty,
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
external = false)
ReplaceTableAsSelect(
UnresolvedDBObjectName(nameParts, isNamespace = false),
Review comment (Contributor): shall we prepend catalog.name?
Reply (Author): Seems no need? I stepped into one of the tests; the nameParts here is "testcat", "ns", "t", so it already contains catalog.name.
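To make that reply concrete (illustrative values only): the multipart name handed to the writer already starts with the catalog, so nothing needs to be prepended here.

import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName

val nameParts = Seq("testcat", "ns", "t")  // catalog, namespace, table
val name = UnresolvedDBObjectName(nameParts, isNamespace = false)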

partitioningAsV2,
None,
Map.empty,
Some(source),
Map.empty,
extraOptions.get("path"),
extraOptions.get(TableCatalog.PROP_COMMENT),
extraOptions.toMap,
None,
orCreate = true) // Create the table if it doesn't exist
df.queryExecution.analyzed,
tableSpec,
writeOptions = Map.empty,
Review comment (Contributor): I think there is a bug here. Previously we passed extraOptions.toMap as the write options; now we don't. @huaxingao can you help to fix it with a test case? Thanks!
Reply (Author): Sorry for the bug. Will fix.
orCreate = true) // Create the table if it doesn't exist

case (other, _) =>
// We have a potential race condition here in AppendMode, if the table suddenly gets
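On the write-options thread above: a hedged sketch of what the follow-up fix would presumably look like in this branch of saveAsTable, reusing the method's local values from the hunk above. It is not part of this PR; it only makes the reviewer's point concrete.

// Forward the writer's extra options as write options again, as before this change.
ReplaceTableAsSelect(
  UnresolvedDBObjectName(nameParts, isNamespace = false),
  partitioningAsV2,
  df.queryExecution.analyzed,
  tableSpec,
  writeOptions = extraOptions.toMap,  // was Map.empty in this diff
  orCreate = true)  // Create the table if it doesn't exist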
@@ -21,9 +21,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedDBObjectName, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.IntegerType
@@ -195,20 +195,22 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
}

private def internalReplace(orCreate: Boolean): Unit = {
runCommand(
ReplaceTableAsSelectStatement(
tableName,
logicalPlan,
partitioning.getOrElse(Seq.empty),
None,
properties.toMap,
provider,
Map.empty,
None,
None,
options.toMap,
None,
orCreate = orCreate))
val tableSpec = TableSpec(
bucketSpec = None,
properties = properties.toMap,
provider = provider,
options = Map.empty,
location = None,
comment = None,
serde = None,
external = false)
runCommand(ReplaceTableAsSelect(
UnresolvedDBObjectName(tableName, isNamespace = false),
partitioning.getOrElse(Seq.empty),
logicalPlan,
tableSpec,
writeOptions = options.toMap,
orCreate = orCreate))
}
}
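For reference, a usage example of the public API that ends up in internalReplace above, assuming df is an existing Dataset and testcat is a catalog configured via spark.sql.catalog.testcat:

// createOrReplace() calls internalReplace(orCreate = true); replace() passes orCreate = false.
df.writeTo("testcat.ns.t")
  .using("parquet")
  .createOrReplace()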

@@ -22,14 +22,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable => CatalystCreateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
@@ -144,7 +143,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

// For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
// session catalog and the table provider is not v2.
case c @ CatalystCreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
case c @ CreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider,
c.tableSpec.options,
@@ -157,7 +156,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c.tableSpec.location, c.tableSpec.comment, storageFormat,
c.tableSpec.external)
val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, None)
CreateTableV1(tableDesc, mode, None)
} else {
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
@@ -173,7 +172,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c.partitioning, c.bucketSpec, c.properties, provider, c.location,
c.comment, storageFormat, c.external)
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, Some(c.asSelect))
CreateTableV1(tableDesc, mode, Some(c.asSelect))
} else {
CreateTableAsSelect(
catalog.asTableCatalog,
Expand Down Expand Up @@ -210,21 +209,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
orCreate = c.orCreate)
}

case c @ ReplaceTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _)
if isSessionCatalog(catalog) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.replaceTableAsSelectOnlySupportedWithV2TableError
} else {
ReplaceTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c),
writeOptions = c.writeOptions,
orCreate = c.orCreate)
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
tableSpec = newTableSpec)
}

case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
@@ -37,13 +37,7 @@ case class CreateTableExec(
ignoreIfExists: Boolean) extends LeafV2CommandExec {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val tableProperties = {
val props = CatalogV2Util.convertTableProperties(
tableSpec.properties, tableSpec.options, tableSpec.serde,
tableSpec.location, tableSpec.comment, tableSpec.provider,
tableSpec.external)
CatalogV2Util.withDefaultOwnership(props)
}
val tableProperties = CatalogV2Util.convertTableProperties(tableSpec)

override protected def run(): Seq[InternalRow] = {
if (!catalog.tableExists(identifier)) {
Expand Down