[SPARK-36902][SQL] Migrate CreateTableAsSelectStatement to v2 command #34667

Closed · wants to merge 9 commits
Changes from 5 commits
@@ -37,18 +37,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) =>
ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name())

case c @ CreateTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c),
writeOptions = c.writeOptions,
ignoreIfExists = c.ifNotExists)

case c @ ReplaceTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
ReplaceTable(
@@ -3410,7 +3410,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

/**
* Create a table, returning a [[CreateTable]] or [[CreateTableAsSelectStatement]] logical plan.
* Create a table, returning a [[CreateTable]] or [[CreateTableAsSelect]] logical plan.
*
* Expected format:
* {{{
@@ -3470,9 +3470,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
ctx)

case Some(query) =>
CreateTableAsSelectStatement(
table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)
val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
serdeInfo, external)
CreateTableAsSelect(
UnresolvedDBObjectName(table, isNamespace = false),
partitioning, query, tableSpec, Map.empty, ifNotExists)

case _ =>
// Note: table schema includes both the table columns list and the partition columns
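For reference, a rough sketch (not part of the diff) of the plan this branch now produces for a CTAS statement such as `CREATE TABLE testcat.ns.t USING parquet AS SELECT * FROM src`. The catalog, namespace, and table names are made up, and the constructor shapes are assumed from the signatures shown elsewhere in this change:

```scala
import org.apache.spark.sql.catalyst.analysis.{UnresolvedDBObjectName, UnresolvedRelation}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, TableSpec}

// Metadata that used to be flat fields on CreateTableAsSelectStatement is now grouped
// into a TableSpec: (bucketSpec, properties, provider, options, location, comment,
// serde, external).
val tableSpec = TableSpec(None, Map.empty, Some("parquet"), Map.empty, None, None, None, false)

// The parser emits the v2 command directly; the table name is an UnresolvedDBObjectName
// child that the analyzer resolves later, instead of a raw Seq[String] on a parsed statement.
val ctasPlan = CreateTableAsSelect(
  UnresolvedDBObjectName(Seq("testcat", "ns", "t"), isNamespace = false),
  partitioning = Seq.empty,
  query = UnresolvedRelation(Seq("src")),
  tableSpec = tableSpec,
  writeOptions = Map.empty,
  ignoreIfExists = false)
```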
@@ -123,29 +123,6 @@ object SerdeInfo {
}
}

/**
* A CREATE TABLE AS SELECT command, as parsed from SQL.
*/
case class CreateTableAsSelectStatement(
tableName: Seq[String],
asSelect: LogicalPlan,
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
writeOptions: Map[String, String],
serde: Option[SerdeInfo],
external: Boolean,
ifNotExists: Boolean) extends UnaryParsedStatement {

override def child: LogicalPlan = asSelect
override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelectStatement =
copy(asSelect = newChild)
}

/**
* A REPLACE TABLE command, as parsed from SQL.
*
@@ -220,16 +220,22 @@ case class CreateTable(
* Create a new table from a select query with a v2 catalog.
*/
case class CreateTableAsSelect(
catalog: TableCatalog,
tableName: Identifier,
name: LogicalPlan,
partitioning: Seq[Transform],
query: LogicalPlan,
properties: Map[String, String],
tableSpec: TableSpec,
writeOptions: Map[String, String],
ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
ignoreIfExists: Boolean) extends BinaryCommand with V2CreateTablePlan {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

override def tableSchema: StructType = query.schema
override def child: LogicalPlan = query
override def left: LogicalPlan = name
override def right: LogicalPlan = query

override def tableName: Identifier = {
assert(left.resolved)
left.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
}

override lazy val resolved: Boolean = childrenResolved && {
// the table schema is created from the query schema, so the only resolution needed is to check
@@ -242,8 +248,11 @@ case class CreateTableAsSelect(
this.copy(partitioning = rewritten)
}

override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelect =
copy(query = newChild)
override protected def withNewChildrenInternal(
newLeft: LogicalPlan,
newRight: LogicalPlan
): CreateTableAsSelect =
copy(name = newLeft, query = newRight)
}
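As a hypothetical illustration of the new name handling (the catalog and identifiers below are placeholders; the shapes follow the `ResolveCatalogs` rule and the `tableName` override in this diff): the `name` child starts as an `UnresolvedDBObjectName`, and once the analyzer resolves it to a `ResolvedDBObjectName` the identifier can be recovered from its name parts.

```scala
import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, UnresolvedDBObjectName}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

// Before analysis: the plan only carries a multi-part name.
val beforeAnalysis = UnresolvedDBObjectName(Seq("testcat", "ns", "t"), isNamespace = false)

// After the catalog-resolution rules run, the child becomes a ResolvedDBObjectName
// holding the catalog plugin plus the remaining name parts, e.g. ("ns", "t").
def tableIdentifier(resolved: ResolvedDBObjectName): Identifier =
  resolved.nameParts.asIdentifier   // mirrors the tableName override above
```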

/**
@@ -23,7 +23,7 @@ import java.util.Collections
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableStatement, SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.plans.logical.{ReplaceTableStatement, SerdeInfo, TableSpec}
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -305,11 +305,6 @@ private[sql] object CatalogV2Util {
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
}

def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = {
convertTableProperties(
c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external)
}

def convertTableProperties(r: ReplaceTableStatement): Map[String, String] = {
convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
}
@@ -20,22 +20,22 @@ package org.apache.spark.sql.catalyst.analysis
import java.util

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode}
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, Table, TableCapability, TableCatalog}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, TableSpec}
import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog}
import org.apache.spark.sql.connector.expressions.Expressions
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class CreateTablePartitioningValidationSuite extends AnalysisTest {
import CreateTablePartitioningValidationSuite._

test("CreateTableAsSelect: fail missing top-level column") {
val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
catalog,
Identifier.of(Array(), "table_name"),
UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
Expressions.bucket(4, "does_not_exist") :: Nil,
TestRelation2,
Map.empty,
tableSpec,
Map.empty,
ignoreIfExists = false)

@@ -46,12 +46,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
}

test("CreateTableAsSelect: fail missing top-level column nested reference") {
val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
catalog,
Identifier.of(Array(), "table_name"),
UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
Expressions.bucket(4, "does_not_exist.z") :: Nil,
TestRelation2,
Map.empty,
tableSpec,
Map.empty,
ignoreIfExists = false)

@@ -62,12 +63,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
}

test("CreateTableAsSelect: fail missing nested column") {
val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
catalog,
Identifier.of(Array(), "table_name"),
UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
Expressions.bucket(4, "point.z") :: Nil,
TestRelation2,
Map.empty,
tableSpec,
Map.empty,
ignoreIfExists = false)

@@ -78,12 +80,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
}

test("CreateTableAsSelect: fail with multiple errors") {
val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
catalog,
Identifier.of(Array(), "table_name"),
UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
Expressions.bucket(4, "does_not_exist", "point.z") :: Nil,
TestRelation2,
Map.empty,
tableSpec,
Map.empty,
ignoreIfExists = false)

@@ -95,38 +98,41 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
}

test("CreateTableAsSelect: success with top-level column") {
val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
catalog,
Identifier.of(Array(), "table_name"),
UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
Expressions.bucket(4, "id") :: Nil,
TestRelation2,
Map.empty,
tableSpec,
Map.empty,
ignoreIfExists = false)

assertAnalysisSuccess(plan)
}

test("CreateTableAsSelect: success using nested column") {
val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
catalog,
Identifier.of(Array(), "table_name"),
UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
Expressions.bucket(4, "point.x") :: Nil,
TestRelation2,
Map.empty,
tableSpec,
Map.empty,
ignoreIfExists = false)

assertAnalysisSuccess(plan)
}

test("CreateTableAsSelect: success using complex column") {
val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
catalog,
Identifier.of(Array(), "table_name"),
UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
Expressions.bucket(4, "point") :: Nil,
TestRelation2,
Map.empty,
tableSpec,
Map.empty,
ignoreIfExists = false)

@@ -719,8 +719,8 @@ class DDLParserSuite extends AnalysisTest {
parsedPlan match {
case create: CreateTable if newTableToken == "CREATE" =>
assert(create.ignoreIfExists == expectedIfNotExists)
case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" =>
assert(ctas.ifNotExists == expectedIfNotExists)
case ctas: CreateTableAsSelect if newTableToken == "CREATE" =>
assert(ctas.ignoreIfExists == expectedIfNotExists)
case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
case replace: ReplaceTableAsSelect if newTableToken == "REPLACE" =>
case other =>
@@ -2310,19 +2310,19 @@ class DDLParserSuite extends AnalysisTest {
replace.location,
replace.comment,
replace.serde)
case ctas: CreateTableAsSelectStatement =>
case ctas: CreateTableAsSelect =>
TableSpec(
ctas.tableName,
Some(ctas.asSelect).filter(_.resolved).map(_.schema),
ctas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
Some(ctas.query).filter(_.resolved).map(_.schema),
ctas.partitioning,
ctas.bucketSpec,
ctas.properties,
ctas.provider,
ctas.options,
ctas.location,
ctas.comment,
ctas.serde,
ctas.external)
ctas.tableSpec.bucketSpec,
ctas.tableSpec.properties,
ctas.tableSpec.provider,
ctas.tableSpec.options,
ctas.tableSpec.location,
ctas.tableSpec.comment,
ctas.tableSpec.serde,
ctas.tableSpec.external)
case rtas: ReplaceTableAsSelect =>
TableSpec(
rtas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
47 changes: 30 additions & 17 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -327,14 +327,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
supportsExtract, catalogManager, dsOptions)

val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _)

val tableSpec = TableSpec(
bucketSpec = None,
properties = Map(TableCatalog.PROP_PROVIDER -> source) ++ location,
Contributor comment: I think this should not include the location property. We already specify it in TableSpec.location.

Contributor comment: Actually, we should put Map.empty here. provider is also specified in TableSpec.provider.

provider = Some(source),
options = Map.empty,
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
external = false)
runCommand(df.sparkSession) {
CreateTableAsSelect(
catalog,
ident,
UnresolvedDBObjectName(
catalog.name :: ident.name :: Nil,
isNamespace = false
),
partitioningAsV2,
df.queryExecution.analyzed,
Map(TableCatalog.PROP_PROVIDER -> source) ++ location,
tableSpec,
finalOptions,
ignoreIfExists = createMode == SaveMode.Ignore)
}
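Following up on the review comments above, a minimal sketch of what the reviewers appear to be suggesting (illustrative only, with placeholder values standing in for the surrounding DataFrameWriter state): keep `properties` empty and let the dedicated `provider` and `location` fields carry that information.

```scala
import org.apache.spark.sql.catalyst.plans.logical.TableSpec

// Placeholders for values available in the surrounding method:
val source: String = "parquet"
val pathOption: Option[String] = Some("/tmp/t")
val commentOption: Option[String] = None

// Suggested shape: no PROP_PROVIDER / PROP_LOCATION duplicated into `properties`.
val suggestedTableSpec = TableSpec(
  bucketSpec = None,
  properties = Map.empty,        // was Map(TableCatalog.PROP_PROVIDER -> source) ++ location
  provider = Some(source),
  options = Map.empty,
  location = pathOption,
  comment = commentOption,
  serde = None,
  external = false)
```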
@@ -607,20 +617,23 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// We have a potential race condition here in AppendMode, if the table suddenly gets
// created between our existence check and physical execution, but this can't be helped
// in any case.
CreateTableAsSelectStatement(
nameParts,
df.queryExecution.analyzed,
val tableSpec = TableSpec(
bucketSpec = None,
properties = Map.empty,
provider = Some(source),
options = Map.empty,
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
external = false)

CreateTableAsSelect(
UnresolvedDBObjectName(nameParts, isNamespace = false),
partitioningAsV2,
None,
Map.empty,
Some(source),
df.queryExecution.analyzed,
tableSpec,
Map.empty,
Contributor comment: @huaxingao can you fix this similar bug as well when you open the PR?

extraOptions.get("path"),
extraOptions.get(TableCatalog.PROP_COMMENT),
extraOptions.toMap,
None,
ifNotExists = other == SaveMode.Ignore,
external = false)
other == SaveMode.Ignore)
}

runCommand(df.sparkSession) {
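On the "@huaxingao … similar bug" comment above: in the removed code, `extraOptions.toMap` was passed as the statement's write options, while the new `CreateTableAsSelect` call passes `Map.empty`. A hedged sketch of the presumed fix, with placeholders standing in for the surrounding method state:

```scala
import org.apache.spark.sql.catalyst.analysis.{UnresolvedDBObjectName, UnresolvedRelation}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, TableSpec}

// Placeholders for state available inside DataFrameWriter.saveAsTable:
val nameParts: Seq[String] = Seq("spark_catalog", "default", "t")
val extraOptions: Map[String, String] = Map("path" -> "/tmp/t")
val analyzedQuery = UnresolvedRelation(Seq("src"))   // stands in for df.queryExecution.analyzed
val tableSpec = TableSpec(None, Map.empty, Some("parquet"), Map.empty,
  extraOptions.get("path"), None, None, false)

// Presumed fix: forward the caller-supplied options as writeOptions, matching what the
// removed CreateTableAsSelectStatement call did with extraOptions.toMap.
val fixedPlan = CreateTableAsSelect(
  UnresolvedDBObjectName(nameParts, isNamespace = false),
  partitioning = Seq.empty,
  query = analyzedQuery,
  tableSpec = tableSpec,
  writeOptions = extraOptions,   // Map.empty in the current revision of this diff
  ignoreIfExists = true)
```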