From 620e6f5dba0723e855e35edfc18d23774eefe71a Mon Sep 17 00:00:00 2001
From: xy_xin
Date: Mon, 29 Jul 2019 19:17:14 +0800
Subject: [PATCH] Make SupportsDelete a simple mix-in of DSV2

---
 .../spark/sql/sources/v2/SupportsDelete.java | 39 ++++++++++++++++++
 .../sql/sources/v2/SupportsMaintenance.java | 35 ----------------
 .../sql/sources/v2/maintain/Maintainer.java | 31 --------------
 .../v2/maintain/MaintainerBuilder.java | 40 -------------------
 .../sources/v2/maintain/SupportsDelete.java | 28 -------------
 .../v2/DataSourceV2Implicits.scala | 6 +--
 .../datasources/v2/DataSourceV2Strategy.scala | 5 +--
 .../datasources/v2/DeleteFromTableExec.scala | 13 ++----
 .../sql/sources/v2/SimpleInMemoryTable.scala | 34 ++++++----------
 9 files changed, 59 insertions(+), 172 deletions(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java
 delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java
 delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java
 delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java
 delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java
new file mode 100644
index 0000000000000..39fa9275c664a
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface of {@link Table}, to indicate that the table supports deletes. Data
+ * sources can implement this interface to provide the ability to delete data that matches
+ * filter expressions.
+ */
+public interface SupportsDelete {
+ /**
+ * Delete data from a data source table that matches filter expressions.
+ *
+ * Rows are deleted from the data source iff all of the filter expressions match. That is, the + * expressions must be interpreted as a set of filters that are ANDed together. + *
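+ * For example, {@code deleteWhere(Array(EqualTo("day", 10), EqualTo("hour", 0)))} asks the
+ * source to delete only the rows where both {@code day = 10} and {@code hour = 0} hold.
+ *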
+ * Implementations may reject a delete operation if the delete isn't possible without significant
+ * effort. For example, partitioned data sources may reject deletes that do not filter by
+ * partition columns because the filter may require rewriting files without deleted records.
+ * To reject a delete, implementations should throw {@link IllegalArgumentException} with a clear
+ * error message that identifies which expression was rejected.
+ *
+ * @param filters filter expressions, used to select rows to delete when all expressions match
+ * @throws IllegalArgumentException If the delete is rejected due to required effort
+ */
+ void deleteWhere(Filter[] filters);
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java
deleted file mode 100644
index c4bf7d105efa1..0000000000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.sql.sources.v2.maintain.Maintainer;
-import org.apache.spark.sql.sources.v2.maintain.MaintainerBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/**
- * A mix-in interface of {@link Table}, to indicate that it can be maintained. This adds
- * {@link #newMaintainerBuilder(CaseInsensitiveStringMap)} that is used to create a
- * {@link Maintainer}
- */
-public interface SupportsMaintenance {
-
- /**
- * Returns an {@link MaintainerBuilder} which can be used to
- */
- MaintainerBuilder newMaintainerBuilder(CaseInsensitiveStringMap options);
-}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java
deleted file mode 100644
index 1c6b26074b259..0000000000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.maintain;
-
-/**
- * A maintainer is an instance who execute the update/delete/merge, etc, should be
- * implemented by datasource.
- *
- * A datasource maintainer could choose one or more mix-ins, like {@link SupportsDelete}, - * to enrich the feature of the datasource. - *
- */ -public interface Maintainer { - -} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java deleted file mode 100644 index 3df2b0934d697..0000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.maintain; - -import org.apache.spark.annotation.Evolving; - -/** - * An interface for build {@link Maintainer}. - */ -@Evolving -public interface MaintainerBuilder { - - /** - * Passes the `queryId` from Spark to data source. `queryId` is a unique string of the query. - * Some datasource may use this id to identify queries. - * - * @return a new builder with the `queryId`. By default it returns `this`, which means the given - * `queryId` is ignored. Please override this method to take the `queryId`. - */ - default MaintainerBuilder withQueryId(String queryId) { - return this; - } - - Maintainer build(); -} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java deleted file mode 100644 index 25ccf0a68b9a3..0000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.maintain; - -import org.apache.spark.sql.sources.Filter; - -/** - * A mix-in for {@link Maintainer} which add delete support. 
- */ -public interface SupportsDelete extends Maintainer { - - void delete(Filter[] filters); -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala index adc7af1e80fe5..ded70a29d470d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala @@ -40,12 +40,12 @@ object DataSourceV2Implicits { } } - def asMaintainable: SupportsMaintenance = { + def asDeletable: SupportsDelete = { table match { - case support: SupportsMaintenance => + case support: SupportsDelete => support case _ => - throw new AnalysisException(s"Table does not support maintenance: ${table.name}") + throw new AnalysisException(s"Table does not support deletes: ${table.name}") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 5d519b6f2fa08..7dad9a2d6acdc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -23,8 +23,7 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition} -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation} import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} @@ -191,7 +190,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse( throw new AnalysisException(s"Cannot translate expression to source filter: $filter")) }.toArray - DeleteFromTableExec(r.table.asMaintainable, r.options, filters, planLater(r)) :: Nil + DeleteFromTableExec(r.table.asDeletable, r.options, filters, planLater(r)) :: Nil case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala index 8463884b127f3..03c6f69fb3918 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala @@ -17,30 +17,23 @@ 
package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.sources.v2.SupportsMaintenance -import org.apache.spark.sql.sources.v2.maintain.SupportsDelete +import org.apache.spark.sql.sources.v2.SupportsDelete import org.apache.spark.sql.util.CaseInsensitiveStringMap case class DeleteFromTableExec( - table: SupportsMaintenance, + table: SupportsDelete, options: CaseInsensitiveStringMap, deleteWhere: Array[Filter], query: SparkPlan) extends UnaryExecNode { override protected def doExecute(): RDD[InternalRow] = { - table.newMaintainerBuilder(options).build() match { - case maintainer: SupportsDelete => - maintainer.delete(deleteWhere) - case _ => - throw new SparkException(s"Table does not support delete: $table") - } + table.deleteWhere(deleteWhere) sparkContext.emptyRDD } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala index fd459becbc5dd..8ffd74e9c70bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not} import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.sources.v2.maintain.{Maintainer, MaintainerBuilder, SupportsDelete} import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.types._ @@ -41,7 +40,7 @@ private[v2] class InMemoryTable( val name: String, val schema: StructType, override val properties: util.Map[String, String]) - extends Table with SupportsRead with SupportsWrite with SupportsMaintenance { + extends Table with SupportsRead with SupportsWrite with SupportsDelete { def this( name: String, @@ -92,26 +91,6 @@ private[v2] class InMemoryTable( } } - override def newMaintainerBuilder(options: CaseInsensitiveStringMap): MaintainerBuilder = { - () => { - Delete - } - } - - private object Delete extends Maintainer with SupportsDelete { - - override def delete(filters: Array[Filter]): Unit = { - val filtered = data.map { - rows => - val newRows = filter(rows.rows, filters) - val newBufferedRows = new BufferedRows() - newBufferedRows.rows.appendAll(newRows) - newBufferedRows - }.filter(_.rows.nonEmpty) - replaceData(filtered) - } - } - def filter(rows: mutable.ArrayBuffer[InternalRow], filters: Array[Filter]): Array[InternalRow] = { if (rows.isEmpty) { @@ -164,6 +143,17 @@ private[v2] class InMemoryTable( override def abort(messages: Array[WriterCommitMessage]): Unit = { } } + + override def deleteWhere(filters: Array[Filter]): Unit = { + val filtered = data.map { + rows => + val newRows = filter(rows.rows, filters) + val newBufferedRows = new BufferedRows() + newBufferedRows.rows.appendAll(newRows) + newBufferedRows + }.filter(_.rows.nonEmpty) + replaceData(filtered) + } } private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
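
--
A minimal sketch of the new contract, for illustration only and not part of the patch: the
class KeyValueStore, its rows field, and the column names "key"/"value" below are hypothetical,
while EqualTo and Filter are the existing filter classes from org.apache.spark.sql.sources.

  import org.apache.spark.sql.sources.{EqualTo, Filter}
  import org.apache.spark.sql.sources.v2.SupportsDelete

  // A toy row store honoring the deleteWhere contract: a row is deleted iff
  // every filter in the array matches it (the filters are ANDed together).
  class KeyValueStore extends SupportsDelete {
    private var rows = Seq("a" -> 1, "b" -> 2, "b" -> 3)

    override def deleteWhere(filters: Array[Filter]): Unit = {
      rows = rows.filterNot { case (key, value) =>
        filters.forall {
          case EqualTo("key", v) => key == v
          case EqualTo("value", v) => value == v
          // Per the interface javadoc, reject a delete the source cannot perform.
          case other => throw new IllegalArgumentException(s"Cannot evaluate filter: $other")
        }
      }
    }
  }

With such an implementation, DELETE FROM <table> WHERE key = 'b' AND value = 3 is planned by
DataSourceV2Strategy into DeleteFromTableExec, which translates the condition into
Array(EqualTo("key", "b"), EqualTo("value", 3)) and passes it straight to deleteWhere; here that
removes only the ("b", 3) row, and a filter the store cannot evaluate aborts the whole delete
with IllegalArgumentException instead of silently deleting the wrong rows.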