Commit 792c36b

Update according to test

xy_xin committed Aug 13, 2019
1 parent e68fba2 commit 792c36b
Showing 4 changed files with 16 additions and 28 deletions.
@@ -644,6 +644,7 @@ class Analyzer(
    */
   object ResolveTables extends Rule[LogicalPlan] {
     import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._
+    import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident))
@@ -652,6 +653,17 @@ class Analyzer(

       case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
         loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
+
+      case d @ DeleteFromTable(u @ UnresolvedRelation(
+          CatalogObjectIdentifier(None, ident)), condition) =>
+        // Fall back to the session catalog for DeleteFromTable when no catalog is
+        // specified and no default catalog is set.
+        val catalog = sessionCatalog
+          .getOrElse(throw new AnalysisException(
+            s"Cannot delete from ${ident.quoted} because no catalog specified" +
+              s" and no session catalog provided."))
+          .asTableCatalog
+        d.copy(child = loadTable(catalog, ident).map(DataSourceV2Relation.create).getOrElse(u))
     }
   }

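Note: the new case only fires when the multipart identifier carries no explicit catalog part. A hedged illustration of the intended dispatch, assuming an active `spark` session and a configured v2 catalog named `testcat` (names are hypothetical, not from this commit):

// Explicit catalog: handled by the existing CatalogObjectIdentifier(Some(...), ...) case.
spark.sql("DELETE FROM testcat.ns.tbl WHERE id = 1")

// No catalog part and no default catalog: the new case above falls back to the
// session catalog, or fails analysis with the error message shown if none exists.
spark.sql("DELETE FROM tbl WHERE id = 1")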
@@ -572,7 +572,6 @@ case class DeleteFromTable(
     condition: Expression) extends Command {

   override def children: Seq[LogicalPlan] = child :: Nil
-  override def output: Seq[Attribute] = Seq.empty
 }

/**
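Note: removing the `output` override is a cleanup rather than a behavior change; the `override` keyword shows the `Command` trait already supplies it. A sketch of the assumed base trait (paraphrased for context, not part of the diff):

trait Command extends LogicalPlan {
  // Commands produce no rows, so their output schema is empty by default.
  override def output: Seq[Attribute] = Seq.empty
  override def children: Seq[LogicalPlan] = Seq.empty
}

This is also why DeleteFromTable keeps its `children` override, exposing the child plan to the analyzer.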
@@ -24,9 +24,4 @@ case class DeleteFromStatement(
     tableName: Seq[String],
     tableAlias: Option[String],
     condition: Expression)
-  extends ParsedStatement {
-
-  override def output: Seq[Attribute] = Seq.empty
-
-  override def children: Seq[LogicalPlan] = Seq.empty
-}
+  extends ParsedStatement
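Similarly, dropping the body of DeleteFromStatement relies on `ParsedStatement` providing the same defaults; a sketch of the assumed base class (for context only):

abstract class ParsedStatement extends LogicalPlan {
  // Parsed statements are placeholders with no output and no resolved children.
  override def output: Seq[Attribute] = Seq.empty
  override def children: Seq[LogicalPlan] = Seq.empty
}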
@@ -173,22 +173,13 @@ case class DataSourceResolution(
       // only top-level adds are supported using AlterTableAddColumnsCommand
       AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))

-    case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) =>
-      throw new AnalysisException(
-        s"Delete from tables is not supported using the legacy / v1 Spark external catalog" +
-          s" API. Identifier: $table.")
-
     case delete: DeleteFromStatement =>
-      val CatalogObjectIdentifier(maybeCatalog, identifier) = delete.tableName
-      val catalog = maybeCatalog.orElse(defaultCatalog)
-        .getOrElse(throw new AnalysisException(
-          s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
-        .asTableCatalog
-      convertDeleteFrom(catalog.asTableCatalog, identifier, delete)
+      val relation = UnresolvedRelation(delete.tableName)
+      val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
+      DeleteFromTable(aliased, delete.condition)

    case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) =>
      UnresolvedCatalogRelation(catalogTable)

  }

object V1WriteProvider {
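Net effect of this file's changes: DataSourceResolution no longer resolves the catalog itself. It emits a DeleteFromTable over an UnresolvedRelation and defers table resolution to the analyzer's ResolveTables rule from the first hunk. A hypothetical trace of the intended plan transitions (identifiers are illustrative, not from the commit):

// sql("DELETE FROM testcat.ns.tbl WHERE id = 1")
//   parser               => DeleteFromStatement(Seq("testcat", "ns", "tbl"), None, cond)
//   DataSourceResolution => DeleteFromTable(UnresolvedRelation(...), cond)
//   ResolveTables        => DeleteFromTable(DataSourceV2Relation(...), cond)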
@@ -322,15 +313,6 @@ case class DataSourceResolution(
         orCreate = replace.orCreate)
   }

-  private def convertDeleteFrom(
-      catalog: TableCatalog,
-      identifier: Identifier,
-      delete: DeleteFromStatement): DeleteFromTable = {
-    val relation = UnresolvedRelation(delete.tableName)
-    val aliased = delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation)
-    DeleteFromTable(aliased, delete.condition)
-  }
-
   private def convertTableProperties(
       properties: Map[String, String],
       options: Map[String, String],
