Spark 3.4: Remove no longer needed write extensions (apache#7443)
aokolnychyi authored and manisin committed May 9, 2023
1 parent a67f8dd commit 96195a8
Showing 23 changed files with 209 additions and 790 deletions.
@@ -27,13 +27,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.AttributeSet
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.ExtendedV2ExpressionUtils
 import org.apache.spark.sql.catalyst.expressions.IsNotNull
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
 import org.apache.spark.sql.catalyst.plans.FullOuter
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.LeftAnti
@@ -390,7 +390,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand with Predicat
   }
 
   private def resolveAttrRef(ref: NamedReference, plan: LogicalPlan): AttributeReference = {
-    ExtendedV2ExpressionUtils.resolveRef[AttributeReference](ref, plan)
+    V2ExpressionUtils.resolveRef[AttributeReference](ref, plan)
   }
 
   private def buildMergeDeltaProjections(
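For context: in recent Spark versions, the built-in `V2ExpressionUtils` (in `org.apache.spark.sql.catalyst.expressions`) provides the same `resolveRef`/`resolveRefs` helpers that the Iceberg-specific `ExtendedV2ExpressionUtils` copy carried, which is what makes this substitution possible. A minimal usage sketch; the helper name and the `id` column are illustrative:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.FieldReference

// Hypothetical helper: resolve a single connector NamedReference ("id" is an
// illustrative column name) against a resolved logical plan's output.
// Resolution failures surface as an AnalysisException.
def resolveIdColumn(plan: LogicalPlan): AttributeReference = {
  V2ExpressionUtils.resolveRef[AttributeReference](FieldReference("id"), plan)
}
```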
@@ -23,7 +23,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ProjectingInternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.expressions.ExtendedV2ExpressionUtils
+import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
 import org.apache.spark.sql.connector.write.RowLevelOperation
@@ -73,7 +73,7 @@ trait RewriteRowLevelIcebergCommand extends RewriteRowLevelCommand {
 
     operation match {
       case supportsDelta: SupportsDelta =>
-        val rowIdAttrs = ExtendedV2ExpressionUtils.resolveRefs[AttributeReference](
+        val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
           supportsDelta.rowId.toSeq,
           relation)
 

This file was deleted.

@@ -23,8 +23,8 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.analysis.NamedRelation
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.expressions.ExtendedV2ExpressionUtils
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.OPERATION_COLUMN
 import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
@@ -80,7 +80,7 @@ case class WriteIcebergDelta(
   }
 
   private def rowIdAttrsResolved: Boolean = {
-    val rowIdAttrs = ExtendedV2ExpressionUtils.resolveRefs[AttributeReference](
+    val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
       operation.rowId.toSeq,
       originalTable)
 
@@ -92,7 +92,7 @@
   private def metadataAttrsResolved: Boolean = {
     projections.metadataProjection match {
       case Some(projection) =>
-        val metadataAttrs = ExtendedV2ExpressionUtils.resolveRefs[AttributeReference](
+        val metadataAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
          operation.requiredMetadataAttributes.toSeq,
          originalTable)
 

This file was deleted.

@@ -22,100 +22,40 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.util.Optional
 import java.util.UUID
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
-import org.apache.spark.sql.catalyst.plans.logical.AppendData
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.OverwriteByExpression
-import org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
 import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
-import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.write.DeltaWriteBuilder
 import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
-import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite
-import org.apache.spark.sql.connector.write.SupportsOverwrite
-import org.apache.spark.sql.connector.write.SupportsTruncate
 import org.apache.spark.sql.connector.write.WriteBuilder
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.AlwaysTrue
-import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 /**
- * A rule that is inspired by V2Writes in Spark but supports Iceberg transforms.
+ * A rule that is inspired by V2Writes in Spark but supports Iceberg specific plans.
  */
 object ExtendedV2Writes extends Rule[LogicalPlan] with PredicateHelper {
 
   import DataSourceV2Implicits._
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-    case a @ AppendData(r: DataSourceV2Relation, query, options, _, None, _) if isIcebergRelation(r) =>
-      val writeBuilder = newWriteBuilder(r.table, query.schema, options)
-      val write = writeBuilder.build()
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
-      a.copy(write = Some(write), query = newQuery)
-
-    case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, options, _, None, _)
-        if isIcebergRelation(r) =>
-      // fail if any filter cannot be converted. correctness depends on removing all matching data.
-      val filters = splitConjunctivePredicates(deleteExpr).flatMap { pred =>
-        val filter = DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true)
-        if (filter.isEmpty) {
-          throw QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
-        }
-        filter
-      }.toArray
-
-      val table = r.table
-      val writeBuilder = newWriteBuilder(table, query.schema, options)
-      val write = writeBuilder match {
-        case builder: SupportsTruncate if isTruncate(filters) =>
-          builder.truncate().build()
-        case builder: SupportsOverwrite =>
-          builder.overwrite(filters).build()
-        case _ =>
-          throw QueryExecutionErrors.overwriteTableByUnsupportedExpressionError(table)
-      }
-
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
-      o.copy(write = Some(write), query = newQuery)
-
-    case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, options, _, None)
-        if isIcebergRelation(r) =>
-      val table = r.table
-      val writeBuilder = newWriteBuilder(table, query.schema, options)
-      val write = writeBuilder match {
-        case builder: SupportsDynamicOverwrite =>
-          builder.overwriteDynamicPartitions().build()
-        case _ =>
-          throw QueryExecutionErrors.dynamicPartitionOverwriteUnsupportedByTableError(table)
-      }
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
-      o.copy(write = Some(write), query = newQuery)
-
     case rd @ ReplaceIcebergData(r: DataSourceV2Relation, query, _, None) =>
       val rowSchema = StructType.fromAttributes(rd.dataInput)
       val writeBuilder = newWriteBuilder(r.table, rowSchema, Map.empty)
       val write = writeBuilder.build()
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
+      val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, r.funCatalog)
       rd.copy(write = Some(write), query = Project(rd.dataInput, newQuery))
 
     case wd @ WriteIcebergDelta(r: DataSourceV2Relation, query, _, projections, None) =>
       val deltaWriteBuilder = newDeltaWriteBuilder(r.table, Map.empty, projections)
       val deltaWrite = deltaWriteBuilder.build()
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(deltaWrite, query, conf)
+      val newQuery = DistributionAndOrderingUtils.prepareQuery(deltaWrite, query, r.funCatalog)
       wd.copy(write = Some(deltaWrite), query = newQuery)
   }
 
-  private def isTruncate(filters: Array[Filter]): Boolean = {
-    filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
-  }
-
   private def newWriteBuilder(
       table: Table,
       rowSchema: StructType,
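The other substitution in this file relies on Spark 3.4's `DistributionAndOrderingUtils.prepareQuery` accepting the relation's function catalog (`r.funCatalog`) rather than a `SQLConf`, so the `ExtendedDistributionAndOrderingUtils` copy is no longer needed. A hedged sketch of the call shape, assuming the caller sits where the utility is visible (as rules in `org.apache.spark.sql.execution.datasources.v2` do); the wrapper name is illustrative:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.FunctionCatalog
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils

// Illustrative wrapper: wraps `query` in the sort/repartition operators implied
// by the write's required distribution and ordering; `funCatalog` lets Spark
// look up any transform functions (e.g. bucket) those requirements reference.
def prepareForWrite(
    write: Write,
    query: LogicalPlan,
    funCatalog: Option[FunctionCatalog]): LogicalPlan = {
  DistributionAndOrderingUtils.prepareQuery(write, query, funCatalog)
}
```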
@@ -26,10 +26,10 @@ import org.apache.spark.sql.catalyst.expressions.AttributeMap
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.ExtendedV2ExpressionUtils
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
 import org.apache.spark.sql.catalyst.planning.RewrittenRowLevelCommand
 import org.apache.spark.sql.catalyst.plans.LeftSemi
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
@@ -80,8 +80,8 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[Logic
       val matchingRowsPlan = buildMatchingRowsPlan(relation, command)
 
       val filterAttrs = ArraySeq.unsafeWrapArray(scan.filterAttributes)
-      val buildKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
-      val pruningKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
+      val buildKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
+      val pruningKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
       val dynamicPruningCond = buildDynamicPruningCond(matchingRowsPlan, buildKeys, pruningKeys)
 
       Filter(dynamicPruningCond, r)
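Here, too, only the helper object changes: the rule still resolves the same scan filter attributes twice, once against the plan producing matching rows (the build side) and once against the relation being pruned, so the dynamic-pruning filter can relate build keys to pruning keys. A toy sketch of that pairing; all names are illustrative:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.NamedReference

// Illustrative helper: the same references must resolve on both sides so the
// pruning condition can compare build keys against pruning keys.
def resolveKeyPairs(
    filterAttrs: Seq[NamedReference],
    buildSide: LogicalPlan,
    prunedSide: LogicalPlan): (Seq[Attribute], Seq[Attribute]) = {
  val buildKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, buildSide)
  val pruningKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, prunedSide)
  (buildKeys, pruningKeys)
}
```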