From 792c36b52f7701df9b6e61c9f908ef3b2b5789ec Mon Sep 17 00:00:00 2001
From: xy_xin
Date: Tue, 13 Aug 2019 10:08:44 +0800
Subject: [PATCH] Update according to test

---
 .../sql/catalyst/analysis/Analyzer.scala      | 12 ++++++++++
 .../plans/logical/basicLogicalOperators.scala |  1 -
 .../logical/sql/DeleteFromStatement.scala     |  7 +-----
 .../datasources/DataSourceResolution.scala    | 24 +++----------------
 4 files changed, 16 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f88a8bcff5216..08ec220cd8493 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -644,6 +644,7 @@ class Analyzer(
    */
   object ResolveTables extends Rule[LogicalPlan] {
     import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._
+    import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
 
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident))
@@ -652,6 +653,17 @@ class Analyzer(
 
       case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
         loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
+
+      case d @ DeleteFromTable(u @ UnresolvedRelation(
+          CatalogObjectIdentifier(None, ident)), condition) =>
+        // fallback to session catalog for DeleteFromTable if no catalog specified and no default
+        // catalog set.
+        val catalog = sessionCatalog
+          .getOrElse(throw new AnalysisException(
+            s"Cannot delete from ${ident.quoted} because no catalog specified" +
+              s" and no session catalog provided."))
+          .asTableCatalog
+        d.copy(child = loadTable(catalog, ident).map(DataSourceV2Relation.create).getOrElse(u))
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 8411ada38e821..968a561da9c38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -572,7 +572,6 @@ case class DeleteFromTable(
     condition: Expression) extends Command {
 
   override def children: Seq[LogicalPlan] = child :: Nil
-  override def output: Seq[Attribute] = Seq.empty
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala
index 0391781f360a1..21e24127eee31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala
@@ -24,9 +24,4 @@ case class DeleteFromStatement(
     tableName: Seq[String],
     tableAlias: Option[String],
     condition: Expression)
-  extends ParsedStatement {
-
-  override def output: Seq[Attribute] = Seq.empty
-
-  override def children: Seq[LogicalPlan] = Seq.empty
-}
+  extends ParsedStatement
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index b5ed2e25d6ebc..217f7debbbbd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -173,22 +173,13 @@ case class DataSourceResolution(
       // only top-level adds are supported using AlterTableAddColumnsCommand
       AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))
 
-    case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) =>
-      throw new AnalysisException(
-        s"Delete from tables is not supported using the legacy / v1 Spark external catalog" +
-          s" API. Identifier: $table.")
-
     case delete: DeleteFromStatement =>
-      val CatalogObjectIdentifier(maybeCatalog, identifier) = delete.tableName
-      val catalog = maybeCatalog.orElse(defaultCatalog)
-        .getOrElse(throw new AnalysisException(
-          s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
-        .asTableCatalog
-      convertDeleteFrom(catalog.asTableCatalog, identifier, delete)
+      val relation = UnresolvedRelation(delete.tableName)
+      val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
+      DeleteFromTable(aliased, delete.condition)
 
     case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) =>
       UnresolvedCatalogRelation(catalogTable)
-
   }
 
   object V1WriteProvider {
@@ -322,15 +313,6 @@ case class DataSourceResolution(
         orCreate = replace.orCreate)
     }
 
-  private def convertDeleteFrom(
-      catalog: TableCatalog,
-      identifier: Identifier,
-      delete: DeleteFromStatement): DeleteFromTable = {
-    val relation = UnresolvedRelation(delete.tableName)
-    val aliased = delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation)
-    DeleteFromTable(aliased, delete.condition)
-  }
-
  private def convertTableProperties(
      properties: Map[String, String],
      options: Map[String, String],
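
Editor's note: taken together, the patch moves catalog selection for DELETE out of DataSourceResolution (which now only builds DeleteFromTable over an UnresolvedRelation) and into the new ResolveTables case, which falls back to the session catalog when the identifier names no catalog. The standalone Scala sketch below, outside the patch, illustrates only that Option-based fallback decision; Catalog, AnalysisError, and resolveCatalog are hypothetical stand-ins, not the Spark classes used above.

object CatalogFallbackSketch {

  // Hypothetical stand-ins for a catalog plugin and an analysis-time error.
  final case class Catalog(name: String)
  final class AnalysisError(msg: String) extends RuntimeException(msg)

  // Prefer an explicitly named catalog; otherwise fall back to the session
  // catalog; fail with an analysis-style error when neither is available,
  // mirroring the error message shape used in the patch.
  def resolveCatalog(
      explicitCatalog: Option[Catalog],
      sessionCatalog: Option[Catalog],
      ident: Seq[String]): Catalog = {
    explicitCatalog
      .orElse(sessionCatalog)
      .getOrElse(throw new AnalysisError(
        s"Cannot delete from ${ident.mkString(".")} because no catalog is specified" +
          " and no session catalog is provided."))
  }

  def main(args: Array[String]): Unit = {
    val session = Some(Catalog("spark_catalog"))
    // No explicit catalog: falls back to the session catalog.
    println(resolveCatalog(None, session, Seq("db", "t")))
    // An explicit catalog takes precedence over the session catalog.
    println(resolveCatalog(Some(Catalog("testcat")), session, Seq("db", "t")))
  }
}

With the patch applied, a statement such as DELETE FROM db.t WHERE p = 1 is parsed into DeleteFromStatement, rewritten by DataSourceResolution into DeleteFromTable over an UnresolvedRelation, and the new ResolveTables case then loads the table from the selected catalog (explicit or session) to resolve the relation.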