[SPARK-33808][SQL] DataSource V2: Build logical writes in the optimizer #30806

Closed · wants to merge 2 commits · showing changes from all commits
2 changes: 2 additions & 0 deletions project/MimaExcludes.scala
@@ -36,6 +36,8 @@ object MimaExcludes {

// Exclude rules for 3.2.x
lazy val v32excludes = v31excludes ++ Seq(
// [SPARK-33808][SQL] DataSource V2: Build logical writes in the optimizer
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.write.V1WriteBuilder")
)

// Exclude rules for 3.1.x
@@ -96,7 +96,7 @@ public enum TableCapability {
/**
* Signals that the table supports append writes using the V1 InsertableRelation interface.
* <p>
- * Tables that return this capability must create a V1WriteBuilder and may also support additional
+ * Tables that return this capability must create a V1Write and may also support additional
* write modes, like {@link #TRUNCATE}, and {@link #OVERWRITE_BY_FILTER}, but cannot support
* {@link #OVERWRITE_DYNAMIC}.
*/
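For context, here is a hypothetical connector table advertising this capability (class and field names are made up; the matching write builder is sketched after the new V1Write interface further below):

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

// Hypothetical table that opts into the V1 fallback write path.
class MyLegacyTable(tableSchema: StructType) extends SupportsWrite {
  override def name(): String = "my_legacy_table"
  override def schema(): StructType = tableSchema
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.V1_BATCH_WRITE, TableCapability.TRUNCATE)
  // Must hand back a builder that produces a V1Write; see the sketch further below.
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = ???
}
```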
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}

/**
@@ -65,7 +66,8 @@ case class AppendData(
table: NamedRelation,
query: LogicalPlan,
writeOptions: Map[String, String],
- isByName: Boolean) extends V2WriteCommand {
+ isByName: Boolean,
+ write: Option[Write] = None) extends V2WriteCommand {
override def withNewQuery(newQuery: LogicalPlan): AppendData = copy(query = newQuery)
override def withNewTable(newTable: NamedRelation): AppendData = copy(table = newTable)
Contributor: Shall we add `override lazy val resolved = ... && write.isDefined` in `V2WriteCommand`? It's safer to make sure that the analyzer creates the `Write` object.

Contributor (author): Sounds like a good idea, but we actually construct the `Write` object in the optimizer, after operator optimization is done, so that we operate on the optimized expressions.

Contributor: Ah, I see. Let's leave it then.
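For reference, a minimal sketch of the suggested check (hypothetical, and not adopted: as noted above, the Write is only attached by the optimizer, so requiring it during analysis would never resolve):

```scala
import org.apache.spark.sql.catalyst.plans.logical.V2WriteCommand
import org.apache.spark.sql.connector.write.Write

// Hypothetical: treat a write command as resolved only once a Write is attached.
trait RequiresBuiltWrite extends V2WriteCommand {
  def write: Option[Write]
  override lazy val resolved: Boolean =
    table.resolved && query.resolved && outputResolved && write.isDefined
}
```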

}
@@ -94,7 +96,8 @@ case class OverwriteByExpression(
deleteExpr: Expression,
query: LogicalPlan,
writeOptions: Map[String, String],
- isByName: Boolean) extends V2WriteCommand {
+ isByName: Boolean,
+ write: Option[Write] = None) extends V2WriteCommand {
Contributor (author): Making this optional lets us reuse the same plan both before and after the write is constructed. `None` here means the logical write has not been built yet, which keeps the optimizer rules idempotent.
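To illustrate the idempotency argument: a rule can match only commands whose write is still None, so re-running it is a no-op. A rough sketch, not the PR's actual V2Writes rule (it uses AppendData for brevity and assumes WriteBuilder.build(): Write from the related write API):

```scala
import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object BuildWritesSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Fires only while `write` is None, so applying the rule twice changes nothing.
    case a @ AppendData(r: DataSourceV2Relation, query, options, _, None) =>
      val info = LogicalWriteInfoImpl(
        queryId = UUID.randomUUID().toString,
        schema = query.schema,
        options = new CaseInsensitiveStringMap(options.asJava))
      val writeBuilder = r.table.asInstanceOf[SupportsWrite].newWriteBuilder(info)
      a.copy(write = Some(writeBuilder.build()))
  }
}
```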

override lazy val resolved: Boolean = {
table.resolved && query.resolved && outputResolved && deleteExpr.resolved
}
@@ -132,7 +135,8 @@ case class OverwritePartitionsDynamic(
table: NamedRelation,
query: LogicalPlan,
writeOptions: Map[String, String],
- isByName: Boolean) extends V2WriteCommand {
+ isByName: Boolean,
+ write: Option[Write] = None) extends V2WriteCommand {
Contributor: Not related to this PR, but I'm wondering whether we should have an optional `Scan` object in `DataSourceV2Relation` instead of a separate logical plan, `DataSourceV2ScanRelation`. It's simpler and consistent with the write logical plans. cc @rdblue

Contributor: I think that's a good idea.

Contributor (author) @aokolnychyi, Dec 22, 2020: Quick question: `DataSourceV2Relation` is also used inside write nodes like `AppendData`. If we add an optional scan, will that mean we leak a read-specific concept into write plans? cc @rdblue @cloud-fan

Contributor: For `AppendData`, we intentionally do not treat the table as a child, which means the push-down rule won't apply to it and the `Scan` object will always be `None` in `AppendData`.
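The reason the push-down rule never reaches the table inside a write node is that V2WriteCommand exposes only the query as a child; a paraphrase of that shape from the Spark source:

```scala
import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// Paraphrase of V2WriteCommand: `table` is a plain field rather than a child,
// so tree transformations such as scan push-down never descend into it.
trait WriteCommandShape extends Command {
  def table: NamedRelation
  def query: LogicalPlan
  override def children: Seq[LogicalPlan] = Seq(query)
}
```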

override def withNewQuery(newQuery: LogicalPlan): OverwritePartitionsDynamic = {
copy(query = newQuery)
}
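Because the new parameter defaults to None, existing call sites construct these commands exactly as before; a small sketch:

```scala
import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan}

// `relation` and `query` are assumed to be built elsewhere; the point is only
// that `write` defaults to None and is attached later by the optimizer.
def buildAppend(relation: NamedRelation, query: LogicalPlan): AppendData = {
  val append = AppendData(relation, query, writeOptions = Map.empty, isByName = false)
  assert(append.write.isEmpty)
  append
}
```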
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Unstable;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.sources.InsertableRelation;

/**
* A logical write that should be executed using V1 InsertableRelation interface.
* <p>
* Tables that have {@link TableCapability#V1_BATCH_WRITE} in the list of their capabilities
* must build {@link V1Write}.
*/
@Unstable
public interface V1Write extends Write {
InsertableRelation toInsertableRelation();
}

This file was deleted.
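A hypothetical connector-side write builder that returns a V1Write wrapping an existing InsertableRelation (names are made up, and it assumes WriteBuilder.build(): Write from the related write API):

```scala
import org.apache.spark.sql.connector.write.{V1Write, Write, WriteBuilder}
import org.apache.spark.sql.sources.InsertableRelation

// Wraps a legacy InsertableRelation so the new V1 fallback path can execute it.
class MyLegacyWriteBuilder(relation: InsertableRelation) extends WriteBuilder {
  override def build(): Write = new V1Write {
    override def toInsertableRelation(): InsertableRelation = relation
  }
}
```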

@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.datasources.SchemaPruning
- import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
+ import org.apache.spark.sql.execution.datasources.v2.{V2ScanRelationPushDown, V2Writes}
import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning}
import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs}

@@ -37,7 +37,7 @@ class SparkOptimizer(

override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
// TODO: move SchemaPruning into catalyst
- SchemaPruning :: V2ScanRelationPushDown :: PruneFileSourcePartitions :: Nil
+ SchemaPruning :: V2ScanRelationPushDown :: V2Writes :: PruneFileSourcePartitions :: Nil

override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
@@ -70,7 +70,8 @@ class SparkOptimizer(
ExtractPythonUDFFromJoinCondition.ruleName :+
ExtractPythonUDFFromAggregate.ruleName :+ ExtractGroupingPythonUDFFromAggregate.ruleName :+
ExtractPythonUDFs.ruleName :+
- V2ScanRelationPushDown.ruleName
+ V2ScanRelationPushDown.ruleName :+
+ V2Writes.ruleName

/**
* Optimization batches that are executed before the regular optimization batches (also before
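V2Writes is also added to the non-excludable rules, so it cannot be disabled through the optimizer's rule-exclusion config; a short illustration (assuming an active SparkSession named spark):

```scala
// The config key is real; the exclusion attempt below is ignored for
// non-excludable rules (the optimizer logs a warning and keeps the rule).
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.execution.datasources.v2.V2Writes")
```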
@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartit
import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
- import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, TableCapability, TableCatalog, TableChange}
+ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TableChange}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -195,33 +196,42 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
orCreate = orCreate) :: Nil
}

case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r)) :: Nil
case v2 =>
AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), query, writeOptions,
_, Some(write)) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
write match {
case v1Write: V1Write =>
AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r), v1Write) :: Nil
case v2Write =>
throw new AnalysisException(
s"Table ${v1.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " +
s"${v2Write.getClass.getName} is not an instance of ${classOf[V1Write].getName}")
}

case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
Contributor (author) @aokolnychyi, Dec 17, 2020: I removed the filter conversion, as it is done earlier now (see the sketch after this file's diff).

val filters = splitConjunctivePredicates(deleteExpr).map {
filter => DataSourceStrategy.translateFilter(deleteExpr,
supportNestedPredicatePushdown = true).getOrElse(
throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
}.toArray
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions,
query, refreshCache(r)) :: Nil
case v2 =>
OverwriteByExpressionExec(v2, filters,
writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
case AppendData(r @ DataSourceV2Relation(v2: SupportsWrite, _, _, _, _), query, writeOptions,
_, Some(write)) =>
AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r), write) :: Nil

case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, query,
writeOptions, _, Some(write)) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
write match {
case v1Write: V1Write =>
OverwriteByExpressionExecV1(
v1, writeOptions.asOptions, query, refreshCache(r), v1Write) :: Nil
case v2Write =>
throw new AnalysisException(
s"Table ${v1.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " +
s"${v2Write.getClass.getName} is not an instance of ${classOf[V1Write].getName}")
}

case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) =>
case OverwriteByExpression(r @ DataSourceV2Relation(v2: SupportsWrite, _, _, _, _), _, query,
writeOptions, _, Some(write)) =>
OverwriteByExpressionExec(
v2, writeOptions.asOptions, planLater(query), refreshCache(r), write) :: Nil

case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _, Some(write)) =>
OverwritePartitionsDynamicExec(
r.table.asWritable, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
r.table.asWritable, writeOptions.asOptions, planLater(query),
refreshCache(r), write) :: Nil

case DeleteFromTable(relation, condition) =>
relation match {
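The filter conversion removed from the strategy above now happens when the logical write is built in the optimizer. A rough sketch of that step, not the PR's exact V2Writes code (it mirrors the truncate/overwrite handling removed from V1FallbackWriters below and assumes WriteBuilder.build(): Write):

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, Write, WriteBuilder}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}

// `predicates` are the conjuncts of the delete expression, already split;
// the write is built once here instead of at physical planning time.
def buildOverwriteWrite(
    builder: WriteBuilder,
    predicates: Seq[Expression],
    tableName: String): Write = {
  val filters: Array[Filter] = predicates.map { pred =>
    DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true)
      .getOrElse(throw new AnalysisException(
        s"Cannot translate expression to source filter: $pred"))
  }.toArray
  val isTruncate = filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
  builder match {
    case t: SupportsTruncate if isTruncate => t.truncate().build()
    case o: SupportsOverwrite => o.overwrite(filters).build()
    case _ =>
      throw new SparkException(s"Table does not support overwrite by expression: $tableName")
  }
}
```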
@@ -49,14 +49,14 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {

// TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a
// a logical plan for streaming write.
- case AppendData(r: DataSourceV2Relation, _, _, _) if !supportsBatchWrite(r.table) =>
+ case AppendData(r: DataSourceV2Relation, _, _, _, _) if !supportsBatchWrite(r.table) =>
failAnalysis(s"Table ${r.table.name()} does not support append in batch mode.")

- case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _)
+ case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _, _)
if !r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_DYNAMIC) =>
failAnalysis(s"Table ${r.table.name()} does not support dynamic overwrite in batch mode.")

- case OverwriteByExpression(r: DataSourceV2Relation, expr, _, _, _) =>
+ case OverwriteByExpression(r: DataSourceV2Relation, expr, _, _, _, _) =>
expr match {
case Literal(true, BooleanType) =>
if (!supportsBatchWrite(r.table) ||
@@ -17,17 +17,14 @@

package org.apache.spark.sql.execution.datasources.v2

import java.util.UUID

import org.apache.spark.SparkException
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
@@ -39,12 +36,8 @@ case class AppendDataExecV1(
table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
plan: LogicalPlan,
refreshCache: () => Unit) extends V1FallbackWriters {

override protected def run(): Seq[InternalRow] = {
writeWithV1(newWriteBuilder().buildForV1Write(), refreshCache = refreshCache)
}
}
refreshCache: () => Unit,
write: V1Write) extends V1FallbackWriters

/**
* Physical plan node for overwrite into a v2 table with V1 write interfaces. Note that when this
@@ -59,29 +52,10 @@ case class OverwriteByExpressionExecV1(
*/
case class OverwriteByExpressionExecV1(
table: SupportsWrite,
deleteWhere: Array[Filter],
writeOptions: CaseInsensitiveStringMap,
plan: LogicalPlan,
refreshCache: () => Unit) extends V1FallbackWriters {

private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
}

override protected def run(): Seq[InternalRow] = {
newWriteBuilder() match {
case builder: SupportsTruncate if isTruncate(deleteWhere) =>
writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), refreshCache = refreshCache)

case builder: SupportsOverwrite =>
writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(),
refreshCache = refreshCache)

case _ =>
throw new SparkException(s"Table does not support overwrite by expression: $table")
}
}
}
refreshCache: () => Unit,
write: V1Write) extends V1FallbackWriters

/** Some helper interfaces that use V2 write semantics through the V1 writer interface. */
sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
@@ -90,23 +64,13 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {

def table: SupportsWrite
def writeOptions: CaseInsensitiveStringMap
def refreshCache: () => Unit
def write: V1Write

protected implicit class toV1WriteBuilder(builder: WriteBuilder) {
def asV1Builder: V1WriteBuilder = builder match {
case v1: V1WriteBuilder => v1
case other => throw new IllegalStateException(
s"The returned writer ${other} was no longer a V1WriteBuilder.")
}
}

protected def newWriteBuilder(): V1WriteBuilder = {
val info = LogicalWriteInfoImpl(
queryId = UUID.randomUUID().toString,
schema = plan.schema,
options = writeOptions)
val writeBuilder = table.newWriteBuilder(info)

writeBuilder.asV1Builder
override def run(): Seq[InternalRow] = {
val writtenRows = writeWithV1(write.toInsertableRelation)
refreshCache()
writtenRows
}
}

@@ -116,12 +80,8 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
trait SupportsV1Write extends SparkPlan {
def plan: LogicalPlan

- protected def writeWithV1(
-     relation: InsertableRelation,
-     refreshCache: () => Unit = () => ()): Seq[InternalRow] = {
+ protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
Member: Nicely simplified.

relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
refreshCache()
Contributor (author): The refresh moved to `run`.

Member: LGTM.


Nil
}
}