From 84241e0ab1c35a5333447202573a8eea8244660e Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 16 Dec 2020 17:24:08 +0200 Subject: [PATCH 1/2] [SPARK-33808][SQL] DataSource V2: Build logical writes in the optimizer Lead-authored-by: Anton Okolnychyi Co-authored-by: Ryan Blue --- .../connector/catalog/TableCapability.java | 2 +- .../catalyst/plans/logical/v2Commands.scala | 10 +- .../spark/sql/connector/write/V1Write.java | 33 +++++++ .../sql/connector/write/V1WriteBuilder.java | 45 --------- .../spark/sql/execution/SparkOptimizer.scala | 7 +- .../datasources/v2/DataSourceV2Strategy.scala | 56 ++++++----- .../datasources/v2/TableCapabilityCheck.scala | 6 +- .../datasources/v2/V1FallbackWriters.scala | 66 +++---------- .../execution/datasources/v2/V2Writes.scala | 95 +++++++++++++++++++ .../v2/WriteToDataSourceV2Exec.scala | 72 +++++--------- .../v2/jdbc/JDBCWriteBuilder.scala | 6 +- .../sql/connector/V1WriteFallbackSuite.scala | 12 +-- .../command/PlanResolutionSuite.scala | 2 +- 13 files changed, 221 insertions(+), 191 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/connector/write/V1Write.java delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/connector/write/V1WriteBuilder.java create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java index 68161d7225fcf..5bb42fb4b313d 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java @@ -96,7 +96,7 @@ public enum TableCapability { /** * Signals that the table supports append writes using the V1 InsertableRelation interface. *
<p>
- * Tables that return this capability must create a V1WriteBuilder and may also support additional + * Tables that return this capability must create a V1Write and may also support additional * write modes, like {@link #TRUNCATE}, and {@link #OVERWRITE_BY_FILTER}, but cannot support * {@link #OVERWRITE_DYNAMIC}. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 87d81d5330574..a24ec585909e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.write.Write import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType} /** @@ -65,7 +66,8 @@ case class AppendData( table: NamedRelation, query: LogicalPlan, writeOptions: Map[String, String], - isByName: Boolean) extends V2WriteCommand { + isByName: Boolean, + write: Option[Write] = None) extends V2WriteCommand { override def withNewQuery(newQuery: LogicalPlan): AppendData = copy(query = newQuery) override def withNewTable(newTable: NamedRelation): AppendData = copy(table = newTable) } @@ -94,7 +96,8 @@ case class OverwriteByExpression( deleteExpr: Expression, query: LogicalPlan, writeOptions: Map[String, String], - isByName: Boolean) extends V2WriteCommand { + isByName: Boolean, + write: Option[Write] = None) extends V2WriteCommand { override lazy val resolved: Boolean = { table.resolved && query.resolved && outputResolved && deleteExpr.resolved } @@ -132,7 +135,8 @@ case class OverwritePartitionsDynamic( table: NamedRelation, query: LogicalPlan, writeOptions: Map[String, String], - isByName: Boolean) extends V2WriteCommand { + isByName: Boolean, + write: Option[Write] = None) extends V2WriteCommand { override def withNewQuery(newQuery: LogicalPlan): OverwritePartitionsDynamic = { copy(query = newQuery) } diff --git a/sql/core/src/main/java/org/apache/spark/sql/connector/write/V1Write.java b/sql/core/src/main/java/org/apache/spark/sql/connector/write/V1Write.java new file mode 100644 index 0000000000000..a299967ee8bcf --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/connector/write/V1Write.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector.write; + +import org.apache.spark.annotation.Unstable; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.sources.InsertableRelation; + +/** + * A logical write that should be executed using V1 InsertableRelation interface. + *
<p>
+ * Tables that have {@link TableCapability#V1_BATCH_WRITE} in the list of their capabilities + * must build {@link V1Write}. + */ +@Unstable +public interface V1Write extends Write { + InsertableRelation toInsertableRelation(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/connector/write/V1WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/connector/write/V1WriteBuilder.java deleted file mode 100644 index 89b567b5231ac..0000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/connector/write/V1WriteBuilder.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.write; - -import org.apache.spark.annotation.Unstable; -import org.apache.spark.sql.sources.InsertableRelation; - -/** - * A trait that should be implemented by V1 DataSources that would like to leverage the DataSource - * V2 write code paths. The InsertableRelation will be used only to Append data. Other - * instances of the [[WriteBuilder]] interface such as [[SupportsOverwrite]], [[SupportsTruncate]] - * should be extended as well to support additional operations other than data appends. - * - * This interface is designed to provide Spark DataSources time to migrate to DataSource V2 and - * will be removed in a future Spark release. - * - * @since 3.0.0 - */ -@Unstable -public interface V1WriteBuilder extends WriteBuilder { - /** - * Creates an InsertableRelation that allows appending a DataFrame to a - * a destination (using data source-specific parameters). The insert method will only be - * called with `overwrite=false`. The DataSource should implement the overwrite behavior as - * part of the [[SupportsOverwrite]], and [[SupportsTruncate]] interfaces. 
- * - * @since 3.0.0 - */ - InsertableRelation buildForV1Write(); -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 33b86a2b5340c..dde5dc2be0556 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions import org.apache.spark.sql.execution.datasources.SchemaPruning -import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown +import org.apache.spark.sql.execution.datasources.v2.{V2ScanRelationPushDown, V2Writes} import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning} import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs} @@ -37,7 +37,7 @@ class SparkOptimizer( override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = // TODO: move SchemaPruning into catalyst - SchemaPruning :: V2ScanRelationPushDown :: PruneFileSourcePartitions :: Nil + SchemaPruning :: V2ScanRelationPushDown :: V2Writes :: PruneFileSourcePartitions :: Nil override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ @@ -70,7 +70,8 @@ class SparkOptimizer( ExtractPythonUDFFromJoinCondition.ruleName :+ ExtractPythonUDFFromAggregate.ruleName :+ ExtractGroupingPythonUDFFromAggregate.ruleName :+ ExtractPythonUDFs.ruleName :+ - V2ScanRelationPushDown.ruleName + V2ScanRelationPushDown.ruleName :+ + V2Writes.ruleName /** * Optimization batches that are executed before the regular optimization batches (also before diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 635117a9932ac..dd1d9bff706a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartit import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, TableCapability, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TableChange} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} +import org.apache.spark.sql.connector.write.V1Write import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import 
org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec} @@ -195,33 +196,42 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat orCreate = orCreate) :: Nil } - case AppendData(r: DataSourceV2Relation, query, writeOptions, _) => - r.table.asWritable match { - case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => - AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r)) :: Nil - case v2 => - AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil + case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), query, writeOptions, + _, Some(write)) if v1.supports(TableCapability.V1_BATCH_WRITE) => + write match { + case v1Write: V1Write => + AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r), v1Write) :: Nil + case v2Write => + throw new AnalysisException( + s"Table ${v1.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " + + s"${v2Write.getClass} is not an instance of ${classOf[V1Write]}") } - case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) => - // fail if any filter cannot be converted. correctness depends on removing all matching data. - val filters = splitConjunctivePredicates(deleteExpr).map { - filter => DataSourceStrategy.translateFilter(deleteExpr, - supportNestedPredicatePushdown = true).getOrElse( - throw new AnalysisException(s"Cannot translate expression to source filter: $filter")) - }.toArray - r.table.asWritable match { - case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => - OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, - query, refreshCache(r)) :: Nil - case v2 => - OverwriteByExpressionExec(v2, filters, - writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil + case AppendData(r @ DataSourceV2Relation(v2: SupportsWrite, _, _, _, _), query, writeOptions, + _, Some(write)) => + AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r), write) :: Nil + + case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, query, + writeOptions, _, Some(write)) if v1.supports(TableCapability.V1_BATCH_WRITE) => + write match { + case v1Write: V1Write => + OverwriteByExpressionExecV1( + v1, writeOptions.asOptions, query, refreshCache(r), v1Write) :: Nil + case v2Write => + throw new AnalysisException( + s"Table ${v1.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " + + s"${v2Write.getClass} is not an instance of ${classOf[V1Write]}") } - case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) => + case OverwriteByExpression(r @ DataSourceV2Relation(v2: SupportsWrite, _, _, _, _), _, query, + writeOptions, _, Some(write)) => + OverwriteByExpressionExec( + v2, writeOptions.asOptions, planLater(query), refreshCache(r), write) :: Nil + + case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _, Some(write)) => OverwritePartitionsDynamicExec( - r.table.asWritable, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil + r.table.asWritable, writeOptions.asOptions, planLater(query), + refreshCache(r), write) :: Nil case DeleteFromTable(relation, condition) => relation match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala index cb4a2994de1f4..f697aba46d0df 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala @@ -49,14 +49,14 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) { // TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a // a logical plan for streaming write. - case AppendData(r: DataSourceV2Relation, _, _, _) if !supportsBatchWrite(r.table) => + case AppendData(r: DataSourceV2Relation, _, _, _, _) if !supportsBatchWrite(r.table) => failAnalysis(s"Table ${r.table.name()} does not support append in batch mode.") - case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _) + case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _, _) if !r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_DYNAMIC) => failAnalysis(s"Table ${r.table.name()} does not support dynamic overwrite in batch mode.") - case OverwriteByExpression(r: DataSourceV2Relation, expr, _, _, _) => + case OverwriteByExpression(r: DataSourceV2Relation, expr, _, _, _, _) => expr match { case Literal(true, BooleanType) => if (!supportsBatchWrite(r.table) || diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index 080e977121efb..3363172a85286 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -17,17 +17,14 @@ package org.apache.spark.sql.execution.datasources.v2 -import java.util.UUID - -import org.apache.spark.SparkException import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.SupportsWrite -import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder} +import org.apache.spark.sql.connector.write.V1Write import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation} +import org.apache.spark.sql.sources.InsertableRelation import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -39,12 +36,8 @@ case class AppendDataExecV1( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, plan: LogicalPlan, - refreshCache: () => Unit) extends V1FallbackWriters { - - override protected def run(): Seq[InternalRow] = { - writeWithV1(newWriteBuilder().buildForV1Write(), refreshCache = refreshCache) - } -} + refreshCache: () => Unit, + write: V1Write) extends V1FallbackWriters /** * Physical plan node for overwrite into a v2 table with V1 write interfaces. 
Note that when this @@ -59,29 +52,10 @@ case class AppendDataExecV1( */ case class OverwriteByExpressionExecV1( table: SupportsWrite, - deleteWhere: Array[Filter], writeOptions: CaseInsensitiveStringMap, plan: LogicalPlan, - refreshCache: () => Unit) extends V1FallbackWriters { - - private def isTruncate(filters: Array[Filter]): Boolean = { - filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] - } - - override protected def run(): Seq[InternalRow] = { - newWriteBuilder() match { - case builder: SupportsTruncate if isTruncate(deleteWhere) => - writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), refreshCache = refreshCache) - - case builder: SupportsOverwrite => - writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(), - refreshCache = refreshCache) - - case _ => - throw new SparkException(s"Table does not support overwrite by expression: $table") - } - } -} + refreshCache: () => Unit, + write: V1Write) extends V1FallbackWriters /** Some helper interfaces that use V2 write semantics through the V1 writer interface. */ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write { @@ -90,23 +64,13 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write { def table: SupportsWrite def writeOptions: CaseInsensitiveStringMap + def refreshCache: () => Unit + def write: V1Write - protected implicit class toV1WriteBuilder(builder: WriteBuilder) { - def asV1Builder: V1WriteBuilder = builder match { - case v1: V1WriteBuilder => v1 - case other => throw new IllegalStateException( - s"The returned writer ${other} was no longer a V1WriteBuilder.") - } - } - - protected def newWriteBuilder(): V1WriteBuilder = { - val info = LogicalWriteInfoImpl( - queryId = UUID.randomUUID().toString, - schema = plan.schema, - options = writeOptions) - val writeBuilder = table.newWriteBuilder(info) - - writeBuilder.asV1Builder + override def run(): Seq[InternalRow] = { + val writtenRows = writeWithV1(write.toInsertableRelation) + refreshCache() + writtenRows } } @@ -116,12 +80,8 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write { trait SupportsV1Write extends SparkPlan { def plan: LogicalPlan - protected def writeWithV1( - relation: InsertableRelation, - refreshCache: () => Unit = () => ()): Seq[InternalRow] = { + protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = { relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false) - refreshCache() - Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala new file mode 100644 index 0000000000000..a8e0731edf14c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import java.util.UUID + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.PredicateHelper +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{AlwaysTrue, Filter} + +/** + * A rule that constructs logical writes. + */ +object V2Writes extends Rule[LogicalPlan] with PredicateHelper { + + import DataSourceV2Implicits._ + + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { + case a @ AppendData(r: DataSourceV2Relation, query, options, _, None) => + val writeBuilder = newWriteBuilder(r.table, query, options) + val write = writeBuilder.build() + a.copy(write = Some(write)) + + case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, options, _, None) => + // fail if any filter cannot be converted. correctness depends on removing all matching data. 
+ val filters = splitConjunctivePredicates(deleteExpr).flatMap { pred => + val filter = DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true) + if (filter.isEmpty) { + throw new AnalysisException(s"Cannot translate expression to source filter: $pred") + } + filter + }.toArray + + val table = r.table + val writeBuilder = newWriteBuilder(table, query, options) + val write = writeBuilder match { + case builder: SupportsTruncate if isTruncate(filters) => + builder.truncate().build() + case builder: SupportsOverwrite => + builder.overwrite(filters).build() + case _ => + throw new SparkException(s"Table does not support overwrite by expression: $table") + } + + o.copy(write = Some(write)) + + case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, options, _, None) => + val table = r.table + val writeBuilder = newWriteBuilder(table, query, options) + val write = writeBuilder match { + case builder: SupportsDynamicOverwrite => + builder.overwriteDynamicPartitions().build() + case _ => + throw new SparkException(s"Table does not support dynamic partition overwrite: $table") + } + o.copy(write = Some(write)) + } + + private def isTruncate(filters: Array[Filter]): Boolean = { + filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] + } + + private def newWriteBuilder( + table: Table, + query: LogicalPlan, + writeOptions: Map[String, String]): WriteBuilder = { + + val info = LogicalWriteInfoImpl( + queryId = UUID.randomUUID().toString, + query.schema, + writeOptions.asOptions) + table.asWritable.newWriteBuilder(info) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index f5f77d38b8716..e0887d52cc376 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -33,9 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, SupportsWrite, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, LogicalWriteInfoImpl, PhysicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, LogicalWriteInfoImpl, PhysicalWriteInfoImpl, V1Write, Write, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.sql.sources.{AlwaysTrue, Filter} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{LongAccumulator, Utils} @@ -216,14 +215,8 @@ case class AppendDataExec( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, query: SparkPlan, - refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper { - - override protected def run(): Seq[InternalRow] = { - val writtenRows = writeWithV2(newWriteBuilder().buildForBatch()) - refreshCache() - writtenRows - } -} + refreshCache: () => Unit, + write: Write) extends V2ExistingTableWriteExec with BatchWriteHelper /** * Physical plan node for overwrite into a v2 table. 
@@ -237,31 +230,10 @@ case class AppendDataExec( */ case class OverwriteByExpressionExec( table: SupportsWrite, - deleteWhere: Array[Filter], writeOptions: CaseInsensitiveStringMap, query: SparkPlan, - refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper { - - private def isTruncate(filters: Array[Filter]): Boolean = { - filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] - } - - override protected def run(): Seq[InternalRow] = { - val writtenRows = newWriteBuilder() match { - case builder: SupportsTruncate if isTruncate(deleteWhere) => - writeWithV2(builder.truncate().buildForBatch()) - - case builder: SupportsOverwrite => - writeWithV2(builder.overwrite(deleteWhere).buildForBatch()) - - case _ => - throw new SparkException(s"Table does not support overwrite by expression: $table") - } - refreshCache() - writtenRows - } -} - + refreshCache: () => Unit, + write: Write) extends V2ExistingTableWriteExec with BatchWriteHelper /** * Physical plan node for dynamic partition overwrite into a v2 table. @@ -276,20 +248,8 @@ case class OverwritePartitionsDynamicExec( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, query: SparkPlan, - refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper { - - override protected def run(): Seq[InternalRow] = { - val writtenRows = newWriteBuilder() match { - case builder: SupportsDynamicOverwrite => - writeWithV2(builder.overwriteDynamicPartitions().buildForBatch()) - - case _ => - throw new SparkException(s"Table does not support dynamic partition overwrite: $table") - } - refreshCache() - writtenRows - } -} + refreshCache: () => Unit, + write: Write) extends V2ExistingTableWriteExec with BatchWriteHelper case class WriteToDataSourceV2Exec( batchWrite: BatchWrite, @@ -319,6 +279,17 @@ trait BatchWriteHelper { } } +trait V2ExistingTableWriteExec extends V2TableWriteExec { + def refreshCache: () => Unit + def write: Write + + override protected def run(): Seq[InternalRow] = { + val writtenRows = writeWithV2(write.toBatch) + refreshCache() + writtenRows + } +} + /** * The base physical plan for writing data into data source v2. 
*/ @@ -477,9 +448,10 @@ private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1W writeOptions) val writeBuilder = table.newWriteBuilder(info) - val writtenRows = writeBuilder match { - case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write()) - case v2 => writeWithV2(v2.buildForBatch()) + val write = writeBuilder.build() + val writtenRows = write match { + case v1: V1Write => writeWithV1(v1.toInsertableRelation) + case v2 => writeWithV2(v2.toBatch) } table match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCWriteBuilder.scala index a9f7a32bf4c69..0e6c72c2cc331 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCWriteBuilder.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.InsertableRelation import org.apache.spark.sql.types.StructType -case class JDBCWriteBuilder(schema: StructType, options: JdbcOptionsInWrite) extends V1WriteBuilder +case class JDBCWriteBuilder(schema: StructType, options: JdbcOptionsInWrite) extends WriteBuilder with SupportsTruncate { private var isTruncate = false @@ -33,8 +33,8 @@ case class JDBCWriteBuilder(schema: StructType, options: JdbcOptionsInWrite) ext this } - override def buildForV1Write(): InsertableRelation = new InsertableRelation { - override def insert(data: DataFrame, overwrite: Boolean): Unit = { + override def build(): V1Write = new V1Write { + override def toInsertableRelation: InsertableRelation = (data: DataFrame, _: Boolean) => { // TODO (SPARK-32595): do truncate and append atomically. 
if (isTruncate) { val conn = JdbcUtils.createConnectionFactory(options)() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala index cba7dd35fb3bc..45ddc6a6fcfc6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, V1Scan} -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1Write, WriteBuilder} import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION @@ -311,7 +311,8 @@ class InMemoryV1Provider if (mode == SaveMode.Overwrite) { writer.asInstanceOf[SupportsTruncate].truncate() } - writer.asInstanceOf[V1WriteBuilder].buildForV1Write().insert(data, overwrite = false) + val write = writer.build() + write.asInstanceOf[V1Write].toInsertableRelation.insert(data, overwrite = false) getRelation } } @@ -348,7 +349,6 @@ class InMemoryTableWithV1Fallback( private class FallbackWriteBuilder(options: CaseInsensitiveStringMap) extends WriteBuilder - with V1WriteBuilder with SupportsTruncate with SupportsOverwrite { @@ -371,9 +371,9 @@ class InMemoryTableWithV1Fallback( partIndexes.map(row.get) } - override def buildForV1Write(): InsertableRelation = { - new InsertableRelation { - override def insert(data: DataFrame, overwrite: Boolean): Unit = { + override def build(): V1Write = new V1Write { + override def toInsertableRelation: InsertableRelation = { + (data: DataFrame, overwrite: Boolean) => { assert(!overwrite, "V1 write fallbacks cannot be called with overwrite=true") val rows = data.collect() rows.groupBy(getPartitionValues).foreach { case (partition, elements) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 70cbfa194313f..6571e27b928bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -1199,7 +1199,7 @@ class PlanResolutionSuite extends AnalysisTest { case Project(_, AsDataSourceV2Relation(r)) => assert(r.catalog.exists(_ == catalogIdent)) assert(r.identifier.exists(_.name() == tableIdent)) - case AppendData(r: DataSourceV2Relation, _, _, _) => + case AppendData(r: DataSourceV2Relation, _, _, _, _) => assert(r.catalog.exists(_ == catalogIdent)) assert(r.identifier.exists(_.name() == tableIdent)) case DescribeRelation(r: ResolvedTable, _, _) => From 882e3210004c0e27aca914b58819008dafdde92a Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Mon, 21 Dec 2020 17:35:06 +0200 Subject: [PATCH 2/2] Review comments --- project/MimaExcludes.scala | 2 ++ 
.../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 33e65c9def41b..ba879c03795d1 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,8 @@ object MimaExcludes { // Exclude rules for 3.2.x lazy val v32excludes = v31excludes ++ Seq( + // [SPARK-33808][SQL] DataSource V2: Build logical writes in the optimizer + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.write.V1WriteBuilder") ) // Exclude rules for 3.1.x diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index dd1d9bff706a6..0c92945dc6ca5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -204,7 +204,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case v2Write => throw new AnalysisException( s"Table ${v1.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " + - s"${v2Write.getClass} is not an instance of ${classOf[V1Write]}") + s"${v2Write.getClass.getName} is not an instance of ${classOf[V1Write].getName}") } case AppendData(r @ DataSourceV2Relation(v2: SupportsWrite, _, _, _, _), query, writeOptions, @@ -220,7 +220,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case v2Write => throw new AnalysisException( s"Table ${v1.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " + - s"${v2Write.getClass} is not an instance of ${classOf[V1Write]}") + s"${v2Write.getClass.getName} is not an instance of ${classOf[V1Write].getName}") } case OverwriteByExpression(r @ DataSourceV2Relation(v2: SupportsWrite, _, _, _, _), _, query,
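Note for connector authors migrating off the removed V1WriteBuilder: the sketch below is one
possible shape of a WriteBuilder that returns the new V1Write. It is illustrative only; the
class name SimpleV1WriteBuilder and the storage callbacks are hypothetical and not part of
this patch.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connector.write.{SupportsTruncate, V1Write, WriteBuilder}
import org.apache.spark.sql.sources.InsertableRelation

class SimpleV1WriteBuilder extends WriteBuilder with SupportsTruncate {

  private var isTruncate = false

  // Called by the V2Writes rule when the plan is OverwriteByExpression with an AlwaysTrue filter.
  override def truncate(): WriteBuilder = {
    isTruncate = true
    this
  }

  // With this patch the logical V1Write is built in the optimizer (V2Writes);
  // AppendDataExecV1 / OverwriteByExpressionExecV1 only call toInsertableRelation.
  override def build(): V1Write = new V1Write {
    override def toInsertableRelation: InsertableRelation = new InsertableRelation {
      override def insert(data: DataFrame, overwrite: Boolean): Unit = {
        // The V1 fallback path always calls insert with overwrite = false;
        // truncation is expressed through the SupportsTruncate flag captured above.
        if (isTruncate) {
          // hypothetical: clear existing rows before appending
        }
        // hypothetical: append `data` to the underlying V1 store
      }
    }
  }
}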
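From the user's perspective the change is transparent. The snippet below is an illustrative
sketch, assuming a hypothetical V2 catalog implementation com.example.SimpleCatalog registered
as testcat whose table reports TableCapability.V1_BATCH_WRITE; it only marks where the new
pieces run.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.testcat", "com.example.SimpleCatalog")  // hypothetical catalog class
  .getOrCreate()

// Analysis produces AppendData(..., write = None); the V2Writes optimizer rule builds the
// write and fills in write = Some(v1Write); DataSourceV2Strategy then matches on Some(write)
// and plans AppendDataExecV1, which calls v1Write.toInsertableRelation at execution time.
spark.range(10).writeTo("testcat.ns.tbl").append()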