[HUDI-3060] DROP TABLE for spark sql
XuQianJin-Stars committed Dec 21, 2021
1 parent 32a44bb commit edb0803
Showing 4 changed files with 202 additions and 9 deletions.
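
This change wires Spark SQL's DROP TABLE statement into Hudi: a new post-analysis rule rewrites Spark's built-in DropTableCommand into a Hudi-aware DropHoodieTableCommand, which drops the catalog entry and, when PURGE is given, also deletes the table's files. A minimal sketch of the user-facing behavior this enables (the table name `h0` and its location are illustrative, not from the commit; assumes a Hudi-enabled session, see the extension config further down):

// Sketch only: `h0` and '/tmp/h0' are illustrative.
spark.sql(
  """create table h0 (id int, name string, price double, ts long)
    |using hudi
    |location '/tmp/h0'
    |tblproperties (primaryKey = 'id', preCombineField = 'ts')
    |""".stripMargin)

spark.sql("DROP TABLE h0")         // removes the catalog entry; files under /tmp/h0 remain
// ... or, to also delete the table directory recursively:
// spark.sql("DROP TABLE h0 PURGE")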
@@ -19,24 +19,22 @@ package org.apache.spark.sql.hudi.analysis
 import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
 import org.apache.hudi.SparkAdapterSupport
 
-import scala.collection.JavaConverters._
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
-import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
 import org.apache.spark.sql.hudi.command._
+import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
 import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 
+import scala.collection.JavaConverters._
 
 object HoodieAnalysis {
   def customResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
@@ -407,6 +405,10 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
       case CreateDataSourceTableCommand(table, ignoreIfExists)
         if isHoodieTable(table) =>
           CreateHoodieTableCommand(table, ignoreIfExists)
+      // Rewrite the DropTableCommand to DropHoodieTableCommand
+      case DropTableCommand(tableName, ifExists, isView, purge)
+        if isHoodieTable(tableName, sparkSession) =>
+          DropHoodieTableCommand(tableName, ifExists, isView, purge)
       // Rewrite the AlterTableDropPartitionCommand to AlterHoodieTableDropPartitionCommand
       case AlterTableDropPartitionCommand(tableName, specs, _, _, _)
         if isHoodieTable(tableName, sparkSession) =>
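
HoodiePostAnalysisRule only takes effect if it is registered with the Spark session. A hedged sketch of how that registration typically looks, assuming the Hudi Spark bundle is on the classpath (the app name is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-drop-table-demo") // illustrative
  // Registers Hudi's analysis and post-analysis rules, including HoodiePostAnalysisRule
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  // Kryo serialization is the standard recommendation for Hudi on Spark
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()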
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import org.apache.hadoop.fs.Path
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.HiveClientUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils.isEnableHive

import scala.util.control.NonFatal

case class DropHoodieTableCommand(
    tableIdentifier: TableIdentifier,
    ifExists: Boolean,
    isView: Boolean,
    purge: Boolean) extends RunnableCommand
  with SparkAdapterSupport {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // database is an Option; unquotedString renders "db.table" (or just "table")
    val fullTableName = tableIdentifier.unquotedString
    logInfo(s"Start executing the drop table command for $fullTableName")

    try {
      // Drop the catalog table for this Hoodie table
      dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge)
    } catch {
      case NonFatal(e) =>
        logWarning(s"Failed to drop catalog table in metastore: ${e.getMessage}")
    }

    logInfo(s"Finished executing the drop table command for $fullTableName")
    Seq.empty[Row]
  }

  def dropTableInCatalog(sparkSession: SparkSession,
      tableIdentifier: TableIdentifier,
      ifExists: Boolean,
      purge: Boolean): Unit = {
    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
    val table = hoodieCatalogTable.table
    assert(table.tableType != CatalogTableType.VIEW)

    val basePath = hoodieCatalogTable.tableLocation
    val catalog = sparkSession.sessionState.catalog

    // Drop the table in the catalog
    val enableHive = isEnableHive(sparkSession)
    if (enableHive) {
      dropHiveDataSourceTable(sparkSession, table, ifExists, purge)
    } else {
      catalog.dropTable(tableIdentifier, ifExists, purge)
    }

    // Recursively delete the table directories
    if (purge) {
      logInfo("Clean up " + basePath)
      val targetPath = new Path(basePath)
      val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
      if (fs.exists(targetPath)) {
        fs.delete(targetPath, true)
      }
    }
  }

  private def dropHiveDataSourceTable(
      sparkSession: SparkSession,
      table: CatalogTable,
      ifExists: Boolean,
      purge: Boolean): Unit = {
    val dbName = table.identifier.database.get
    val tableName = table.identifier.table
    // Check that the database exists
    val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName)
    if (!dbExists) {
      throw new NoSuchDatabaseException(dbName)
    }
    // Check that the table exists
    if (!sparkSession.sessionState.catalog.tableExists(table.identifier)) {
      throw new NoSuchTableException(dbName, table.identifier.table)
    }

    val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf,
      sparkSession.sessionState.newHadoopConf())
    // Drop the Hive table
    client.dropTable(dbName, tableName, ifExists, purge)
  }
}
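
For completeness, a sketch (not part of this commit) of exercising the new command directly rather than through the SQL parser; the table name `h0` is illustrative:

import org.apache.spark.sql.catalyst.TableIdentifier

// Equivalent to `DROP TABLE IF EXISTS h0` without PURGE: the catalog entry is
// removed, but run() leaves the base path untouched because purge = false.
DropHoodieTableCommand(TableIdentifier("h0"), ifExists = true, isView = false, purge = false)
  .run(spark)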
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi

class TestDropTable extends TestHoodieSqlBase {

  test("Test Drop Table") {
    withTempDir { tmp =>
      Seq("cow", "mor").foreach { tableType =>
        val tableName = generateTableName
        // Create table
        spark.sql(
          s"""
             |create table $tableName (
             |  id int,
             |  name string,
             |  price double,
             |  ts long
             |) using hudi
             | location '${tmp.getCanonicalPath}/$tableName'
             | tblproperties (
             |  type = '$tableType',
             |  primaryKey = 'id',
             |  preCombineField = 'ts'
             | )
       """.stripMargin)
        // A plain DROP removes the table from the catalog ...
        spark.sql(s"DROP TABLE $tableName")
        checkAnswer(s"show tables like '$tableName'")()
        // ... but leaves the data files on disk
        assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName"))
      }
    }
  }

  test("Test Drop Table with purge") {
    withTempDir { tmp =>
      Seq("cow", "mor").foreach { tableType =>
        val tableName = generateTableName
        // Create table
        spark.sql(
          s"""
             |create table $tableName (
             |  id int,
             |  name string,
             |  price double,
             |  ts long
             |) using hudi
             | location '${tmp.getCanonicalPath}/$tableName'
             | tblproperties (
             |  type = '$tableType',
             |  primaryKey = 'id',
             |  preCombineField = 'ts'
             | )
       """.stripMargin)
        // DROP ... PURGE removes the catalog entry and deletes the table directory
        spark.sql(s"DROP TABLE $tableName PURGE")
        checkAnswer(s"show tables like '$tableName'")()
        assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName"))
      }
    }
  }
}
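
Because a plain DROP TABLE leaves the Hudi data files in place (as the first test asserts), the table can be registered again over the same location. A hedged sketch reusing the test's scope (`tmp`, `tableName`); it assumes, rather than tests, Hudi's ability to re-infer the schema from the existing table metadata:

spark.sql(
  s"""create table $tableName using hudi
     | location '${tmp.getCanonicalPath}/$tableName'
   """.stripMargin)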
@@ -17,14 +17,16 @@
 
 package org.apache.spark.sql.hudi
 
-import java.io.File
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.log4j.Level
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.util.Utils
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 
+import java.io.File
 import java.util.TimeZone
 
 class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
@@ -115,4 +117,10 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
       case _ => value
     }
   }
+
+  protected def existsPath(filePath: String): Boolean = {
+    val path = new Path(filePath)
+    val fs = FSUtils.getFs(filePath, spark.sparkContext.hadoopConfiguration)
+    fs.exists(path)
+  }
 }
