[SPARK-39966][SQL] Use V2 Filter in SupportsDelete #37393

Closed · wants to merge 9 commits · Changes from 7 commits
@@ -18,6 +18,8 @@
package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.internal.connector.PredicateUtils;
import org.apache.spark.sql.sources.AlwaysTrue;
import org.apache.spark.sql.sources.Filter;

@@ -28,7 +30,7 @@
* @since 3.0.0
*/
@Evolving
public interface SupportsDelete extends TruncatableTable {
public interface SupportsDelete extends SupportsDeleteV2 {

/**
* Checks whether it is possible to delete data from a data source table that matches filter
@@ -70,6 +72,18 @@ default boolean canDeleteWhere(Filter[] filters) {
*/
void deleteWhere(Filter[] filters);

default boolean canDeleteWhere(Predicate[] predicates) {
try {
return this.canDeleteWhere(PredicateUtils.toV1(predicates, false));
} catch (UnsupportedOperationException e) {
return false;
}
}

default void deleteWhere(Predicate[] predicates) {
this.deleteWhere(PredicateUtils.toV1(predicates, false));
}

@Override
default boolean truncateTable() {
Filter[] filters = new Filter[] { new AlwaysTrue() };
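The two default methods added above are the compatibility bridge: Spark can now hand V2 Predicate[] arguments to any existing SupportsDelete implementation, and the defaults translate them to V1 Filter[], with canDeleteWhere returning false when the conversion throws because a predicate has no V1 counterpart. As a minimal sketch, assuming a hypothetical V1-only connector class (class name and schema are illustrative, not part of this PR), nothing in the class needs to change:

import java.util.Collections;
import java.util.Set;

import org.apache.spark.sql.connector.catalog.SupportsDelete;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;

// Hypothetical connector class, used only to illustrate the new bridge defaults.
public class LegacyFilterDeleteTable implements SupportsDelete {

  @Override
  public String name() {
    return "legacy_filter_delete_table";
  }

  @Override
  public StructType schema() {
    return new StructType().add("id", "long").add("data", "string");
  }

  @Override
  public Set<TableCapability> capabilities() {
    return Collections.emptySet();
  }

  @Override
  public boolean canDeleteWhere(Filter[] filters) {
    return true;  // pretend every translated V1 filter can be pushed down
  }

  @Override
  public void deleteWhere(Filter[] filters) {
    // a real connector would delete the rows matching the ANDed filters here
  }
}
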
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.filter.AlwaysTrue;
import org.apache.spark.sql.connector.expressions.filter.Predicate;

/**
* A mix-in interface for {@link Table} delete support. Data sources can implement this
 * interface to provide the ability to delete data from tables that match filter expressions.
*
* @since 3.4.0
*/
@Evolving
public interface SupportsDeleteV2 extends TruncatableTable {

/**
* Checks whether it is possible to delete data from a data source table that matches filter
* expressions.
* <p>
* Rows should be deleted from the data source iff all of the filter expressions match.
* That is, the expressions must be interpreted as a set of filters that are ANDed together.
* <p>
* Spark will call this method at planning time to check whether {@link #deleteWhere(Predicate[])}
* would reject the delete operation because it requires significant effort. If this method
* returns false, Spark will not call {@link #deleteWhere(Predicate[])} and will try to rewrite
* the delete operation and produce row-level changes if the data source table supports deleting
* individual records.
*
* @param predicates V2 filter expressions, used to select rows to delete when all expressions
* match
* @return true if the delete operation can be performed
*
* @since 3.4.0
*/
default boolean canDeleteWhere(Predicate[] predicates) {
return true;
}

/**
* Delete data from a data source table that matches filter expressions. Note that this method
* will be invoked only if {@link #canDeleteWhere(Predicate[])} returns true.
* <p>
* Rows are deleted from the data source iff all of the filter expressions match. That is, the
* expressions must be interpreted as a set of filters that are ANDed together.
* <p>
* Implementations may reject a delete operation if the delete isn't possible without significant
* effort. For example, partitioned data sources may reject deletes that do not filter by
* partition columns because the filter may require rewriting files without deleted records.
 * To reject a delete, implementations should throw {@link IllegalArgumentException} with a clear
* error message that identifies which expression was rejected.
*
* @param predicates predicate expressions, used to select rows to delete when all expressions
* match
* @throws IllegalArgumentException If the delete is rejected due to required effort
*/
void deleteWhere(Predicate[] predicates);

@Override
default boolean truncateTable() {
Predicate[] predicates = new Predicate[] { new AlwaysTrue() };
boolean canDelete = canDeleteWhere(predicates);
if (canDelete) {
deleteWhere(predicates);
}
return canDelete;
}
}
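To show the new contract end to end, here is a minimal sketch (hypothetical class and schema, not part of this PR) of a connector that implements SupportsDeleteV2 directly and declines any delete that is not expressed as an equality on its partition column, the kind of rejection the Javadoc above describes:

import java.util.Arrays;
import java.util.Collections;
import java.util.Set;

import org.apache.spark.sql.connector.catalog.SupportsDeleteV2;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.types.StructType;

// Hypothetical partitioned table that only supports metadata-only deletes.
public class PartitionedDeleteTable implements SupportsDeleteV2 {

  private static final String PARTITION_COLUMN = "dt";

  @Override
  public String name() {
    return "partitioned_delete_table";
  }

  @Override
  public StructType schema() {
    return new StructType().add("dt", "string").add("value", "long");
  }

  @Override
  public Set<TableCapability> capabilities() {
    return Collections.emptySet();
  }

  @Override
  public boolean canDeleteWhere(Predicate[] predicates) {
    // Accept the delete only if every predicate is an equality on the partition
    // column, so it can be executed by dropping whole partitions.
    return Arrays.stream(predicates).allMatch(this::isPartitionEquality);
  }

  @Override
  public void deleteWhere(Predicate[] predicates) {
    // A real connector would drop the partitions selected by the ANDed predicates.
  }

  private boolean isPartitionEquality(Predicate predicate) {
    return "=".equals(predicate.name())
        && predicate.children().length == 2
        && PARTITION_COLUMN.equals(predicate.children()[0].toString());
  }
}

When canDeleteWhere returns false, Spark does not call deleteWhere; if the table also implements SupportsRowLevelOperations, the delete is instead handled by the rewritten row-level plan (see RewriteDeleteFromTable below).
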
@@ -17,17 +17,12 @@

package org.apache.spark.sql.connector.read;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.internal.connector.PredicateUtils;

import scala.Option;

/**
* A mix-in interface for {@link Scan}. Data sources can implement this interface if they can
* filter initially planned {@link InputPartition}s using predicates Spark infers at runtime.
@@ -37,7 +32,7 @@
* @since 3.2.0
*/
@Experimental
public interface SupportsRuntimeFiltering extends Scan, SupportsRuntimeV2Filtering {
public interface SupportsRuntimeFiltering extends SupportsRuntimeV2Filtering {
/**
* Returns attributes this scan can be filtered by at runtime.
* <p>
@@ -66,16 +61,6 @@ public interface SupportsRuntimeFiltering extends Scan, SupportsRuntimeV2Filteri
void filter(Filter[] filters);

default void filter(Predicate[] predicates) {
List<Filter> filterList = new ArrayList();

for (int i = 0; i < predicates.length; i++) {
Option filter = PredicateUtils.toV1(predicates[i]);
if (filter.nonEmpty()) {
filterList.add((Filter)filter.get());
}
}

Filter[] filters = new Filter[filterList.size()];
this.filter(filterList.toArray(filters));
this.filter(PredicateUtils.toV1(predicates, true));
}
}
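The simplified default above relies on the skip mode of the new bulk conversion: PredicateUtils.toV1(predicates, true) drops any runtime predicate that has no V1 Filter counterpart rather than failing, which is acceptable here because a dropped runtime filter only means less pruning. A minimal sketch (hypothetical scan, illustrative names and schema, not part of this PR) of a scan that still implements the V1 method and picks up the new default unchanged:

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;

// Hypothetical scan that can prune its input partitions by the "part" column at runtime.
public class PartitionPrunedScan implements SupportsRuntimeFiltering {

  @Override
  public StructType readSchema() {
    return new StructType().add("part", "string").add("value", "long");
  }

  @Override
  public NamedReference[] filterAttributes() {
    // Columns Spark is allowed to filter on at runtime.
    return new NamedReference[] { Expressions.column("part") };
  }

  @Override
  public void filter(Filter[] filters) {
    // A real scan would discard planned InputPartitions that cannot match these filters.
  }
}

The inherited filter(Predicate[]) default converts the runtime predicates and forwards whatever survives the conversion to filter(Filter[]).
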
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, Not}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LogicalPlan, ReplaceData}
import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsRowLevelOperations, TruncatableTable}
import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, SupportsRowLevelOperations, TruncatableTable}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
import org.apache.spark.sql.connector.write.RowLevelOperationTable
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -29,7 +29,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A rule that rewrites DELETE operations using plans that operate on individual or groups of rows.
*
* If a table implements [[SupportsDelete]] and [[SupportsRowLevelOperations]], this rule will
* If a table implements [[SupportsDeleteV2]] and [[SupportsRowLevelOperations]], this rule will
* still rewrite the DELETE operation but the optimizer will check whether this particular DELETE
* statement can be handled by simply passing delete filters to the connector. If so, the optimizer
* will discard the rewritten plan and will allow the data source to delete using filters.
@@ -47,7 +47,7 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty())
buildReplaceDataPlan(r, table, cond)

case DataSourceV2Relation(_: SupportsDelete, _, _, _, _) =>
case DataSourceV2Relation(_: SupportsDeleteV2, _, _, _, _) =>
// don't rewrite as the table supports deletes only with filters
d

@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.{sources, AnalysisException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.trees.BinaryLike
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationTable, Write}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}
@@ -537,7 +538,7 @@ case class DeleteFromTable(
*/
case class DeleteFromTableWithFilters(
table: LogicalPlan,
condition: Seq[sources.Filter]) extends LeafCommand
condition: Seq[Predicate]) extends LeafCommand

/**
* The logical plan of the UPDATE TABLE command.
@@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, UnboundFunction}
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED, LEGACY_CTE_PRECEDENCE_POLICY}
import org.apache.spark.sql.sources.Filter
@@ -856,7 +857,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
s" cannot translate expression to source filter: $f")
}

def cannotDeleteTableWhereFiltersError(table: Table, filters: Array[Filter]): Throwable = {
def cannotDeleteTableWhereFiltersError(table: Table, filters: Array[Predicate]): Throwable = {
new AnalysisException(
s"Cannot delete from table ${table.name} where ${filters.mkString("[", ", ", "]")}")
}
@@ -2556,4 +2557,9 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
errorClass = "INVALID_COLUMN_OR_FIELD_DATA_TYPE",
messageParameters = Array(toSQLId(name), toSQLType(dt), toSQLType(expected)))
}

def unsupportedPredicateToFilterConversionError(predicateType: String): Throwable = {
new UnsupportedOperationException(s"conversion from data source v2 Predicate to data " +
s"source v1 Filter is not supported for this Predicate: ${predicateType}")
}
}
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TruncatableTable}
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDeleteV2, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TruncatableTable}
import org.apache.spark.sql.connector.write.RowLevelOperationTable
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
@@ -48,9 +48,9 @@ object DataSourceV2Implicits {
}
}

def asDeletable: SupportsDelete = {
def asDeletable: SupportsDeleteV2 = {
table match {
case support: SupportsDelete =>
case support: SupportsDeleteV2 =>
support
case _ =>
throw QueryCompilationErrors.tableDoesNotSupportDeletesError(table)
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal.connector
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
import org.apache.spark.sql.types.StringType

@@ -54,7 +55,7 @@ private[sql] object PredicateUtils {
Some(In(attribute, Array.empty[Any]))
}

case "=" | "<=>" | ">" | "<" | ">=" | "<=" if isValidBinaryPredicate =>
case "=" | "<=>" | ">" | "<" | ">=" | "<=" if isValidBinaryPredicate() =>
val attribute = predicate.children()(0).toString
val value = predicate.children()(1).asInstanceOf[LiteralValue[_]]
val v1Value = CatalystTypeConverters.convertToScala(value.value, value.dataType)
@@ -77,7 +78,7 @@
}
Some(v1Filter)

case "STARTS_WITH" | "ENDS_WITH" | "CONTAINS" if isValidBinaryPredicate =>
case "STARTS_WITH" | "ENDS_WITH" | "CONTAINS" if isValidBinaryPredicate() =>
val attribute = predicate.children()(0).toString
val value = predicate.children()(1).asInstanceOf[LiteralValue[_]]
if (!value.dataType.sameType(StringType)) return None
@@ -132,4 +133,18 @@
case _ => None
}
}

def toV1(
predicates: Array[Predicate],
skipIfNotConvertible: Boolean): Array[Filter] = {
[Inline review thread on this method]

Contributor: Thinking about this more, I think we can just have

    def toV1(predicates: Array[Predicate]): Array[Filter] = ...

In SupportsDelete, we can check the number of returned v1 filters:

    default boolean canDeleteWhere(Predicate[] predicates) {
      Filter[] v1Filters = PredicateUtils.toV1(predicates);
      if (v1Filters.length < predicates.length) return false;
      return this.canDeleteWhere(v1Filters);
    }

    default void deleteWhere(Predicate[] predicates) {
      this.deleteWhere(PredicateUtils.toV1(predicates));
    }

Contributor Author: I have actually thought about this. It works OK for SupportsDelete, but we would have to throw an exception later for SupportsOverwrite, because we don't have a canOverwrite API.

Contributor: Shall we add canOverwrite?

Contributor: At least we can still throw an error if we want to, by checking the number of returned v1 filters.

Contributor Author: NVM, I will change to this approach and check if (v1Filters.length < predicates.length), throwing an exception in SupportsOverwrite.overwrite.

Contributor Author: Replied without seeing your previous message. Will add canOverwrite.

predicates.flatMap { predicate =>
val filter = toV1(predicate)
if (filter.isEmpty) {
if (!skipIfNotConvertible) {
throw QueryCompilationErrors.unsupportedPredicateToFilterConversionError(predicate.name())
}
}
filter
}.toArray
}
}
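For reference, a short sketch of the two conversion modes from the Java side. PredicateUtils is Spark-internal, so this snippet is purely illustrative (class name and column are made up): skipIfNotConvertible = true may return fewer filters than predicates, while false surfaces the new unsupportedPredicateToFilterConversionError instead of silently narrowing a delete.

import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.LiteralValue;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.internal.connector.PredicateUtils;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.DataTypes;

public class PredicateConversionSketch {
  public static void main(String[] args) {
    // id > 10, expressed as a V2 Predicate.
    Predicate greaterThan = new Predicate(">", new Expression[] {
        Expressions.column("id"), new LiteralValue<>(10, DataTypes.IntegerType)});
    Predicate[] predicates = new Predicate[] { greaterThan };

    // Skip mode, as used by SupportsRuntimeFiltering: unconvertible predicates are dropped.
    Filter[] forRuntimeFiltering = PredicateUtils.toV1(predicates, true);

    // Strict mode, as used by the SupportsDelete bridge: throws if anything is untranslatable.
    Filter[] forDelete = PredicateUtils.toV1(predicates, false);

    System.out.println(forRuntimeFiltering.length + " filters / " + forDelete.length + " filters");
  }
}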