From bc06ef62dce68c6c835bdab86f37f1bde2c09f9e Mon Sep 17 00:00:00 2001 From: xy_xin Date: Thu, 11 Jul 2019 18:29:07 +0800 Subject: [PATCH 01/16] Support DELETE in DataSource V2 --- .../spark/sql/catalyst/parser/SqlBase.g4 | 1 + .../sql/sources/v2/SupportsMaintenance.java | 35 +++ .../sql/sources/v2/maintain/Maintainer.java | 31 +++ .../v2/maintain/MaintainerBuilder.java | 40 ++++ .../sources/v2/maintain/SupportsDelete.java | 28 +++ .../sql/catalyst/parser/AstBuilder.scala | 9 + .../plans/logical/basicLogicalOperators.scala | 9 + .../v2/DataSourceV2Implicits.scala | 11 +- .../apache/spark/sql/sources/filters.scala | 32 +++ .../datasources/DataSourceStrategy.scala | 2 +- .../datasources/v2/DataSourceV2Strategy.scala | 10 +- .../datasources/v2/DeleteFromTableExec.scala | 49 +++++ .../datasources/v2/V2WriteSupportCheck.scala | 5 +- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 22 ++ .../sql/sources/v2/SimpleInMemoryTable.scala | 206 ++++++++++++++++++ .../sources/v2/TestInMemoryTableCatalog.scala | 2 +- 16 files changed, 487 insertions(+), 5 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index aa44e677e577b..5c23373af0a0d 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -214,6 +214,7 @@ statement | SET ROLE .*? #failNativeCommand | SET .*? #setConfiguration | RESET #resetConfiguration + | DELETE FROM multipartIdentifier tableAlias whereClause #deleteFromTable | unsupportedHiveNativeCommands .*? #failNativeCommand ; diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java new file mode 100644 index 0000000000000..c4bf7d105efa1 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.sources.v2.maintain.Maintainer; +import org.apache.spark.sql.sources.v2.maintain.MaintainerBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * A mix-in interface of {@link Table}, to indicate that it can be maintained. This adds + * {@link #newMaintainerBuilder(CaseInsensitiveStringMap)} that is used to create a + * {@link Maintainer} + */ +public interface SupportsMaintenance { + + /** + * Returns an {@link MaintainerBuilder} which can be used to + */ + MaintainerBuilder newMaintainerBuilder(CaseInsensitiveStringMap options); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java new file mode 100644 index 0000000000000..1c6b26074b259 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.maintain; + +/** + * A maintainer is an instance who execute the update/delete/merge, etc, should be + * implemented by datasource. + * + *
<p>
+ * A data source maintainer can mix in one or more interfaces, such as {@link SupportsDelete}, + * to enrich the capabilities of the data source. + *

+ */ +public interface Maintainer { + +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java new file mode 100644 index 0000000000000..3df2b0934d697 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.maintain; + +import org.apache.spark.annotation.Evolving; + +/** + * An interface for build {@link Maintainer}. + */ +@Evolving +public interface MaintainerBuilder { + + /** + * Passes the `queryId` from Spark to data source. `queryId` is a unique string of the query. + * Some datasource may use this id to identify queries. + * + * @return a new builder with the `queryId`. By default it returns `this`, which means the given + * `queryId` is ignored. Please override this method to take the `queryId`. + */ + default MaintainerBuilder withQueryId(String queryId) { + return this; + } + + Maintainer build(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java new file mode 100644 index 0000000000000..25ccf0a68b9a3 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.maintain; + +import org.apache.spark.sql.sources.Filter; + +/** + * A mix-in for {@link Maintainer} which add delete support. 
+ */ +public interface SupportsDelete extends Maintainer { + + void delete(Filter[] filters); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 49ca09d9ef076..50059d1172d19 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -338,6 +338,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx) } + override def visitDeleteFromTable( + ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) { + + val tableId = visitMultipartIdentifier(ctx.multipartIdentifier) + val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId)) + + DeleteFromTable(table, expression(ctx.whereClause().booleanExpression())) + } + /** * Create a partition specification map. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index d9c370af47fb8..e49642db42828 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -567,6 +567,15 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm override val output = DescribeTableSchema.describeTableAttributes() } +case class DeleteFromTable( + child: LogicalPlan, + condition: Expression) + extends Command { + + override def children: Seq[LogicalPlan] = child :: Nil + override def output: Seq[Attribute] = Seq.empty +} + /** * Drop a table. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala index eed69cdc8cac6..0c39560ee1ccd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability} +import org.apache.spark.sql.sources.v2._ object DataSourceV2Implicits { implicit class TableHelper(table: Table) { @@ -40,6 +40,15 @@ object DataSourceV2Implicits { } } + def asMaintainable: SupportsMaintenance = { + table match { + case support: SupportsMaintenance => + support + case _ => + throw new AnalysisException(s"Table does not support delete: ${table.name}") + } + } + def supports(capability: TableCapability): Boolean = table.capabilities.contains(capability) def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala index a1ab55a7185ce..ef71f1a3ea61f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -36,10 +36,18 @@ abstract class Filter { */ def references: Array[String] + def sql: String + protected def findReferences(value: Any): Array[String] = value match { case f: Filter => f.references case _ => Array.empty } + + protected def quoteIdentifier(name: String): String = { + // Escapes back-ticks within the identifier name with double-back-ticks, and then quote the + // identifier with back-ticks. 
+ "`" + name.replace("`", "``") + "`" + } } /** @@ -50,6 +58,8 @@ abstract class Filter { */ @Stable case class EqualTo(attribute: String, value: Any) extends Filter { + def symbol: String = "=" + override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -62,6 +72,8 @@ case class EqualTo(attribute: String, value: Any) extends Filter { */ @Stable case class EqualNullSafe(attribute: String, value: Any) extends Filter { + def symbol: String = "<=>" + override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -73,6 +85,8 @@ case class EqualNullSafe(attribute: String, value: Any) extends Filter { */ @Stable case class GreaterThan(attribute: String, value: Any) extends Filter { + def symbol: String = ">" + override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -84,6 +98,8 @@ case class GreaterThan(attribute: String, value: Any) extends Filter { */ @Stable case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter { + def symbol: String = ">=" + override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -95,6 +111,8 @@ case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter { */ @Stable case class LessThan(attribute: String, value: Any) extends Filter { + def symbol: String = "<" + override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -106,6 +124,8 @@ case class LessThan(attribute: String, value: Any) extends Filter { */ @Stable case class LessThanOrEqual(attribute: String, value: Any) extends Filter { + def symbol: String = "<=" + override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -133,6 +153,8 @@ case class In(attribute: String, values: Array[Any]) extends Filter { s"In($attribute, [${values.mkString(",")}])" } + override def sql: String = s"${quoteIdentifier(attribute)} IN (${values.mkString(",")})" + override def references: Array[String] = Array(attribute) ++ values.flatMap(findReferences) } @@ -143,6 +165,7 @@ case class In(attribute: String, values: Array[Any]) extends Filter { */ @Stable case class IsNull(attribute: String) extends Filter { + override def sql: String = s"(${quoteIdentifier(attribute)} IS NULL)" override def references: Array[String] = Array(attribute) } @@ -153,6 +176,7 @@ case class IsNull(attribute: String) extends Filter { */ @Stable case class IsNotNull(attribute: String) extends Filter { + override def sql: String = s"(${quoteIdentifier(attribute)} IS NOT NULL)" override def references: Array[String] = Array(attribute) } @@ -163,6 +187,7 @@ case class IsNotNull(attribute: String) extends Filter { */ @Stable case class And(left: Filter, right: Filter) extends Filter { + override def sql: String = s"${left.sql} AND ${right.sql}" override def references: Array[String] = left.references ++ right.references } @@ -173,6 +198,7 @@ case class And(left: Filter, right: Filter) extends Filter { */ @Stable case class Or(left: Filter, 
right: Filter) extends Filter { + override def sql: String = s"${left.sql} OR ${right.sql}" override def references: Array[String] = left.references ++ right.references } @@ -183,6 +209,7 @@ case class Or(left: Filter, right: Filter) extends Filter { */ @Stable case class Not(child: Filter) extends Filter { + override def sql: String = s"NOT ${child.sql}" override def references: Array[String] = child.references } @@ -194,6 +221,7 @@ case class Not(child: Filter) extends Filter { */ @Stable case class StringStartsWith(attribute: String, value: String) extends Filter { + override def sql: String = s"${quoteIdentifier(attribute)} LIKE '${value.toString}%'" override def references: Array[String] = Array(attribute) } @@ -205,6 +233,7 @@ case class StringStartsWith(attribute: String, value: String) extends Filter { */ @Stable case class StringEndsWith(attribute: String, value: String) extends Filter { + override def sql: String = s"${quoteIdentifier(attribute)} LIKE '%${value.toString}'" override def references: Array[String] = Array(attribute) } @@ -216,6 +245,7 @@ case class StringEndsWith(attribute: String, value: String) extends Filter { */ @Stable case class StringContains(attribute: String, value: String) extends Filter { + override def sql: String = s"${quoteIdentifier(attribute)} LIKE '%${value.toString}%'" override def references: Array[String] = Array(attribute) } @@ -224,6 +254,7 @@ case class StringContains(attribute: String, value: String) extends Filter { */ @Evolving case class AlwaysTrue() extends Filter { + override def sql: String = "TRUE" override def references: Array[String] = Array.empty } @@ -236,6 +267,7 @@ object AlwaysTrue extends AlwaysTrue { */ @Evolving case class AlwaysFalse() extends Filter { + override def sql: String = "FALSE" override def references: Array[String] = Array.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index b8bed8569ace0..158a4d1386b7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Filter => _, _} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 91fc2e068af70..1b7c47ba3ff87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalog.v2.StagingTableCatalog import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, 
PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} @@ -222,6 +222,14 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) => OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil + case DeleteFromTable(r: DataSourceV2Relation, deleteExpr) => + // fail if any filter cannot be converted. correctness depends on removing all matching data. + val filters = splitConjunctivePredicates(deleteExpr).map { + filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse( + throw new AnalysisException(s"Cannot translate expression to source filter: $filter")) + }.toArray + DeleteFromTableExec(r.table.asMaintainable, r.options, filters, planLater(r)) :: Nil + case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala new file mode 100644 index 0000000000000..8463884b127f3 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.SparkException +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.SupportsMaintenance +import org.apache.spark.sql.sources.v2.maintain.SupportsDelete +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +case class DeleteFromTableExec( + table: SupportsMaintenance, + options: CaseInsensitiveStringMap, + deleteWhere: Array[Filter], + query: SparkPlan) extends UnaryExecNode { + + override protected def doExecute(): RDD[InternalRow] = { + table.newMaintainerBuilder(options).build() match { + case maintainer: SupportsDelete => + maintainer.delete(deleteWhere) + case _ => + throw new SparkException(s"Table does not support delete: $table") + } + + sparkContext.emptyRDD + } + + override def child: SparkPlan = query + override def output: Seq[Attribute] = Nil +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala index cf77998c122f8..55f6dad7e8d6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.types.BooleanType @@ -51,6 +51,9 @@ object V2WriteSupportCheck extends (LogicalPlan => Unit) { } } + case DeleteFromTable(relation, _) if !relation.isInstanceOf[DataSourceV2Relation] => + failAnalysis(s"Delete is not supported for table: ${relation.toString}") + case _ => // OK } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 9ae51d577b562..7b098f4702747 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -51,12 +51,17 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn df.createOrReplaceTempView("source") val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data") df2.createOrReplaceTempView("source2") + val df3 = spark.createDataFrame(Seq((2L, "b"), (3L, "c"))).toDF("id", "data") + df3.createOrReplaceTempView("source3") } after { spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables() spark.catalog("testcat_atomic").asInstanceOf[TestInMemoryTableCatalog].clearTables() spark.catalog("session").asInstanceOf[TestInMemoryTableCatalog].clearTables() + spark.sql("DROP TABLE source") + spark.sql("DROP TABLE source2") + spark.sql("DROP TABLE source3") } test("CreateTable: use v2 plan because catalog is set") { @@ -497,6 +502,23 @@ class DataSourceV2SQLSuite extends QueryTest with 
SharedSQLContext with BeforeAn checkAnswer(sparkSession.sql(s"TABLE session.table_name"), sparkSession.table("source")) } + test("DeleteFromTable: basic") { + spark.sql( + "CREATE TABLE IF NOT EXISTS testcat.table_name USING foo AS SELECT id, data FROM source") + + val testCatalog = spark.catalog("testcat").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + + val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) + + val deleted = spark.sql("DELETE FROM testcat.table_name where id=1") + val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + + val rdd2 = spark.sparkContext.parallelize(table2.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd2, table2.schema), spark.table("source3")) + } + test("DropTable: basic") { val tableName = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala new file mode 100644 index 0000000000000..fd459becbc5dd --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2 + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not} +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.maintain.{Maintainer, MaintainerBuilder, SupportsDelete} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.unsafe.types.UTF8String + + +/** + * A simple in-memory table. Rows are stored as a buffered group produced by each output task. 
+ */ +private[v2] class InMemoryTable( + val name: String, + val schema: StructType, + override val properties: util.Map[String, String]) + extends Table with SupportsRead with SupportsWrite with SupportsMaintenance { + + def this( + name: String, + schema: StructType, + properties: util.Map[String, String], + data: Array[BufferedRows]) = { + this(name, schema, properties) + replaceData(data) + } + + def rows: Seq[InternalRow] = data.flatMap(_.rows) + + @volatile var data: Array[BufferedRows] = Array.empty + + def replaceData(buffers: Array[BufferedRows]): Unit = synchronized { + data = buffers + } + + override def capabilities: util.Set[TableCapability] = Set( + TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.TRUNCATE).asJava + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + () => new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition])) + } + + class InMemoryBatchScan(data: Array[InputPartition]) extends Scan with Batch { + override def readSchema(): StructType = schema + + override def toBatch: Batch = this + + override def planInputPartitions(): Array[InputPartition] = data + + override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory + } + + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { + new WriteBuilder with SupportsTruncate { + private var shouldTruncate: Boolean = false + + override def truncate(): WriteBuilder = { + shouldTruncate = true + this + } + + override def buildForBatch(): BatchWrite = { + if (shouldTruncate) TruncateAndAppend else Append + } + } + } + + override def newMaintainerBuilder(options: CaseInsensitiveStringMap): MaintainerBuilder = { + () => { + Delete + } + } + + private object Delete extends Maintainer with SupportsDelete { + + override def delete(filters: Array[Filter]): Unit = { + val filtered = data.map { + rows => + val newRows = filter(rows.rows, filters) + val newBufferedRows = new BufferedRows() + newBufferedRows.rows.appendAll(newRows) + newBufferedRows + }.filter(_.rows.nonEmpty) + replaceData(filtered) + } + } + + def filter(rows: mutable.ArrayBuffer[InternalRow], + filters: Array[Filter]): Array[InternalRow] = { + if (rows.isEmpty) { + rows.toArray + } + val filterStr = + filters.map { + filter => filter.sql + }.toList.mkString("AND") + val sparkSession = SparkSession.getActiveSession.getOrElse( + throw new RuntimeException("Could not get active sparkSession.") + ) + val filterExpr = sparkSession.sessionState.sqlParser.parseExpression(filterStr) + val antiFilter = Not(EqualNullSafe(filterExpr, Literal(true, BooleanType))) + val rdd = sparkSession.sparkContext.parallelize(rows) + + sparkSession.internalCreateDataFrame(rdd, schema) + .filter(Column(antiFilter)).collect().map { + row => + val values = row.toSeq.map { + case s: String => UTF8String.fromBytes(s.asInstanceOf[String].getBytes("UTF-8")) + case other => other + } + InternalRow.fromSeq(values) + } + } + + private object TruncateAndAppend extends BatchWrite { + override def createBatchWriterFactory(): DataWriterFactory = { + BufferedRowsWriterFactory + } + + override def commit(messages: Array[WriterCommitMessage]): Unit = { + replaceData(messages.map(_.asInstanceOf[BufferedRows])) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + } + } + + private object Append extends BatchWrite { + override def createBatchWriterFactory(): DataWriterFactory = { + BufferedRowsWriterFactory + } + + override def commit(messages: 
Array[WriterCommitMessage]): Unit = { + replaceData(data ++ messages.map(_.asInstanceOf[BufferedRows])) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + } + } +} + +private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { + val rows = new mutable.ArrayBuffer[InternalRow]() +} + +private object BufferedRowsReaderFactory extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + new BufferedRowsReader(partition.asInstanceOf[BufferedRows]) + } +} + +private class BufferedRowsReader(partition: BufferedRows) extends PartitionReader[InternalRow] { + private var index: Int = -1 + + override def next(): Boolean = { + index += 1 + index < partition.rows.length + } + + override def get(): InternalRow = partition.rows(index) + + override def close(): Unit = {} +} + +private object BufferedRowsWriterFactory extends DataWriterFactory { + override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = { + new BufferWriter + } +} + +private class BufferWriter extends DataWriter[InternalRow] { + private val buffer = new BufferedRows + + override def write(row: InternalRow): Unit = buffer.rows.append(row.copy()) + + override def commit(): WriterCommitMessage = buffer + + override def abort(): Unit = {} +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index 7c51a29bde905..d4d96fb4ac5b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -21,7 +21,6 @@ import java.util import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import scala.collection.mutable import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, StagingTableCatalog, TableCatalog, TableChange} import org.apache.spark.sql.catalog.v2.expressions.{IdentityTransform, Transform} @@ -31,6 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio import org.apache.spark.sql.sources.{And, EqualTo, Filter} import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap From 5c2590c2f11db71a2957de2cd0f774c76f388ffb Mon Sep 17 00:00:00 2001 From: xy_xin Date: Thu, 11 Jul 2019 19:38:37 +0800 Subject: [PATCH 02/16] Fix a error msg in DataSourceV2Implicits#asMaintainable --- .../sql/execution/datasources/v2/DataSourceV2Implicits.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala index 0c39560ee1ccd..adc7af1e80fe5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala @@ -45,7 +45,7 @@ object DataSourceV2Implicits { case support: SupportsMaintenance => support case _ => - throw new AnalysisException(s"Table does not support delete: ${table.name}") + throw new AnalysisException(s"Table does not support maintenance: ${table.name}") } } From 634f7c7b44c7af0f7afb8a45decc4124dcc299d6 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Mon, 15 Jul 2019 17:39:15 +0800 Subject: [PATCH 03/16] Fix potential syntax error of sources.Filters --- .../main/scala/org/apache/spark/sql/sources/filters.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala index ef71f1a3ea61f..3b389339d8526 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -187,7 +187,7 @@ case class IsNotNull(attribute: String) extends Filter { */ @Stable case class And(left: Filter, right: Filter) extends Filter { - override def sql: String = s"${left.sql} AND ${right.sql}" + override def sql: String = s"(${left.sql}) AND (${right.sql})" override def references: Array[String] = left.references ++ right.references } @@ -198,7 +198,7 @@ case class And(left: Filter, right: Filter) extends Filter { */ @Stable case class Or(left: Filter, right: Filter) extends Filter { - override def sql: String = s"${left.sql} OR ${right.sql}" + override def sql: String = s"(${left.sql}) OR (${right.sql})" override def references: Array[String] = left.references ++ right.references } @@ -209,7 +209,7 @@ case class Or(left: Filter, right: Filter) extends Filter { */ @Stable case class Not(child: Filter) extends Filter { - override def sql: String = s"NOT ${child.sql}" + override def sql: String = s"NOT (${child.sql})" override def references: Array[String] = child.references } From 254c2cfd079092acb3358ed9d677cb1db3b9511b Mon Sep 17 00:00:00 2001 From: xy_xin Date: Mon, 29 Jul 2019 19:17:14 +0800 Subject: [PATCH 04/16] Make SupportsDelete to be simple mix-in of DSV2 --- .../spark/sql/sources/v2/SupportsDelete.java | 39 ++++ .../sql/sources/v2/SupportsMaintenance.java | 35 --- .../sql/sources/v2/maintain/Maintainer.java | 31 --- .../v2/maintain/MaintainerBuilder.java | 40 ---- .../sources/v2/maintain/SupportsDelete.java | 28 --- .../v2/DataSourceV2Implicits.scala | 6 +- .../datasources/v2/DataSourceV2Strategy.scala | 2 +- .../datasources/v2/DeleteFromTableExec.scala | 13 +- .../sql/sources/v2/SimpleInMemoryTable.scala | 206 ------------------ .../sources/v2/TestInMemoryTableCatalog.scala | 48 +++- 10 files changed, 91 insertions(+), 357 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java new file mode 100644 index 0000000000000..39fa9275c664a --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.sources.Filter; + +public interface SupportsDelete { + /** + * Delete data from a data source table that matches filter expressions. + *
<p>
+ * Rows are deleted from the data source iff all of the filter expressions match. That is, the + * expressions must be interpreted as a set of filters that are ANDed together. + *
<p>
+ * Implementations may reject a delete operation if the delete isn't possible without significant + * effort. For example, partitioned data sources may reject deletes that do not filter by + * partition columns because the filter may require rewriting files without deleted records. + * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear + * error message that identifies which expression was rejected. + * + * @param filters filter expressions, used to select rows to delete when all expressions match + * @throws IllegalArgumentException If the delete is rejected due to required effort + */ + void deleteWhere(Filter[] filters); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java deleted file mode 100644 index c4bf7d105efa1..0000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsMaintenance.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import org.apache.spark.sql.sources.v2.maintain.Maintainer; -import org.apache.spark.sql.sources.v2.maintain.MaintainerBuilder; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; - -/** - * A mix-in interface of {@link Table}, to indicate that it can be maintained. This adds - * {@link #newMaintainerBuilder(CaseInsensitiveStringMap)} that is used to create a - * {@link Maintainer} - */ -public interface SupportsMaintenance { - - /** - * Returns an {@link MaintainerBuilder} which can be used to - */ - MaintainerBuilder newMaintainerBuilder(CaseInsensitiveStringMap options); -} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java deleted file mode 100644 index 1c6b26074b259..0000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/Maintainer.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.maintain; - -/** - * A maintainer is an instance who execute the update/delete/merge, etc, should be - * implemented by datasource. - * - *
<p>
- * A data source maintainer can mix in one or more interfaces, such as {@link SupportsDelete}, - * to enrich the capabilities of the data source. - *

- */ -public interface Maintainer { - -} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java deleted file mode 100644 index 3df2b0934d697..0000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/MaintainerBuilder.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.maintain; - -import org.apache.spark.annotation.Evolving; - -/** - * An interface for build {@link Maintainer}. - */ -@Evolving -public interface MaintainerBuilder { - - /** - * Passes the `queryId` from Spark to data source. `queryId` is a unique string of the query. - * Some datasource may use this id to identify queries. - * - * @return a new builder with the `queryId`. By default it returns `this`, which means the given - * `queryId` is ignored. Please override this method to take the `queryId`. - */ - default MaintainerBuilder withQueryId(String queryId) { - return this; - } - - Maintainer build(); -} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java deleted file mode 100644 index 25ccf0a68b9a3..0000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/maintain/SupportsDelete.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.maintain; - -import org.apache.spark.sql.sources.Filter; - -/** - * A mix-in for {@link Maintainer} which add delete support. 
- */ -public interface SupportsDelete extends Maintainer { - - void delete(Filter[] filters); -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala index adc7af1e80fe5..ded70a29d470d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala @@ -40,12 +40,12 @@ object DataSourceV2Implicits { } } - def asMaintainable: SupportsMaintenance = { + def asDeletable: SupportsDelete = { table match { - case support: SupportsMaintenance => + case support: SupportsDelete => support case _ => - throw new AnalysisException(s"Table does not support maintenance: ${table.name}") + throw new AnalysisException(s"Table does not support deletes: ${table.name}") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 1b7c47ba3ff87..51dfdfc8bee6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -228,7 +228,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse( throw new AnalysisException(s"Cannot translate expression to source filter: $filter")) }.toArray - DeleteFromTableExec(r.table.asMaintainable, r.options, filters, planLater(r)) :: Nil + DeleteFromTableExec(r.table.asDeletable, r.options, filters, planLater(r)) :: Nil case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala index 8463884b127f3..03c6f69fb3918 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala @@ -17,30 +17,23 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.sources.v2.SupportsMaintenance -import org.apache.spark.sql.sources.v2.maintain.SupportsDelete +import org.apache.spark.sql.sources.v2.SupportsDelete import org.apache.spark.sql.util.CaseInsensitiveStringMap case class DeleteFromTableExec( - table: SupportsMaintenance, + table: SupportsDelete, options: CaseInsensitiveStringMap, deleteWhere: Array[Filter], query: SparkPlan) extends UnaryExecNode { override protected def doExecute(): RDD[InternalRow] = { - table.newMaintainerBuilder(options).build() match { - case maintainer: SupportsDelete => - maintainer.delete(deleteWhere) - case _ => - throw new SparkException(s"Table does not support delete: $table") - } + table.deleteWhere(deleteWhere) sparkContext.emptyRDD } diff 
--git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala deleted file mode 100644 index fd459becbc5dd..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleInMemoryTable.scala +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2 - -import java.util - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -import org.apache.spark.sql.{Column, SparkSession} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not} -import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.sources.v2.maintain.{Maintainer, MaintainerBuilder, SupportsDelete} -import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.CaseInsensitiveStringMap -import org.apache.spark.unsafe.types.UTF8String - - -/** - * A simple in-memory table. Rows are stored as a buffered group produced by each output task. 
- */ -private[v2] class InMemoryTable( - val name: String, - val schema: StructType, - override val properties: util.Map[String, String]) - extends Table with SupportsRead with SupportsWrite with SupportsMaintenance { - - def this( - name: String, - schema: StructType, - properties: util.Map[String, String], - data: Array[BufferedRows]) = { - this(name, schema, properties) - replaceData(data) - } - - def rows: Seq[InternalRow] = data.flatMap(_.rows) - - @volatile var data: Array[BufferedRows] = Array.empty - - def replaceData(buffers: Array[BufferedRows]): Unit = synchronized { - data = buffers - } - - override def capabilities: util.Set[TableCapability] = Set( - TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.TRUNCATE).asJava - - override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - () => new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition])) - } - - class InMemoryBatchScan(data: Array[InputPartition]) extends Scan with Batch { - override def readSchema(): StructType = schema - - override def toBatch: Batch = this - - override def planInputPartitions(): Array[InputPartition] = data - - override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory - } - - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { - new WriteBuilder with SupportsTruncate { - private var shouldTruncate: Boolean = false - - override def truncate(): WriteBuilder = { - shouldTruncate = true - this - } - - override def buildForBatch(): BatchWrite = { - if (shouldTruncate) TruncateAndAppend else Append - } - } - } - - override def newMaintainerBuilder(options: CaseInsensitiveStringMap): MaintainerBuilder = { - () => { - Delete - } - } - - private object Delete extends Maintainer with SupportsDelete { - - override def delete(filters: Array[Filter]): Unit = { - val filtered = data.map { - rows => - val newRows = filter(rows.rows, filters) - val newBufferedRows = new BufferedRows() - newBufferedRows.rows.appendAll(newRows) - newBufferedRows - }.filter(_.rows.nonEmpty) - replaceData(filtered) - } - } - - def filter(rows: mutable.ArrayBuffer[InternalRow], - filters: Array[Filter]): Array[InternalRow] = { - if (rows.isEmpty) { - rows.toArray - } - val filterStr = - filters.map { - filter => filter.sql - }.toList.mkString("AND") - val sparkSession = SparkSession.getActiveSession.getOrElse( - throw new RuntimeException("Could not get active sparkSession.") - ) - val filterExpr = sparkSession.sessionState.sqlParser.parseExpression(filterStr) - val antiFilter = Not(EqualNullSafe(filterExpr, Literal(true, BooleanType))) - val rdd = sparkSession.sparkContext.parallelize(rows) - - sparkSession.internalCreateDataFrame(rdd, schema) - .filter(Column(antiFilter)).collect().map { - row => - val values = row.toSeq.map { - case s: String => UTF8String.fromBytes(s.asInstanceOf[String].getBytes("UTF-8")) - case other => other - } - InternalRow.fromSeq(values) - } - } - - private object TruncateAndAppend extends BatchWrite { - override def createBatchWriterFactory(): DataWriterFactory = { - BufferedRowsWriterFactory - } - - override def commit(messages: Array[WriterCommitMessage]): Unit = { - replaceData(messages.map(_.asInstanceOf[BufferedRows])) - } - - override def abort(messages: Array[WriterCommitMessage]): Unit = { - } - } - - private object Append extends BatchWrite { - override def createBatchWriterFactory(): DataWriterFactory = { - BufferedRowsWriterFactory - } - - override def commit(messages: 
Array[WriterCommitMessage]): Unit = { - replaceData(data ++ messages.map(_.asInstanceOf[BufferedRows])) - } - - override def abort(messages: Array[WriterCommitMessage]): Unit = { - } - } -} - -private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { - val rows = new mutable.ArrayBuffer[InternalRow]() -} - -private object BufferedRowsReaderFactory extends PartitionReaderFactory { - override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { - new BufferedRowsReader(partition.asInstanceOf[BufferedRows]) - } -} - -private class BufferedRowsReader(partition: BufferedRows) extends PartitionReader[InternalRow] { - private var index: Int = -1 - - override def next(): Boolean = { - index += 1 - index < partition.rows.length - } - - override def get(): InternalRow = partition.rows(index) - - override def close(): Unit = {} -} - -private object BufferedRowsWriterFactory extends DataWriterFactory { - override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = { - new BufferWriter - } -} - -private class BufferWriter extends DataWriter[InternalRow] { - private val buffer = new BufferedRows - - override def write(row: InternalRow): Unit = buffer.rows.append(row.copy()) - - override def commit(): WriterCommitMessage = buffer - - override def abort(): Unit = {} -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index d4d96fb4ac5b2..6a1b8949650ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -21,18 +21,21 @@ import java.util import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ +import scala.collection.mutable +import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, StagingTableCatalog, TableCatalog, TableChange} import org.apache.spark.sql.catalog.v2.expressions.{IdentityTransform, Transform} import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not} import org.apache.spark.sql.sources.{And, EqualTo, Filter} import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage} -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{BooleanType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.unsafe.types.UTF8String // this is currently in the spark-sql module because the read and write API is not in catalyst // TODO(rdblue): when the v2 source API is in catalyst, merge with TestTableCatalog/InMemoryTable @@ -117,7 +120,7 @@ class InMemoryTable( val schema: StructType, override val partitioning: Array[Transform], override val properties: util.Map[String, String]) 
- extends Table with SupportsRead with SupportsWrite { + extends Table with SupportsRead with SupportsWrite with SupportsDelete { partitioning.foreach { t => if (!t.isInstanceOf[IdentityTransform]) { @@ -251,6 +254,45 @@ class InMemoryTable( withData(messages.map(_.asInstanceOf[BufferedRows])) } } + + override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized { + val filtered = data.map { + rows => + val newRows = filter(rows.rows, filters) + val newBufferedRows = new BufferedRows() + newBufferedRows.rows.appendAll(newRows) + newBufferedRows + }.filter(_.rows.nonEmpty) + dataMap.clear() + withData(filtered) + } + + def filter(rows: mutable.ArrayBuffer[InternalRow], + filters: Array[Filter]): Array[InternalRow] = { + if (rows.isEmpty) { + rows.toArray + } + val filterStr = + filters.map { + filter => filter.sql + }.toList.mkString("AND") + val sparkSession = SparkSession.getActiveSession.getOrElse( + throw new RuntimeException("Could not get active sparkSession.") + ) + val filterExpr = sparkSession.sessionState.sqlParser.parseExpression(filterStr) + val antiFilter = Not(EqualNullSafe(filterExpr, Literal(true, BooleanType))) + val rdd = sparkSession.sparkContext.parallelize(rows) + + sparkSession.internalCreateDataFrame(rdd, schema) + .filter(Column(antiFilter)).collect().map { + row => + val values = row.toSeq.map { + case s: String => UTF8String.fromBytes(s.asInstanceOf[String].getBytes("UTF-8")) + case other => other + } + InternalRow.fromSeq(values) + } + } } object TestInMemoryTableCatalog { From ba5555cc0fb3286eec7f8f0e91ac1d832043d49f Mon Sep 17 00:00:00 2001 From: xy_xin Date: Tue, 30 Jul 2019 13:45:21 +0800 Subject: [PATCH 05/16] Revert imports --- .../spark/sql/execution/datasources/DataSourceStrategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 158a4d1386b7a..b8bed8569ace0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter => _, _} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ From d30969b193aa3fec19e5afe15c88a1d06ecbd09b Mon Sep 17 00:00:00 2001 From: xy_xin Date: Wed, 31 Jul 2019 12:30:47 +0800 Subject: [PATCH 06/16] Update according to reviews --- .../spark/sql/sources/v2/SupportsDelete.java | 7 +++ .../sql/catalyst/parser/AstBuilder.scala | 11 +++- .../logical/sql/DeleteFromStatement.scala | 32 +++++++++++ .../datasources/DataSourceResolution.scala | 31 ++++++++++- .../datasources/v2/DataSourceV2Strategy.scala | 2 +- .../datasources/v2/DeleteFromTableExec.scala | 7 +-- .../datasources/v2/V2WriteSupportCheck.scala | 3 - .../sql/sources/v2/DataSourceV2SQLSuite.scala | 33 ++++------- .../sources/v2/TestInMemoryTableCatalog.scala | 55 +++++++------------ 9 files changed, 109 
insertions(+), 72 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java index 39fa9275c664a..daf3c2164536a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java @@ -19,6 +19,13 @@ import org.apache.spark.sql.sources.Filter; +/** + * A mix-in interface for {@link Table} delete support. Data sources can implement this + * interface to provide the ability to delete data from tables that matches filter expressions. + *
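To make the contract concrete, a source-side implementation only has to interpret the pushed-down filters against its own storage. The sketch below is illustrative rather than part of this patch: the class name, its in-memory rows map and the matches helper are hypothetical, and a real source would also mix the interface into a full Table implementation, as the test catalog later in this series does.

import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.sources.v2.SupportsDelete

// Hypothetical toy store keyed by an `id` column; only equality filters are understood,
// and anything else is rejected so unsupported conditions fail loudly instead of
// silently deleting the wrong rows.
class SimpleDeletableStore extends SupportsDelete {
  private val rows = scala.collection.mutable.Map[Long, String](1L -> "a", 2L -> "b", 3L -> "c")

  override def deleteWhere(filters: Array[Filter]): Unit = rows.synchronized {
    def matches(id: Long, filter: Filter): Boolean = filter match {
      case EqualTo("id", value) => id == value
      case f => throw new IllegalArgumentException(s"Unsupported delete filter: $f")
    }
    // The array is a conjunction: delete exactly the rows that satisfy every filter.
    rows.retain { case (id, _) => !filters.forall(matches(id, _)) }
  }
}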
+ * Data sources must implement this interface to support logical operations that combine writing + * data with deleting data, like overwriting partitions. + */ public interface SupportsDelete { /** * Delete data from a data source table that matches filter expressions. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 50059d1172d19..179345225d904 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -342,9 +342,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) { val tableId = visitMultipartIdentifier(ctx.multipartIdentifier) - val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId)) + val tableAlias = if (ctx.tableAlias() != null) { + val ident = ctx.tableAlias().strictIdentifier() + if (ident != null) { Some(ident.getText) } else { None } + } else { + None + } - DeleteFromTable(table, expression(ctx.whereClause().booleanExpression())) + DeleteFromStatement(tableId, tableAlias, expression(ctx.whereClause().booleanExpression())) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala new file mode 100644 index 0000000000000..0391781f360a1 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.sql + +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +case class DeleteFromStatement( + tableName: Seq[String], + tableAlias: Option[String], + condition: Expression) + extends ParsedStatement { + + override def output: Seq[Attribute] = Seq.empty + + override def children: Seq[LogicalPlan] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index a150a049f33e1..6e4262f8529b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -24,11 +24,12 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform +import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect} -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, 
ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand} import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation} @@ -173,6 +174,19 @@ case class DataSourceResolution( // only top-level adds are supported using AlterTableAddColumnsCommand AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField)) + case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) => + throw new AnalysisException( + s"Delete from tables is not supported using the legacy / v1 Spark external catalog" + + s" API. Identifier: $table.") + + case delete: DeleteFromStatement => + val CatalogObjectIdentifier(maybeCatalog, identifier) = delete.tableName + val catalog = maybeCatalog.orElse(defaultCatalog) + .getOrElse(throw new AnalysisException( + s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) + .asTableCatalog + convertDeleteFrom(catalog.asTableCatalog, identifier, delete) + case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) => UnresolvedCatalogRelation(catalogTable) @@ -309,6 +323,17 @@ case class DataSourceResolution( orCreate = replace.orCreate) } + private def convertDeleteFrom( + catalog: TableCatalog, + identifier: Identifier, + delete: DeleteFromStatement): DeleteFromTable = { + val relation = CatalogV2Util.loadTable(catalog, identifier) + .map(DataSourceV2Relation.create).getOrElse(UnresolvedRelation(delete.tableName)) + DeleteFromTable( + delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation), + delete.condition) + } + private def convertTableProperties( properties: Map[String, String], options: Map[String, String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 51dfdfc8bee6f..912158d887a00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -228,7 +228,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse( throw new AnalysisException(s"Cannot translate expression to source filter: $filter")) }.toArray - DeleteFromTableExec(r.table.asDeletable, r.options, filters, planLater(r)) :: Nil + DeleteFromTableExec(r.table.asDeletable, r.options, filters) :: Nil case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala index 03c6f69fb3918..297596079d26a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import 
org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.sources.v2.SupportsDelete import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -28,15 +28,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap case class DeleteFromTableExec( table: SupportsDelete, options: CaseInsensitiveStringMap, - deleteWhere: Array[Filter], - query: SparkPlan) extends UnaryExecNode { + deleteWhere: Array[Filter]) extends LeafExecNode { override protected def doExecute(): RDD[InternalRow] = { - table.deleteWhere(deleteWhere) sparkContext.emptyRDD } - override def child: SparkPlan = query override def output: Seq[Attribute] = Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala index 55f6dad7e8d6d..388f2f3a2c9fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala @@ -51,9 +51,6 @@ object V2WriteSupportCheck extends (LogicalPlan => Unit) { } } - case DeleteFromTable(relation, _) if !relation.isInstanceOf[DataSourceV2Relation] => - failAnalysis(s"Delete is not supported for table: ${relation.toString}") - case _ => // OK } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 7b098f4702747..3a71977f68a8b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -51,17 +51,12 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn df.createOrReplaceTempView("source") val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data") df2.createOrReplaceTempView("source2") - val df3 = spark.createDataFrame(Seq((2L, "b"), (3L, "c"))).toDF("id", "data") - df3.createOrReplaceTempView("source3") } after { spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables() spark.catalog("testcat_atomic").asInstanceOf[TestInMemoryTableCatalog].clearTables() spark.catalog("session").asInstanceOf[TestInMemoryTableCatalog].clearTables() - spark.sql("DROP TABLE source") - spark.sql("DROP TABLE source2") - spark.sql("DROP TABLE source3") } test("CreateTable: use v2 plan because catalog is set") { @@ -502,23 +497,6 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn checkAnswer(sparkSession.sql(s"TABLE session.table_name"), sparkSession.table("source")) } - test("DeleteFromTable: basic") { - spark.sql( - "CREATE TABLE IF NOT EXISTS testcat.table_name USING foo AS SELECT id, data FROM source") - - val testCatalog = spark.catalog("testcat").asTableCatalog - val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) - - val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) - - val deleted = spark.sql("DELETE FROM testcat.table_name where id=1") - val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name")) - - val rdd2 = 
spark.sparkContext.parallelize(table2.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd2, table2.schema), spark.table("source3")) - } - test("DropTable: basic") { val tableName = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") @@ -1884,6 +1862,17 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } } + test("DeleteFromTable: basic") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") + sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") + sql(s"DELETE FROM $t WHERE id=2") + checkAnswer(spark.table(t), Seq( + Row(3, "c", 3))) + } + } + private def testCreateAnalysisError(sqlStatement: String, expectedError: String): Unit = { val errMsg = intercept[AnalysisException] { sql(sqlStatement) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index 6a1b8949650ba..41c46bd113923 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -255,43 +255,28 @@ class InMemoryTable( } } - override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized { - val filtered = data.map { - rows => - val newRows = filter(rows.rows, filters) - val newBufferedRows = new BufferedRows() - newBufferedRows.rows.appendAll(newRows) - newBufferedRows - }.filter(_.rows.nonEmpty) - dataMap.clear() - withData(filtered) - } - - def filter(rows: mutable.ArrayBuffer[InternalRow], - filters: Array[Filter]): Array[InternalRow] = { - if (rows.isEmpty) { - rows.toArray + private def splitAnd(filter: Filter): Seq[Filter] = { + filter match { + case And(left, right) => splitAnd(left) ++ splitAnd(right) + case _ => filter :: Nil } - val filterStr = - filters.map { - filter => filter.sql - }.toList.mkString("AND") - val sparkSession = SparkSession.getActiveSession.getOrElse( - throw new RuntimeException("Could not get active sparkSession.") - ) - val filterExpr = sparkSession.sessionState.sqlParser.parseExpression(filterStr) - val antiFilter = Not(EqualNullSafe(filterExpr, Literal(true, BooleanType))) - val rdd = sparkSession.sparkContext.parallelize(rows) - - sparkSession.internalCreateDataFrame(rdd, schema) - .filter(Column(antiFilter)).collect().map { - row => - val values = row.toSeq.map { - case s: String => UTF8String.fromBytes(s.asInstanceOf[String].getBytes("UTF-8")) - case other => other - } - InternalRow.fromSeq(values) + } + + override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized { + val deleteKeys = dataMap.keys.filter { partValues => + filters.flatMap(splitAnd).forall { + case EqualTo(attr, value) => + partFieldNames.zipWithIndex.find(_._1 == attr) match { + case Some((_, partIndex)) => + value == partValues(partIndex) + case _ => + throw new IllegalArgumentException(s"Unknown filter attribute: $attr") + } + case f => + throw new IllegalArgumentException(s"Unsupported filter type: $f") + } } + dataMap --= deleteKeys } } From 06b12be70095df4a8fc5922b8ca74a7c22921527 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Thu, 1 Aug 2019 13:01:06 +0800 Subject: [PATCH 07/16] Remove Filter.sql --- .../apache/spark/sql/sources/filters.scala | 32 ------------------- 1 file changed, 32 deletions(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala index 3b389339d8526..a1ab55a7185ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -36,18 +36,10 @@ abstract class Filter { */ def references: Array[String] - def sql: String - protected def findReferences(value: Any): Array[String] = value match { case f: Filter => f.references case _ => Array.empty } - - protected def quoteIdentifier(name: String): String = { - // Escapes back-ticks within the identifier name with double-back-ticks, and then quote the - // identifier with back-ticks. - "`" + name.replace("`", "``") + "`" - } } /** @@ -58,8 +50,6 @@ abstract class Filter { */ @Stable case class EqualTo(attribute: String, value: Any) extends Filter { - def symbol: String = "=" - override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -72,8 +62,6 @@ case class EqualTo(attribute: String, value: Any) extends Filter { */ @Stable case class EqualNullSafe(attribute: String, value: Any) extends Filter { - def symbol: String = "<=>" - override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -85,8 +73,6 @@ case class EqualNullSafe(attribute: String, value: Any) extends Filter { */ @Stable case class GreaterThan(attribute: String, value: Any) extends Filter { - def symbol: String = ">" - override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -98,8 +84,6 @@ case class GreaterThan(attribute: String, value: Any) extends Filter { */ @Stable case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter { - def symbol: String = ">=" - override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -111,8 +95,6 @@ case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter { */ @Stable case class LessThan(attribute: String, value: Any) extends Filter { - def symbol: String = "<" - override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -124,8 +106,6 @@ case class LessThan(attribute: String, value: Any) extends Filter { */ @Stable case class LessThanOrEqual(attribute: String, value: Any) extends Filter { - def symbol: String = "<=" - override def sql: String = s"(${quoteIdentifier(attribute)} $symbol ${value.toString})" override def references: Array[String] = Array(attribute) ++ findReferences(value) } @@ -153,8 +133,6 @@ case class In(attribute: String, values: Array[Any]) extends Filter { s"In($attribute, [${values.mkString(",")}])" } - override def sql: String = s"${quoteIdentifier(attribute)} IN (${values.mkString(",")})" - override def references: Array[String] = Array(attribute) ++ values.flatMap(findReferences) } @@ -165,7 +143,6 @@ case class In(attribute: String, values: Array[Any]) extends Filter { */ @Stable case class IsNull(attribute: String) extends Filter { - override def sql: String = s"(${quoteIdentifier(attribute)} IS 
NULL)" override def references: Array[String] = Array(attribute) } @@ -176,7 +153,6 @@ case class IsNull(attribute: String) extends Filter { */ @Stable case class IsNotNull(attribute: String) extends Filter { - override def sql: String = s"(${quoteIdentifier(attribute)} IS NOT NULL)" override def references: Array[String] = Array(attribute) } @@ -187,7 +163,6 @@ case class IsNotNull(attribute: String) extends Filter { */ @Stable case class And(left: Filter, right: Filter) extends Filter { - override def sql: String = s"(${left.sql}) AND (${right.sql})" override def references: Array[String] = left.references ++ right.references } @@ -198,7 +173,6 @@ case class And(left: Filter, right: Filter) extends Filter { */ @Stable case class Or(left: Filter, right: Filter) extends Filter { - override def sql: String = s"(${left.sql}) OR (${right.sql})" override def references: Array[String] = left.references ++ right.references } @@ -209,7 +183,6 @@ case class Or(left: Filter, right: Filter) extends Filter { */ @Stable case class Not(child: Filter) extends Filter { - override def sql: String = s"NOT (${child.sql})" override def references: Array[String] = child.references } @@ -221,7 +194,6 @@ case class Not(child: Filter) extends Filter { */ @Stable case class StringStartsWith(attribute: String, value: String) extends Filter { - override def sql: String = s"${quoteIdentifier(attribute)} LIKE '${value.toString}%'" override def references: Array[String] = Array(attribute) } @@ -233,7 +205,6 @@ case class StringStartsWith(attribute: String, value: String) extends Filter { */ @Stable case class StringEndsWith(attribute: String, value: String) extends Filter { - override def sql: String = s"${quoteIdentifier(attribute)} LIKE '%${value.toString}'" override def references: Array[String] = Array(attribute) } @@ -245,7 +216,6 @@ case class StringEndsWith(attribute: String, value: String) extends Filter { */ @Stable case class StringContains(attribute: String, value: String) extends Filter { - override def sql: String = s"${quoteIdentifier(attribute)} LIKE '%${value.toString}%'" override def references: Array[String] = Array(attribute) } @@ -254,7 +224,6 @@ case class StringContains(attribute: String, value: String) extends Filter { */ @Evolving case class AlwaysTrue() extends Filter { - override def sql: String = "TRUE" override def references: Array[String] = Array.empty } @@ -267,7 +236,6 @@ object AlwaysTrue extends AlwaysTrue { */ @Evolving case class AlwaysFalse() extends Filter { - override def sql: String = "FALSE" override def references: Array[String] = Array.empty } From 527137775b1c1e8fd68e413278b4618d56c650f0 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Thu, 1 Aug 2019 16:13:57 +0800 Subject: [PATCH 08/16] Fail if delete contains subquery --- .../plans/logical/basicLogicalOperators.scala | 4 +-- .../datasources/DataSourceResolution.scala | 8 ++--- .../datasources/v2/DataSourceV2Strategy.scala | 9 +++--- .../datasources/v2/V2WriteSupportCheck.scala | 7 ++++- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 30 +++++++++++++++++-- .../sources/v2/TestInMemoryTableCatalog.scala | 5 ++-- 6 files changed, 48 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index e49642db42828..bea7c2a638460 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -569,10 +569,10 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm case class DeleteFromTable( child: LogicalPlan, - condition: Expression) + condition: Filter) extends Command { - override def children: Seq[LogicalPlan] = child :: Nil + override def children: Seq[LogicalPlan] = child :: condition :: Nil override def output: Seq[Attribute] = Seq.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 6e4262f8529b9..85ce31138546f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias} import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand} @@ -329,9 +329,9 @@ case class DataSourceResolution( delete: DeleteFromStatement): DeleteFromTable = { val relation = CatalogV2Util.loadTable(catalog, identifier) .map(DataSourceV2Relation.create).getOrElse(UnresolvedRelation(delete.tableName)) - DeleteFromTable( - delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation), - delete.condition) + val aliased = delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation) + val filter = Filter(delete.condition, aliased) + DeleteFromTable(aliased, filter) } private def convertTableProperties( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 912158d887a00..45f21a84b439e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -222,11 +222,12 @@ object DataSourceV2Strategy extends Strategy with 
PredicateHelper { case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) => OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil - case DeleteFromTable(r: DataSourceV2Relation, deleteExpr) => + case DeleteFromTable(r: DataSourceV2Relation, filter) => // fail if any filter cannot be converted. correctness depends on removing all matching data. - val filters = splitConjunctivePredicates(deleteExpr).map { - filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse( - throw new AnalysisException(s"Cannot translate expression to source filter: $filter")) + val filters = splitConjunctivePredicates(filter.condition).map { + f => DataSourceStrategy.translateFilter(f).getOrElse( + throw new AnalysisException(s"Exec delete failed:" + + s" cannot translate expression to source filter: $f")) }.toArray DeleteFromTableExec(r.table.asDeletable, r.options, filters) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala index 388f2f3a2c9fd..3405db3f2b707 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.expressions.{Literal, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.types.BooleanType @@ -51,6 +51,11 @@ object V2WriteSupportCheck extends (LogicalPlan => Unit) { } } + case DeleteFromTable(_, filter) => + if (SubqueryExpression.hasSubquery(filter.condition)) { + failAnalysis(s"Delete by condition with subquery is not supported: $filter") + } + case _ => // OK } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 3a71977f68a8b..3a7c8903f4dc6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -1700,6 +1700,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } } +<<<<<<< HEAD test("tableCreation: partition column case insensitive resolution") { val testCatalog = spark.catalog("testcat").asTableCatalog val sessionCatalog = spark.catalog("session").asTableCatalog @@ -1862,17 +1863,42 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } } - test("DeleteFromTable: basic") { + test("DeleteFrom: basic") { val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") - sql(s"DELETE FROM $t WHERE id=2") + sql(s"DELETE FROM $t WHERE id = 2") checkAnswer(spark.table(t), Seq( Row(3, "c", 3))) } } + test("DeleteFrom: alias") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") + sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") + sql(s"DELETE FROM $t 
tbl WHERE tbl.id = 2") + checkAnswer(spark.table(t), Seq( + Row(3, "c", 3))) + } + } + + test("DeleteFrom: fail if has subquery") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") + sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") + val exc = intercept[AnalysisException] { + sql(s"DELETE FROM $t WHERE id IN (SELECT id FROM $t)") + } + + assert(spark.table(t).count === 3) + assert(exc.getMessage.contains("Delete by condition with subquery is not supported")) + } + } + private def testCreateAnalysisError(sqlStatement: String, expectedError: String): Unit = { val errMsg = intercept[AnalysisException] { sql(sqlStatement) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index 41c46bd113923..67ebf11229dfe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable - import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, StagingTableCatalog, TableCatalog, TableChange} import org.apache.spark.sql.catalog.v2.expressions.{IdentityTransform, Transform} @@ -30,7 +29,7 @@ import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not} -import org.apache.spark.sql.sources.{And, EqualTo, Filter} +import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull} import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.types.{BooleanType, StructType} @@ -272,6 +271,8 @@ class InMemoryTable( case _ => throw new IllegalArgumentException(s"Unknown filter attribute: $attr") } + case IsNotNull(_) => + true case f => throw new IllegalArgumentException(s"Unsupported filter type: $f") } From ce751c5b4da02ae28d0109d3f77adc35c53c00fd Mon Sep 17 00:00:00 2001 From: xy_xin Date: Thu, 1 Aug 2019 16:59:36 +0800 Subject: [PATCH 09/16] Resolve the scala style problem --- .../spark/sql/sources/v2/TestInMemoryTableCatalog.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index 67ebf11229dfe..9429940cd8659 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -22,19 +22,17 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.sql.{Column, SparkSession} + import 
org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, StagingTableCatalog, TableCatalog, TableChange} import org.apache.spark.sql.catalog.v2.expressions.{IdentityTransform, Transform} import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not} import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull} import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage} -import org.apache.spark.sql.types.{BooleanType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -import org.apache.spark.unsafe.types.UTF8String // this is currently in the spark-sql module because the read and write API is not in catalyst // TODO(rdblue): when the v2 source API is in catalyst, merge with TestTableCatalog/InMemoryTable From 625e154b19b3a042d348863b9aef92e3fbdc84fe Mon Sep 17 00:00:00 2001 From: xy_xin Date: Fri, 2 Aug 2019 12:53:11 +0800 Subject: [PATCH 10/16] Update according to reviews --- .../spark/sql/sources/v2/SupportsDelete.java | 3 - .../plans/logical/basicLogicalOperators.scala | 3 +- .../datasources/DataSourceResolution.scala | 4 +- .../datasources/v2/DataSourceV2Strategy.scala | 2 +- .../datasources/v2/DeleteFromTableExec.scala | 1 - .../datasources/v2/V2WriteSupportCheck.scala | 2 +- .../sources/v2/TestInMemoryTableCatalog.scala | 61 ++++++++----------- 7 files changed, 29 insertions(+), 47 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java index daf3c2164536a..8650a0ef1d4ba 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java @@ -22,9 +22,6 @@ /** * A mix-in interface for {@link Table} delete support. Data sources can implement this * interface to provide the ability to delete data from tables that matches filter expressions. - *
- * Data sources must implement this interface to support logical operations that combine writing - * data with deleting data, like overwriting partitions. */ public interface SupportsDelete { /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index bea7c2a638460..c736de6bbabd7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -569,8 +569,7 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm case class DeleteFromTable( child: LogicalPlan, - condition: Filter) - extends Command { + condition: Filter) extends Command { override def children: Seq[LogicalPlan] = child :: condition :: Nil override def output: Seq[Attribute] = Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 85ce31138546f..533b99b009a6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -24,7 +24,6 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform -import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation} @@ -327,8 +326,7 @@ case class DataSourceResolution( catalog: TableCatalog, identifier: Identifier, delete: DeleteFromStatement): DeleteFromTable = { - val relation = CatalogV2Util.loadTable(catalog, identifier) - .map(DataSourceV2Relation.create).getOrElse(UnresolvedRelation(delete.tableName)) + val relation = UnresolvedRelation(delete.tableName) val aliased = delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation) val filter = Filter(delete.condition, aliased) DeleteFromTable(aliased, filter) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 45f21a84b439e..0654c8dcf7097 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -229,7 +229,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { throw new AnalysisException(s"Exec delete failed:" + s" cannot translate expression to source filter: $f")) }.toArray - DeleteFromTableExec(r.table.asDeletable, r.options, filters) :: Nil + DeleteFromTableExec(r.table.asDeletable, filters) :: Nil case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala index 297596079d26a..b8735758c66f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap case class DeleteFromTableExec( table: SupportsDelete, - options: CaseInsensitiveStringMap, deleteWhere: Array[Filter]) extends LeafExecNode { override protected def doExecute(): RDD[InternalRow] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala index 3405db3f2b707..2daafee6aef39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Literal, SubqueryExpression} -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteFromTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic} import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.types.BooleanType diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index 9429940cd8659..109dc198b0832 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -220,29 +220,9 @@ class InMemoryTable( private class Overwrite(filters: Array[Filter]) extends TestBatchWrite { override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized { - val deleteKeys = dataMap.keys.filter { partValues => - filters.flatMap(splitAnd).forall { - case EqualTo(attr, value) => - partFieldNames.zipWithIndex.find(_._1 == attr) match { - case Some((_, partIndex)) => - value == partValues(partIndex) - case _ => - throw new IllegalArgumentException(s"Unknown filter attribute: $attr") - } - case f => - throw new IllegalArgumentException(s"Unsupported filter type: $f") - } - } - dataMap --= deleteKeys + dataMap --= deletesKeys(filters) withData(messages.map(_.asInstanceOf[BufferedRows])) } - - private def splitAnd(filter: Filter): Seq[Filter] = { - filter match { - case And(left, right) => splitAnd(left) ++ splitAnd(right) - case _ => filter :: Nil - } - } } private object TruncateAndAppend extends TestBatchWrite { @@ -252,6 +232,10 @@ class InMemoryTable( } } + override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized { + dataMap --= deletesKeys(filters) + } + private def splitAnd(filter: Filter): Seq[Filter] = { filter match { case And(left, right) => splitAnd(left) ++ splitAnd(right) @@ -259,23 +243,28 @@ class InMemoryTable( } } - override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized { - val deleteKeys = dataMap.keys.filter { partValues => - 
filters.flatMap(splitAnd).forall { - case EqualTo(attr, value) => - partFieldNames.zipWithIndex.find(_._1 == attr) match { - case Some((_, partIndex)) => - value == partValues(partIndex) - case _ => - throw new IllegalArgumentException(s"Unknown filter attribute: $attr") - } - case IsNotNull(_) => - true - case f => - throw new IllegalArgumentException(s"Unsupported filter type: $f") + private def deletesKeys(filters: Array[Filter]): Iterable[Seq[Any]] = { + dataMap.synchronized { + dataMap.keys.filter { partValues => + filters.flatMap(splitAnd).forall { + case EqualTo(attr, value) => + value == extractValue(attr, partValues) + case IsNotNull(attr) => + null != extractValue(attr, partValues) + case f => + throw new IllegalArgumentException(s"Unsupported filter type: $f") + } } } - dataMap --= deleteKeys + } + + private def extractValue(attr: String, partValues: Seq[Any]): Any = { + partFieldNames.zipWithIndex.find(_._1 == attr) match { + case Some((_, partIndex)) => + partValues(partIndex) + case _ => + throw new IllegalArgumentException(s"Unknown filter attribute: $attr") + } } } From 7e7ddf45e20bce0ba94b01855d784c10fae7c395 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Thu, 8 Aug 2019 17:57:48 +0800 Subject: [PATCH 11/16] Update according to reviews --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 ++ .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 4 +++- .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 4 ++-- .../sql/execution/datasources/DataSourceResolution.scala | 3 +-- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 4 ++-- .../sql/execution/datasources/v2/DeleteFromTableExec.scala | 4 ++-- .../sql/execution/datasources/v2/V2WriteSupportCheck.scala | 6 +++--- 7 files changed, 15 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f8eef0cf32361..f88a8bcff5216 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1761,6 +1761,8 @@ class Analyzer( // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries. 
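The extra case added below is needed because DeleteFromTable is a Command rather than one of those unary nodes, so without it a delete condition containing IN or EXISTS would never have its inner query resolved. The illustrative statement that follows (the table name is simply the one used by this series' tests) is what the analyzer can now resolve; V2WriteSupportCheck still rejects it afterwards:

// Resolved by ResolveSubquery thanks to the new DeleteFromTable case, then failed by
// V2WriteSupportCheck with "Delete by condition with subquery is not supported".
spark.sql("DELETE FROM testcat.ns1.ns2.tbl WHERE id IN (SELECT id FROM testcat.ns1.ns2.tbl)")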
case q: UnaryNode if q.childrenResolved => resolveSubQueries(q, q.children) + case d: DeleteFromTable if d.childrenResolved => + resolveSubQueries(d, d.children) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 519c558d12770..3c126ea8e9955 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -595,8 +595,10 @@ trait CheckAnalysis extends PredicateHelper { case inSubqueryOrExistsSubquery => plan match { case _: Filter => // Ok + case _: DeleteFromTable => // Ok case _ => - failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in a Filter: $plan") + failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" + + s" Filter/DeleteFromTable: $plan") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index c736de6bbabd7..8411ada38e821 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -569,9 +569,9 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm case class DeleteFromTable( child: LogicalPlan, - condition: Filter) extends Command { + condition: Expression) extends Command { - override def children: Seq[LogicalPlan] = child :: condition :: Nil + override def children: Seq[LogicalPlan] = child :: Nil override def output: Seq[Attribute] = Seq.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 533b99b009a6e..b5ed2e25d6ebc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -328,8 +328,7 @@ case class DataSourceResolution( delete: DeleteFromStatement): DeleteFromTable = { val relation = UnresolvedRelation(delete.tableName) val aliased = delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation) - val filter = Filter(delete.condition, aliased) - DeleteFromTable(aliased, filter) + DeleteFromTable(aliased, delete.condition) } private def convertTableProperties( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 0654c8dcf7097..585fe06ce4ce7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -222,9 +222,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) => OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil - case DeleteFromTable(r: DataSourceV2Relation, filter) => + case DeleteFromTable(r: DataSourceV2Relation, condition) => // fail if any filter cannot be 
converted. correctness depends on removing all matching data. - val filters = splitConjunctivePredicates(filter.condition).map { + val filters = splitConjunctivePredicates(condition).map { f => DataSourceStrategy.translateFilter(f).getOrElse( throw new AnalysisException(s"Exec delete failed:" + s" cannot translate expression to source filter: $f")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala index b8735758c66f9..a5840571fff23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala @@ -27,10 +27,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap case class DeleteFromTableExec( table: SupportsDelete, - deleteWhere: Array[Filter]) extends LeafExecNode { + condition: Array[Filter]) extends LeafExecNode { override protected def doExecute(): RDD[InternalRow] = { - table.deleteWhere(deleteWhere) + table.deleteWhere(condition) sparkContext.emptyRDD } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala index 2daafee6aef39..5648d5439ba5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala @@ -51,9 +51,9 @@ object V2WriteSupportCheck extends (LogicalPlan => Unit) { } } - case DeleteFromTable(_, filter) => - if (SubqueryExpression.hasSubquery(filter.condition)) { - failAnalysis(s"Delete by condition with subquery is not supported: $filter") + case DeleteFromTable(_, condition) => + if (SubqueryExpression.hasSubquery(condition)) { + failAnalysis(s"Delete by condition with subquery is not supported: $condition") } case _ => // OK From 088549033693dc6e7b904f53e709bd3d7feb2e8a Mon Sep 17 00:00:00 2001 From: xy_xin Date: Thu, 8 Aug 2019 19:08:55 +0800 Subject: [PATCH 12/16] Update according to reviews --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 3c126ea8e9955..bd54c66992dac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -585,7 +585,7 @@ trait CheckAnalysis extends PredicateHelper { // Only certain operators are allowed to host subquery expression containing // outer references. 
plan match { - case _: Filter | _: Aggregate | _: Project => // Ok + case _: Filter | _: Aggregate | _: Project | _: DeleteFromTable => // Ok case other => failAnalysis( "Correlated scalar sub-queries can only be used in a " + s"Filter/Aggregate/Project: $plan") @@ -594,8 +594,7 @@ trait CheckAnalysis extends PredicateHelper { case inSubqueryOrExistsSubquery => plan match { - case _: Filter => // Ok - case _: DeleteFromTable => // Ok + case _: Filter | _: DeleteFromTable => // Ok case _ => failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" + s" Filter/DeleteFromTable: $plan") From 21b02ea614eb6b2e1c8d56434d5f34d2b9f0d21d Mon Sep 17 00:00:00 2001 From: xy_xin Date: Thu, 8 Aug 2019 19:42:56 +0800 Subject: [PATCH 13/16] Do not use wildcard imports for DataSourceV2Implicits --- .../sql/execution/datasources/v2/DataSourceV2Implicits.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala index ded70a29d470d..2d59c42ee8684 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.{SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability} object DataSourceV2Implicits { implicit class TableHelper(table: Table) { From e68fba26885362da04c1360f4eccc6de6c7a4b46 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Fri, 9 Aug 2019 16:31:47 +0800 Subject: [PATCH 14/16] Rebase against master --- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 3 ++- .../org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 4d42f5fb73362..1cc5dd8ce1d54 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -531,7 +531,8 @@ class AnalysisErrorSuite extends AnalysisTest { val plan = Project( Seq(a, Alias(InSubquery(Seq(a), ListQuery(LocalRelation(b))), "c")()), LocalRelation(a)) - assertAnalysisError(plan, "Predicate sub-queries can only be used in a Filter" :: Nil) + assertAnalysisError(plan, "Predicate sub-queries can only be used" + + " in Filter/DeleteFromTable" :: Nil) } test("PredicateSubQuery is used is a nested condition") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 3a7c8903f4dc6..9b1a23a1f2bbf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -1700,7 +1700,6 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } } -<<<<<<< HEAD test("tableCreation: partition column case insensitive resolution") { 
val testCatalog = spark.catalog("testcat").asTableCatalog val sessionCatalog = spark.catalog("session").asTableCatalog From 792c36b52f7701df9b6e61c9f908ef3b2b5789ec Mon Sep 17 00:00:00 2001 From: xy_xin Date: Tue, 13 Aug 2019 10:08:44 +0800 Subject: [PATCH 15/16] Update according to test --- .../sql/catalyst/analysis/Analyzer.scala | 12 ++++++++++ .../plans/logical/basicLogicalOperators.scala | 1 - .../logical/sql/DeleteFromStatement.scala | 7 +----- .../datasources/DataSourceResolution.scala | 24 +++---------------- 4 files changed, 16 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f88a8bcff5216..08ec220cd8493 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -644,6 +644,7 @@ class Analyzer( */ object ResolveTables extends Rule[LogicalPlan] { import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._ + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident)) @@ -652,6 +653,17 @@ class Analyzer( case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) => loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u) + + case d @ DeleteFromTable(u @ UnresolvedRelation( + CatalogObjectIdentifier(None, ident)), condition) => + // fallback to session catalog for DeleteFromTable if no catalog specified and no default + // catalog set. + val catalog = sessionCatalog + .getOrElse(throw new AnalysisException( + s"Cannot delete from ${ident.quoted} because no catalog specified" + + s" and no session catalog provided.")) + .asTableCatalog + d.copy(child = loadTable(catalog, ident).map(DataSourceV2Relation.create).getOrElse(u)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 8411ada38e821..968a561da9c38 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -572,7 +572,6 @@ case class DeleteFromTable( condition: Expression) extends Command { override def children: Seq[LogicalPlan] = child :: Nil - override def output: Seq[Attribute] = Seq.empty } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala index 0391781f360a1..21e24127eee31 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala @@ -24,9 +24,4 @@ case class DeleteFromStatement( tableName: Seq[String], tableAlias: Option[String], condition: Expression) - extends ParsedStatement { - - override def output: Seq[Attribute] = Seq.empty - - override def children: Seq[LogicalPlan] = Seq.empty -} + extends ParsedStatement diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index b5ed2e25d6ebc..217f7debbbbd9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -173,22 +173,13 @@ case class DataSourceResolution( // only top-level adds are supported using AlterTableAddColumnsCommand AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField)) - case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) => - throw new AnalysisException( - s"Delete from tables is not supported using the legacy / v1 Spark external catalog" + - s" API. Identifier: $table.") - case delete: DeleteFromStatement => - val CatalogObjectIdentifier(maybeCatalog, identifier) = delete.tableName - val catalog = maybeCatalog.orElse(defaultCatalog) - .getOrElse(throw new AnalysisException( - s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) - .asTableCatalog - convertDeleteFrom(catalog.asTableCatalog, identifier, delete) + val relation = UnresolvedRelation(delete.tableName) + val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation) + DeleteFromTable(aliased, delete.condition) case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) => UnresolvedCatalogRelation(catalogTable) - } object V1WriteProvider { @@ -322,15 +313,6 @@ case class DataSourceResolution( orCreate = replace.orCreate) } - private def convertDeleteFrom( - catalog: TableCatalog, - identifier: Identifier, - delete: DeleteFromStatement): DeleteFromTable = { - val relation = UnresolvedRelation(delete.tableName) - val aliased = delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation) - DeleteFromTable(aliased, delete.condition) - } - private def convertTableProperties( properties: Map[String, String], options: Map[String, String], From bbf515666495cbf5f12731b3cdab4a23960f3d77 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Wed, 14 Aug 2019 09:55:04 +0800 Subject: [PATCH 16/16] Rollback rules for resolving tables for DeleteFromTable --- .../spark/sql/catalyst/analysis/Analyzer.scala | 12 ------------ .../execution/datasources/DataSourceResolution.scala | 5 +++++ 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 08ec220cd8493..f88a8bcff5216 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -644,7 +644,6 @@ class Analyzer( */ object ResolveTables extends Rule[LogicalPlan] { import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._ - import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident)) @@ -653,17 +652,6 @@ class Analyzer( case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) => loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u) - - case d @ DeleteFromTable(u @ UnresolvedRelation( - CatalogObjectIdentifier(None, ident)), condition) => - // fallback to session catalog for 
DeleteFromTable if no catalog specified and no default - // catalog set. - val catalog = sessionCatalog - .getOrElse(throw new AnalysisException( - s"Cannot delete from ${ident.quoted} because no catalog specified" + - s" and no session catalog provided.")) - .asTableCatalog - d.copy(child = loadTable(catalog, ident).map(DataSourceV2Relation.create).getOrElse(u)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 217f7debbbbd9..4791fe5fb5251 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -173,6 +173,11 @@ case class DataSourceResolution( // only top-level adds are supported using AlterTableAddColumnsCommand AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField)) + case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) => + throw new AnalysisException( + s"Delete from tables is not supported using the legacy / v1 Spark external catalog" + + s" API. Identifier: $table.") + case delete: DeleteFromStatement => val relation = UnresolvedRelation(delete.tableName) val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
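
Not part of the patch series: a minimal usage sketch of the SQL path these commits wire up, assuming a v2 catalog registered under "testcat" (as exercised in DataSourceV2SQLSuite) whose table implements the delete hook; the table and column names below are illustrative only, not taken from the patches.

    // Illustrative only. The parser produces DeleteFromStatement(tableName, tableAlias, condition),
    // DataSourceResolution rewrites it to DeleteFromTable(aliasedRelation, condition), and
    // DataSourceV2Strategy plans DeleteFromTableExec, translating the condition into source
    // Filters that are pushed to the table via deleteWhere.
    spark.sql("DELETE FROM testcat.ns1.ns2.tbl WHERE id = 2")

    // Conditions that cannot be expressed as source Filters are rejected: a subquery in the
    // WHERE clause fails in V2WriteSupportCheck ("Delete by condition with subquery is not
    // supported"), and any untranslatable predicate fails planning with an AnalysisException
    // ("cannot translate expression to source filter").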