[HUDI-3722] Fix truncate hudi table's error (#5140)
XuQianJin-Stars committed Mar 29, 2022
1 parent d074089 commit 72e0b52
Showing 5 changed files with 232 additions and 75 deletions.
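
For orientation, here is the kind of Spark SQL session the rewritten command serves: a minimal sketch, not part of the commit. The table name h0, its schema, and the year/month/day partition columns are assumptions modeled on the test changes further down, and it presumes the Hudi Spark bundle and SQL extensions are available.

import org.apache.spark.sql.SparkSession

object TruncateHudiTableExample {
  def main(args: Array[String]): Unit = {
    // Hudi's SQL extensions route TRUNCATE TABLE on a Hudi table to the
    // Hudi-specific command instead of Spark's built-in TruncateTableCommand.
    val spark = SparkSession.builder()
      .appName("hudi-truncate-example")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()

    // Hypothetical partitioned Hudi table mirroring the test table below.
    spark.sql(
      """create table h0 (id int, name string, price double, year string, month string, day string)
        |using hudi partitioned by (year, month, day)""".stripMargin)
    spark.sql("insert into h0 partition (year='2021', month='10', day='01') select 1, 'a1', 10.0")

    // Truncate a single partition (every partition column must be specified) ...
    spark.sql("truncate table h0 partition (year='2021', month='10', day='01')")
    // ... or the whole table; afterwards the .hoodie metadata is re-initialised.
    spark.sql("truncate table h0")

    spark.stop()
  }
}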
@@ -19,30 +19,28 @@ package org.apache.spark.sql.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}

import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Locale, Properties}

import scala.collection.JavaConverters._
import scala.collection.immutable.Map

@@ -321,4 +319,57 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child
}
}

def normalizePartitionSpec[T](
partitionSpec: Map[String, T],
partColNames: Seq[String],
tblName: String,
resolver: Resolver): Map[String, T] = {
val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
}
normalizedKey -> value
}

if (normalizedPartSpec.size < partColNames.size) {
throw new AnalysisException(
"All partition columns need to be specified for Hoodie's partition")
}

val lowerPartColNames = partColNames.map(_.toLowerCase)
if (lowerPartColNames.distinct.length != lowerPartColNames.length) {
val duplicateColumns = lowerPartColNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => s"`$x`"
}
throw new AnalysisException(
s"Found duplicate column(s) in the partition schema: ${duplicateColumns.mkString(", ")}")
}

normalizedPartSpec.toMap
}

def getPartitionPathToDrop(
hoodieCatalogTable: HoodieCatalogTable,
normalizedSpecs: Seq[Map[String, String]]): String = {
val table = hoodieCatalogTable.table
val allPartitionPaths = hoodieCatalogTable.getPartitionPaths
val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
val partitionsToDrop = normalizedSpecs.map { spec =>
hoodieCatalogTable.partitionFields.map { partitionColumn =>
val encodedPartitionValue = if (enableEncodeUrl) {
PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
} else {
spec(partitionColumn)
}
if (enableHiveStylePartitioning) {
partitionColumn + "=" + encodedPartitionValue
} else {
encodedPartitionValue
}
}.mkString("/")
}.mkString(",")
partitionsToDrop
}
}
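
As a quick illustration of what the new getPartitionPathToDrop helper assembles, the standalone sketch below (not Hudi API; URL encoding of values is omitted) maps a normalized spec to a slash-joined path, prefixing the column name only for hive-style layouts.

object PartitionPathSketch {
  // Simplified stand-in for getPartitionPathToDrop: builds the relative partition
  // path for a single normalized spec.
  def partitionPath(spec: Map[String, String],
                    partitionFields: Seq[String],
                    hiveStyle: Boolean): String =
    partitionFields.map { col =>
      val value = spec(col)
      if (hiveStyle) s"$col=$value" else value
    }.mkString("/")

  def main(args: Array[String]): Unit = {
    val spec = Map("year" -> "2021", "month" -> "10", "day" -> "01")
    val fields = Seq("year", "month", "day")
    println(partitionPath(spec, fields, hiveStyle = true))  // year=2021/month=10/day=01
    println(partitionPath(spec, fields, hiveStyle = false)) // 2021/10/01
  }
}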
@@ -20,14 +20,12 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor}
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.DDLUtils
@@ -115,57 +113,4 @@ case class AlterHoodieTableDropPartitionCommand(
)
}
}

def normalizePartitionSpec[T](
partitionSpec: Map[String, T],
partColNames: Seq[String],
tblName: String,
resolver: Resolver): Map[String, T] = {
val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
}
normalizedKey -> value
}

if (normalizedPartSpec.size < partColNames.size) {
throw new AnalysisException(
"All partition columns need to be specified for Hoodie's dropping partition")
}

val lowerPartColNames = partColNames.map(_.toLowerCase)
if (lowerPartColNames.distinct.length != lowerPartColNames.length) {
val duplicateColumns = lowerPartColNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => s"`$x`"
}
throw new AnalysisException(
s"Found duplicate column(s) in the partition schema: ${duplicateColumns.mkString(", ")}")
}

normalizedPartSpec.toMap
}

def getPartitionPathToDrop(
hoodieCatalogTable: HoodieCatalogTable,
normalizedSpecs: Seq[Map[String, String]]): String = {
val table = hoodieCatalogTable.table
val allPartitionPaths = hoodieCatalogTable.getPartitionPaths
val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
val partitionsToDrop = normalizedSpecs.map { spec =>
hoodieCatalogTable.partitionFields.map { partitionColumn =>
val encodedPartitionValue = if (enableEncodeUrl) {
PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
} else {
spec(partitionColumn)
}
if (enableHiveStylePartitioning) {
partitionColumn + "=" + encodedPartitionValue
} else {
encodedPartitionValue
}
}.mkString("/")
}.mkString(",")
partitionsToDrop
}
}
@@ -17,42 +17,107 @@

package org.apache.spark.sql.hudi.command

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.TruncateTableCommand
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getPartitionPathToDrop, normalizePartitionSpec}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}

import scala.util.control.NonFatal

/**
* Command for truncating a Hudi table.
*/
class TruncateHoodieTableCommand(
case class TruncateHoodieTableCommand(
tableIdentifier: TableIdentifier,
partitionSpec: Option[TablePartitionSpec])
extends TruncateTableCommand(tableIdentifier, partitionSpec) {
extends HoodieLeafRunnableCommand {

override def run(spark: SparkSession): Seq[Row] = {
val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
logInfo(s"start execute truncate table command for $fullTableName")

override def run(sparkSession: SparkSession): Seq[Row] = {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
val hoodieCatalogTable = HoodieCatalogTable(spark, tableIdentifier)
val properties = hoodieCatalogTable.tableConfig.getProps

try {
// Delete all data in the table directory
super.run(sparkSession)
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableIdentifier)
val tableIdentWithDB = table.identifier.quotedString

if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
}

if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
s"for tables that are not partitioned: $tableIdentWithDB")
}

val basePath = hoodieCatalogTable.tableLocation
val partCols = table.partitionColumnNames
val locations = if (partitionSpec.isEmpty || partCols.isEmpty) {
Seq(basePath)
} else {
val normalizedSpec: Seq[Map[String, String]] = Seq(partitionSpec.map { spec =>
normalizePartitionSpec(
spec,
partCols,
table.identifier.quotedString,
spark.sessionState.conf.resolver)
}.get)

val fullPartitionPath = FSUtils.getPartitionPath(basePath, getPartitionPathToDrop(hoodieCatalogTable, normalizedSpec))

Seq(fullPartitionPath)
}

val hadoopConf = spark.sessionState.newHadoopConf()
locations.foreach { location =>
val path = new Path(location.toString)
try {
val fs = path.getFileSystem(hadoopConf)
fs.delete(path, true)
fs.mkdirs(path)
} catch {
case NonFatal(e) =>
throw new AnalysisException(
s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " +
s"because of ${e.toString}")
}
}

// Also try to drop the contents of the table from the columnar cache
try {
spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), cascade = true)
} catch {
case NonFatal(_) =>
}

if (table.stats.nonEmpty) {
// empty table after truncation
val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0))
catalog.alterTableStats(tableIdentifier, Some(newStats))
}
Seq.empty[Row]
} catch {
// TruncateTableCommand deletes the related directories first and then refreshes the table.
// The refresh fails because the hudi meta directory (.hoodie) was deleted in the first step,
// so ignore this failure here and refresh the table later.
case NonFatal(_) =>
case NonFatal(e) =>
throw new AnalysisException(s"Exception when attempting to truncate table ${tableIdentifier.quotedString}: " + e)
}

// If no partition is specified, truncate deletes all the data in the table path,
// including hoodie.properties. In this case we should reInit the table.
if (partitionSpec.isEmpty) {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val hadoopConf = spark.sessionState.newHadoopConf()
// ReInit hoodie.properties
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
@@ -61,7 +126,7 @@ class TruncateHoodieTableCommand(

// After deleting the data, refresh the table to make sure we don't keep around a stale
// file relation in the metastore cache and cached table data in the cache manager.
sparkSession.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
spark.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
Seq.empty[Row]
}
}
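
A hedged way to exercise the new behaviour end to end (names again hypothetical, building on the earlier sketch): before this change, TRUNCATE TABLE wiped the whole table directory, including .hoodie, and the subsequent refresh failed; with the rewritten command the table stays resolvable and simply reports zero rows.

import org.apache.spark.sql.SparkSession

object VerifyTruncateBehaviour {
  // Assumes `spark` has the Hudi SQL extensions enabled and that the table `h0`
  // from the earlier sketch exists.
  def truncateAndCheck(spark: SparkSession): Unit = {
    spark.sql("truncate table h0")
    // .hoodie has been re-created from the saved table properties, so the table
    // can still be resolved and queried.
    assert(spark.sql("select * from h0").count() == 0L)
  }
}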
@@ -224,7 +224,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {

// not all partition columns specified
checkExceptionContain(s"alter table $tableName drop partition (year='2021', month='10')")(
"All partition columns need to be specified for Hoodie's dropping partition"
"All partition columns need to be specified for Hoodie's partition"
)
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")