diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index a978b94f49ac..28f4e7dafdf5 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -24,9 +24,11 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.expressions.UpCast
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.DropView
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.catalyst.trees.Origin
@@ -45,14 +47,17 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case u@UnresolvedRelation(nameParts, _, _)
-      if catalogManager.v1SessionCatalog.isTempView(nameParts) =>
+    case u@UnresolvedRelation(nameParts, _, _) if isTempView(nameParts) =>
       u
 
     case u@UnresolvedRelation(parts@CatalogAndIdentifier(catalog, ident), _, _) =>
       loadView(catalog, ident)
         .map(createViewRelation(parts, _))
         .getOrElse(u)
+
+    case DropIcebergView(r@ResolvedIdentifier(catalog, ident), ifExists)
+        if isTempView(ident.asMultipartIdentifier) || !isViewCatalog(catalog) =>
+      DropView(r, ifExists)
   }
 
   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
@@ -111,8 +116,8 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   }
 
   private def qualifyFunctionIdentifiers(
-    plan: LogicalPlan,
-    catalogAndNamespace: Seq[String]): LogicalPlan = plan transformExpressions {
+      plan: LogicalPlan,
+      catalogAndNamespace: Seq[String]): LogicalPlan = plan transformExpressions {
     case u@UnresolvedFunction(Seq(name), _, _, _, _) =>
       if (!isBuiltinFunction(name)) {
         u.copy(nameParts = catalogAndNamespace :+ name)
@@ -137,10 +142,18 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   }
 
   private def isCatalog(name: String): Boolean = {
-    spark.sessionState.catalogManager.isCatalogRegistered(name)
+    catalogManager.isCatalogRegistered(name)
   }
 
   private def isBuiltinFunction(name: String): Boolean = {
-    spark.sessionState.catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
+    catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
+  }
+
+  private def isTempView(nameParts: Seq[String]): Boolean = {
+    catalogManager.v1SessionCatalog.isTempView(nameParts)
+  }
+
+  private def isViewCatalog(catalog: CatalogPlugin): Boolean = {
+    catalog.isInstanceOf[ViewCatalog]
+  }
 }
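The notable change above is the new `DropIcebergView` case: once the identifier is resolved, the command is converted back into Spark's built-in `DropView` whenever the name refers to a session temp view or the resolved catalog does not implement `ViewCatalog`, so those drops keep going through Spark's own code path. A minimal sketch of that routing decision, using plain stand-in types rather than the real Catalyst nodes (all names below are illustrative):

    // Stand-in model of the fallback in ResolveViews (not the real Catalyst types).
    sealed trait ViewCommand
    case class DropIcebergView(name: Seq[String], ifExists: Boolean) extends ViewCommand
    case class DropView(name: Seq[String], ifExists: Boolean) extends ViewCommand

    // isTempView and isViewCatalog stand in for the v1 session-catalog lookup
    // and the CatalogPlugin type check done by the real rule.
    def route(cmd: ViewCommand, isTempView: Seq[String] => Boolean, isViewCatalog: Boolean): ViewCommand =
      cmd match {
        case DropIcebergView(name, ifExists) if isTempView(name) || !isViewCatalog =>
          DropView(name, ifExists) // hand back to Spark's built-in DROP VIEW handling
        case other => other // Iceberg v2 views stay on the Iceberg path
      }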
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index 4fe0b5f283e5..c37d7c0ace25 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -35,12 +35,17 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.NonReservedContext
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.QuotedIdentifierContext
+import org.apache.spark.sql.catalyst.plans.logical.DropView
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -57,6 +62,7 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
 
   private lazy val substitutor = substitutorCtor.newInstance(SQLConf.get)
   private lazy val astBuilder = new IcebergSqlExtensionsAstBuilder(delegate)
+  private lazy val maxIterations = SQLConf.get.analyzerMaxIterations
 
   /**
    * Parse a string to a DataType.
@@ -122,7 +128,27 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
     if (isIcebergCommand(sqlTextAfterSubstitution)) {
       parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan]
     } else {
-      delegate.parsePlan(sqlText)
+      ViewSubstitutionExecutor.execute(delegate.parsePlan(sqlText))
+    }
+  }
+
+  private object ViewSubstitutionExecutor extends RuleExecutor[LogicalPlan] {
+    private val fixedPoint = FixedPoint(
+      maxIterations,
+      errorOnExceed = true,
+      maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
+
+    override protected def batches: Seq[Batch] = Seq(Batch("pre-substitution", fixedPoint, V2ViewSubstitution))
+  }
+
+  /**
+   * ResolveSessionCatalog exits early for some v2 View commands,
+   * thus they are pre-substituted here and then handled in ResolveViews
+   */
+  private object V2ViewSubstitution extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists) =>
+        DropIcebergView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
     }
   }
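Per the comment in the diff, `ResolveSessionCatalog` exits early for some v2 view commands, so the substitution has to happen at parse time: a one-batch `RuleExecutor` reuses the analyzer's iteration limit and rewrites `DropView` into `DropIcebergView` until the plan stops changing. The fixed-point mechanics can be sketched without Spark (a toy loop, not the actual `RuleExecutor` API):

    // Toy fixed-point loop mirroring what a one-batch RuleExecutor does:
    // apply the rule until the plan stops changing or the iteration cap is hit.
    def fixedPoint[A](plan: A, maxIterations: Int)(rule: A => A): A = {
      var current = plan
      var remaining = maxIterations
      while (remaining > 0) {
        val next = rule(current)
        if (next == current) return current // converged
        current = next
        remaining -= 1
      }
      current // the real executor can instead raise an error when the cap is exceeded
    }

    // Example: "DropView" is rewritten once, then the plan is stable.
    fixedPoint("DropView", 100)(p => if (p == "DropView") "DropIcebergView" else p)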
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala
new file mode 100644
index 000000000000..275dba6fbf5e
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.UnaryCommand
+
+case class DropIcebergView(
+    child: LogicalPlan,
+    ifExists: Boolean) extends UnaryCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): DropIcebergView =
+    copy(child = newChild)
+}
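The new logical node is deliberately minimal: extending `UnaryCommand` means Catalyst's generic tree rewrites (for example, resolving the child `UnresolvedIdentifier` into a `ResolvedIdentifier`) only need the `withNewChildInternal` copy hook. A simplified model of that contract (stand-in types, not Catalyst's `TreeNode`):

    // Simplified model of why the node only needs a single copy hook:
    // the framework rewrites the child and asks the node to copy itself.
    trait Node { def children: Seq[Node] }
    case object Leaf extends Node { def children: Seq[Node] = Nil }

    trait UnaryNode extends Node {
      def child: Node
      def children: Seq[Node] = Seq(child)
      protected def withNewChild(newChild: Node): UnaryNode

      // Generic child rewrite, in the spirit of TreeNode.mapChildren.
      final def mapChild(f: Node => Node): UnaryNode = {
        val newChild = f(child)
        if (newChild eq child) this else withNewChild(newChild) // copy only on change
      }
    }

    case class Drop(child: Node, ifExists: Boolean) extends UnaryNode {
      override protected def withNewChild(newChild: Node): UnaryNode = copy(child = newChild)
    }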
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
new file mode 100644
index 000000000000..4f4c665f5f02
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+
+case class DropV2ViewExec(
+    catalog: ViewCatalog,
+    ident: Identifier,
+    ifExists: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    if (catalog.viewExists(ident)) {
+      catalog.dropView(ident)
+    } else if (!ifExists) {
+      throw new NoSuchViewException(ident)
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropV2View: ${ident}"
+  }
+}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 6302d8307a65..a28fda80010d 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -25,6 +25,7 @@ import org.apache.iceberg.spark.SparkSessionCatalog
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
@@ -41,8 +42,10 @@ import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.execution.OrderAwareCoalesceExec
 import org.apache.spark.sql.execution.SparkPlan
 import scala.jdk.CollectionConverters._
@@ -90,6 +93,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case OrderAwareCoalesce(numPartitions, coalescer, child) =>
       OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil
 
+    case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
+      DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
+
     case _ => Nil
   }
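The physical side is equally small: `DropV2ViewExec` checks existence first so `IF EXISTS` can absorb a missing view, and throws `NoSuchViewException` only for a plain `DROP VIEW` on a view that is not there, while `ExtendedDataSourceV2Strategy` plans the exec node only when the resolved catalog is a `ViewCatalog`. The same drop semantics over a mutable map (a sketch: the map stands in for the catalog, and `NoSuchElementException` stands in for Spark's `NoSuchViewException`):

    import scala.collection.mutable

    // DROP VIEW semantics from DropV2ViewExec.run, modeled over a mutable map.
    def dropView(views: mutable.Map[String, String], name: String, ifExists: Boolean): Unit = {
      if (views.contains(name)) {
        views.remove(name) // view exists: drop it
      } else if (!ifExists) {
        throw new NoSuchElementException(s"View not found: $name") // plain DROP VIEW on a missing view
      }
      // IF EXISTS on a missing view: silently do nothing
    }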
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 94e86e5ee3cc..445cebb7bfe2 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -19,6 +19,7 @@ package org.apache.iceberg.spark.extensions;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
@@ -590,10 +591,7 @@ public void fullFunctionIdentifier() {
   @Test
   public void fullFunctionIdentifierNotRewrittenLoadFailure() {
     String viewName = "fullFunctionIdentifierNotRewrittenLoadFailure";
-    String sql =
-        String.format(
-            "SELECT spark_catalog.system.bucket(100, 'a') AS bucket_result, 'a' AS value",
-            catalogName);
+    String sql = "SELECT spark_catalog.system.bucket(100, 'a') AS bucket_result, 'a' AS value";
 
     // avoid namespace failures
     sql("USE spark_catalog");
@@ -635,6 +633,51 @@ private Catalog tableCatalog() {
     return Spark3Util.loadIcebergCatalog(spark, catalogName);
   }
 
+  @Test
+  public void dropView() {
+    String viewName = "viewToBeDropped";
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    TableIdentifier identifier = TableIdentifier.of(NAMESPACE, viewName);
+    viewCatalog
+        .buildView(identifier)
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThat(viewCatalog.viewExists(identifier)).isTrue();
+
+    sql("DROP VIEW %s", viewName);
+    assertThat(viewCatalog.viewExists(identifier)).isFalse();
+  }
+
+  @Test
+  public void dropNonExistingView() {
+    assertThatThrownBy(() -> sql("DROP VIEW non_existing"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The view %s.%s cannot be found", NAMESPACE, "non_existing");
+
+    assertThatNoException().isThrownBy(() -> sql("DROP VIEW IF EXISTS non_existing"));
+  }
+
+  @Test
+  public void dropGlobalTempView() {
+    String globalTempView = "globalViewToBeDropped";
+    sql("CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s", globalTempView, tableName);
+    sql("DROP VIEW global_temp.%s", globalTempView);
+  }
+
+  @Test
+  public void dropTempView() {
+    String tempView = "tempViewToBeDropped";
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s", tempView, tableName);
+    sql("DROP VIEW %s", tempView);
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 01804e010834..14c3d9a6b47c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -581,8 +581,11 @@ public View alterView(Identifier ident, ViewChange... changes)
 
   @Override
   public boolean dropView(Identifier ident) {
-    throw new UnsupportedOperationException(
-        "Dropping a view is not supported by catalog: " + catalogName);
+    if (null != asViewCatalog) {
+      return asViewCatalog.dropView(buildIdentifier(ident));
+    }
+
+    return false;
   }
 
   @Override
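Taken together with the `SparkCatalog.dropView` delegation above (drop via `asViewCatalog` when the wrapped catalog supports views, otherwise return `false`), the tests pin down behavior that can be exercised straight from SQL. A usage sketch, assuming a session wired with the Iceberg SQL extensions and a catalog named `ice` backed by a view-capable Iceberg catalog; the catalog and view names are hypothetical, and the `ice` catalog configuration is omitted:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .getOrCreate() // configuration for the hypothetical `ice` catalog omitted

    // Temp views never reach the Iceberg path: ResolveViews converts the command
    // back to Spark's DropView, so the session catalog handles the drop.
    spark.sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS id")
    spark.sql("DROP VIEW tmp_v")

    // Missing view with IF EXISTS: silent no-op (per dropNonExistingView).
    spark.sql("DROP VIEW IF EXISTS ice.db.missing_v")

    // Missing view without IF EXISTS: NoSuchViewException, surfaced as an
    // AnalysisException ("The view ... cannot be found").
    // spark.sql("DROP VIEW ice.db.missing_v")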