Make SupportsDelete a simple mix-in of DSV2
xy_xin committed Jul 29, 2019
1 parent bc9daf9 commit 620e6f5
Showing 9 changed files with 59 additions and 172 deletions.
@@ -0,0 +1,39 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.Filter;

public interface SupportsDelete {
  /**
   * Delete data from a data source table that matches filter expressions.
   * <p>
   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
   * expressions must be interpreted as a set of filters that are ANDed together.
   * <p>
   * Implementations may reject a delete operation if the delete isn't possible without significant
   * effort. For example, partitioned data sources may reject deletes that do not filter by
   * partition columns because the filter may require rewriting files without deleted records.
   * To reject a delete, implementations should throw {@link IllegalArgumentException} with a clear
   * error message that identifies which expression was rejected.
   *
   * @param filters filter expressions, used to select rows to delete when all expressions match
   * @throws IllegalArgumentException If the delete is rejected due to required effort
   */
  void deleteWhere(Filter[] filters);
}
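For illustration only (not part of this commit): a partition-aware table might implement this mix-in roughly as below. The class name PartitionedFileTable, its partitionColumns field, and dropMatchingPartitions are hypothetical; only Filter.references and the SupportsDelete contract above come from the source.

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.SupportsDelete

// Hypothetical table that honors deletes only when every filter references
// partition columns, as the javadoc above suggests for partitioned sources.
class PartitionedFileTable(partitionColumns: Set[String]) extends SupportsDelete {

  override def deleteWhere(filters: Array[Filter]): Unit = {
    // Reject filters on non-partition columns: honoring them would mean
    // rewriting data files rather than dropping whole partitions.
    filters.foreach { f =>
      if (!f.references.toSet.subsetOf(partitionColumns)) {
        throw new IllegalArgumentException(s"Cannot delete by non-partition filter: $f")
      }
    }
    // The filters are implicitly ANDed; drop the partitions they select.
    dropMatchingPartitions(filters)
  }

  // Placeholder for the storage-specific partition drop.
  private def dropMatchingPartitions(filters: Array[Filter]): Unit = ()
}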

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

@@ -40,12 +40,12 @@ object DataSourceV2Implicits {
      }
    }

-    def asMaintainable: SupportsMaintenance = {
+    def asDeletable: SupportsDelete = {
      table match {
-        case support: SupportsMaintenance =>
+        case support: SupportsDelete =>
          support
        case _ =>
-          throw new AnalysisException(s"Table does not support maintenance: ${table.name}")
+          throw new AnalysisException(s"Table does not support deletes: ${table.name}")
      }
    }

@@ -23,8 +23,7 @@ import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}

@@ -191,7 +190,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
        filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
          throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
      }.toArray
-      DeleteFromTableExec(r.table.asMaintainable, r.options, filters, planLater(r)) :: Nil
+      DeleteFromTableExec(r.table.asDeletable, r.options, filters, planLater(r)) :: Nil

    case WriteToContinuousDataSource(writer, query) =>
      WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
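A small aside on the filters handed to DeleteFromTableExec above: each conjunct of the delete condition is translated into a public Filter, and the resulting array is understood as an AND of its elements. A rough sketch with a made-up condition:

import org.apache.spark.sql.sources.{EqualTo, Filter, LessThan}

// Roughly what a condition like "p = 1 AND id < 10" yields after the conjuncts
// are split and translated: one source Filter per conjunct, implicitly ANDed.
val translated: Array[Filter] = Array(EqualTo("p", 1), LessThan("id", 10))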
@@ -17,30 +17,23 @@

package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.SupportsMaintenance
-import org.apache.spark.sql.sources.v2.maintain.SupportsDelete
+import org.apache.spark.sql.sources.v2.SupportsDelete
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class DeleteFromTableExec(
-    table: SupportsMaintenance,
+    table: SupportsDelete,
    options: CaseInsensitiveStringMap,
    deleteWhere: Array[Filter],
    query: SparkPlan) extends UnaryExecNode {

  override protected def doExecute(): RDD[InternalRow] = {
-    table.newMaintainerBuilder(options).build() match {
-      case maintainer: SupportsDelete =>
-        maintainer.delete(deleteWhere)
-      case _ =>
-        throw new SparkException(s"Table does not support delete: $table")
-    }
-
+    table.deleteWhere(deleteWhere)
    sparkContext.emptyRDD
  }

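To make the simplification concrete, here is a hedged sketch of the caller-side flow after this change. The helper deleteById and its arguments are made up, and the Table import assumes the DSv2 package layout at the time of this commit:

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.sources.v2.Table

// Hypothetical helper: instead of building a Maintainer, callers now treat the
// table as deletable and invoke the mix-in directly.
def deleteById(table: Table, id: Int): Unit = {
  val filters: Array[Filter] = Array(EqualTo("id", id))
  // asDeletable throws AnalysisException when the table does not mix in SupportsDelete.
  table.asDeletable.deleteWhere(filters)
}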
@@ -26,7 +26,6 @@ import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not}
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.maintain.{Maintainer, MaintainerBuilder, SupportsDelete}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types._

@@ -41,7 +40,7 @@ private[v2] class InMemoryTable(
    val name: String,
    val schema: StructType,
    override val properties: util.Map[String, String])
-  extends Table with SupportsRead with SupportsWrite with SupportsMaintenance {
+  extends Table with SupportsRead with SupportsWrite with SupportsDelete {

  def this(
      name: String,

@@ -92,26 +91,6 @@ private[v2] class InMemoryTable(
    }
  }

-  override def newMaintainerBuilder(options: CaseInsensitiveStringMap): MaintainerBuilder = {
-    () => {
-      Delete
-    }
-  }
-
-  private object Delete extends Maintainer with SupportsDelete {
-
-    override def delete(filters: Array[Filter]): Unit = {
-      val filtered = data.map {
-        rows =>
-          val newRows = filter(rows.rows, filters)
-          val newBufferedRows = new BufferedRows()
-          newBufferedRows.rows.appendAll(newRows)
-          newBufferedRows
-      }.filter(_.rows.nonEmpty)
-      replaceData(filtered)
-    }
-  }
-
  def filter(rows: mutable.ArrayBuffer[InternalRow],
      filters: Array[Filter]): Array[InternalRow] = {
    if (rows.isEmpty) {

@@ -164,6 +143,17 @@ private[v2] class InMemoryTable(
    override def abort(messages: Array[WriterCommitMessage]): Unit = {
    }
  }
+
+  override def deleteWhere(filters: Array[Filter]): Unit = {
+    val filtered = data.map {
+      rows =>
+        val newRows = filter(rows.rows, filters)
+        val newBufferedRows = new BufferedRows()
+        newBufferedRows.rows.appendAll(newRows)
+        newBufferedRows
+    }.filter(_.rows.nonEmpty)
+    replaceData(filtered)
+  }
}

private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
