Make SupportsDelete a simple mix-in of DSV2
xy_xin committed Jul 29, 2019
1 parent 1197514 commit b9d8bb7
Showing 10 changed files with 92 additions and 359 deletions.
SupportsDelete.java (new file)
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.Filter;

public interface SupportsDelete {
  /**
   * Delete data from a data source table that matches filter expressions.
   * <p>
   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
   * expressions must be interpreted as a set of filters that are ANDed together.
   * <p>
   * Implementations may reject a delete operation if the delete isn't possible without significant
   * effort. For example, partitioned data sources may reject deletes that do not filter by
   * partition columns because the filter may require rewriting files without deleted records.
   * To reject a delete, implementations should throw {@link IllegalArgumentException} with a clear
   * error message that identifies which expression was rejected.
   *
   * @param filters filter expressions, used to select rows to delete when all expressions match
   * @throws IllegalArgumentException If the delete is rejected due to required effort
   */
  void deleteWhere(Filter[] filters);
}
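For orientation, here is a minimal sketch of a table that opts into deletion by mixing in the new interface. The table, its schema, and its partition-column check are hypothetical and not part of this commit; the sketch only illustrates the ANDed-filters contract and the documented IllegalArgumentException rejection path.

import java.util.Collections

import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.sources.v2.{SupportsDelete, Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Hypothetical demo table, not part of this commit.
class PartitionedDemoTable extends Table with SupportsDelete {

  override def name(): String = "demo"

  override def schema(): StructType =
    new StructType().add("part", "int").add("value", "string")

  override def capabilities(): java.util.Set[TableCapability] =
    Collections.emptySet()

  override def deleteWhere(filters: Array[Filter]): Unit = {
    // The filters are ANDed; accept only deletes keyed on the partition
    // column, since anything else would force rewriting data files.
    filters.foreach {
      case EqualTo("part", _) => // delete is aligned to a partition
      case other =>
        throw new IllegalArgumentException(
          s"Cannot delete by filter: $other (only part = <value> is supported)")
    }
    // ... drop the matching partition directories here ...
  }
}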

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

@@ -40,12 +40,12 @@ object DataSourceV2Implicits {
       }
     }

-    def asMaintainable: SupportsMaintenance = {
+    def asDeletable: SupportsDelete = {
       table match {
-        case support: SupportsMaintenance =>
+        case support: SupportsDelete =>
           support
         case _ =>
-          throw new AnalysisException(s"Table does not support maintenance: ${table.name}")
+          throw new AnalysisException(s"Table does not support deletes: ${table.name}")
       }
     }

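A minimal usage sketch of the renamed conversion; the deleteAll helper is hypothetical, and the import path assumes DataSourceV2Implicits lives where this file does. Since the filters are ANDed, an empty array selects every row.

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
import org.apache.spark.sql.sources.v2.Table

object DeleteHelpers {
  // Hypothetical helper: deletes every row (the empty, ANDed filter set
  // matches all rows). Throws AnalysisException("Table does not support
  // deletes: ...") if the table does not mix in SupportsDelete.
  def deleteAll(table: Table): Unit =
    table.asDeletable.deleteWhere(Array.empty)
}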
@@ -24,8 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalog.v2.StagingTableCatalog
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}

@@ -229,7 +228,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
         filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
           throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
       }.toArray
-      DeleteFromTableExec(r.table.asMaintainable, r.options, filters, planLater(r)) :: Nil
+      DeleteFromTableExec(r.table.asDeletable, r.options, filters, planLater(r)) :: Nil

     case WriteToContinuousDataSource(writer, query) =>
       WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
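As context for the planning rule above: splitConjunctivePredicates breaks the DELETE condition on ANDs, and each conjunct must translate to a public source Filter or planning fails with the AnalysisException shown. A REPL-style illustration of the array that ultimately reaches deleteWhere (table, column names, and values are made up):

import org.apache.spark.sql.sources.{EqualTo, Filter, LessThan}

// DELETE FROM t WHERE part = 1 AND value < 10 arrives at the sink as:
val filters: Array[Filter] = Array(EqualTo("part", 1), LessThan("value", 10))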
DeleteFromTableExec.scala
@@ -17,30 +17,23 @@

 package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.SupportsMaintenance
-import org.apache.spark.sql.sources.v2.maintain.SupportsDelete
+import org.apache.spark.sql.sources.v2.SupportsDelete
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 case class DeleteFromTableExec(
-    table: SupportsMaintenance,
+    table: SupportsDelete,
     options: CaseInsensitiveStringMap,
     deleteWhere: Array[Filter],
     query: SparkPlan) extends UnaryExecNode {

   override protected def doExecute(): RDD[InternalRow] = {
-    table.newMaintainerBuilder(options).build() match {
-      case maintainer: SupportsDelete =>
-        maintainer.delete(deleteWhere)
-      case _ =>
-        throw new SparkException(s"Table does not support delete: $table")
-    }
-
+    table.deleteWhere(deleteWhere)
     sparkContext.emptyRDD
   }

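End to end, a hedged sketch of the path this commit enables. The catalog name and its implementation class are hypothetical, and SQL DELETE support comes from the surrounding change set rather than this commit alone.

import org.apache.spark.sql.SparkSession

object DeleteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("delete-demo")
      // Hypothetical v2 catalog whose tables mix in SupportsDelete.
      .config("spark.sql.catalog.demo", "org.example.DemoCatalog")
      .getOrCreate()

    // Planned by DataSourceV2Strategy as DeleteFromTableExec, which calls
    // deleteWhere(Array(EqualTo("part", 1))) on the resolved table and
    // returns an empty RDD.
    spark.sql("DELETE FROM demo.ns.t WHERE part = 1")

    spark.stop()
  }
}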