[HUDI-5057] Fix msck repair hudi table (apache#6999)
Zouxxyy authored and fengjian committed Apr 5, 2023
1 parent 6677f55 commit 928ceca
Showing 11 changed files with 431 additions and 6 deletions.
@@ -90,4 +90,14 @@ trait HoodieCatalystPlansUtils {
def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan

/**
* Test if the logical plan is a Repair Table LogicalPlan.
*/
def isRepairTable(plan: LogicalPlan): Boolean

/**
* Get the members of the Repair Table LogicalPlan.
*/
def getRepairTableChildren(plan: LogicalPlan):
Option[(TableIdentifier, Boolean, Boolean, String)]
}
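
For reference (not part of this commit's diff), a Spark 3.2-style adapter could satisfy these two new trait methods by matching on Spark's RepairTableCommand, whose fields line up with the returned tuple; the object name below is illustrative:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RepairTableCommand

// Sketch only: assumes Spark 3.2+, where MSCK REPAIR TABLE is parsed into RepairTableCommand.
object ExampleCatalystPlanUtils {
  def isRepairTable(plan: LogicalPlan): Boolean = plan.isInstanceOf[RepairTableCommand]

  def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] =
    plan match {
      case r: RepairTableCommand =>
        Some((r.tableName, r.enableAddPartitions, r.enableDropPartitions, r.cmd))
      case _ => None
    }
}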
@@ -322,10 +322,9 @@ public static List<String> getAllPartitionPaths(HoodieEngineContext engineContex
public static Map<String, FileStatus[]> getFilesInPartitions(HoodieEngineContext engineContext,
HoodieMetadataConfig metadataConfig,
String basePathStr,
- String[] partitionPaths,
- String spillableMapPath) {
+ String[] partitionPaths) {
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr,
- spillableMapPath, true)) {
+ FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true)) {
return tableMetadata.getAllFilesInPartitions(Arrays.asList(partitionPaths));
} catch (Exception ex) {
throw new HoodieException("Error get files in partitions: " + String.join(",", partitionPaths), ex);
@@ -138,7 +138,7 @@ public FileStatus[] getFilesInPartitions() {
}
String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
FileStatus[] allFileStatus = FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
partitions, "/tmp/")
partitions)
.values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
Set<String> candidateFiles = candidateFilesInMetadataTable(allFileStatus);
if (candidateFiles == null) {
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hudi

import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.Path
+ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
@@ -78,6 +78,20 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
}

def getFilesInPartitions(spark: SparkSession,
table: CatalogTable,
partitionPaths: Seq[String]): Map[String, Array[FileStatus]] = {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val metadataConfig = {
val properties = new Properties()
properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++
table.properties).asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}
FSUtils.getFilesInPartitions(sparkEngine, metadataConfig, getTableLocation(table, spark),
partitionPaths.toArray).asScala.toMap
}

/**
* This method is used for compatibility with the old non-hive-styled partition table.
* By default we enable the "hoodie.datasource.write.hive_style_partitioning"
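
As an aside (not part of the diff), the new getFilesInPartitions helper returns a Map from partition location to the FileStatus array listed via the metadata config. A minimal sketch of gathering fast stats with it, assuming an active SparkSession and made-up table/partition names:

import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils

// Sketch only: table and partition names are assumptions for illustration.
def exampleFastStats(spark: SparkSession): Map[String, (Int, Long)] = {
  val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("hudi_tbl"))
  val partitionLocation = table.location.toString.stripSuffix("/") + "/dt=2023-04-05"
  val files: Map[String, Array[FileStatus]] =
    HoodieSqlCommonUtils.getFilesInPartitions(spark, table, Seq(partitionLocation))
  // (numFiles, totalSize in bytes) per partition location
  files.mapValues(statuses => (statuses.length, statuses.map(_.getLen).sum)).toMap
}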
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import org.apache.hadoop.fs.Path

import org.apache.hudi.common.table.HoodieTableConfig

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.command.PartitionStatistics
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.util.ThreadUtils

import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.util.control.NonFatal

/**
* Command for repairing a hudi table's partitions.
* Uses the methods in HoodieSqlCommonUtils to obtain partitions and stats
* instead of scanning the file system.
*/
case class RepairHoodieTableCommand(tableName: TableIdentifier,
enableAddPartitions: Boolean,
enableDropPartitions: Boolean,
cmd: String = "MSCK REPAIR TABLE") extends HoodieLeafRunnableCommand {

// This is the list of statistics that can be collected quickly without requiring a scan of the data;
// see https://github.com/apache/hive/blob/master/
// common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
val NUM_FILES = "numFiles"
val TOTAL_SIZE = "totalSize"
val DDL_TIME = "transient_lastDdlTime"

override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val tableIdentWithDB = table.identifier.quotedString
if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
}

if (table.storage.locationUri.isEmpty) {
throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
s"location provided: $tableIdentWithDB")
}

val root = new Path(table.location)
logInfo(s"Recover all the partitions in $root")

val hoodieCatalogTable = HoodieCatalogTable(spark, table.identifier)
val isHiveStyledPartitioning = hoodieCatalogTable.catalogProperties.
getOrElse(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key, "true").toBoolean

val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] = hoodieCatalogTable.
getPartitionPaths.map(partitionPath => {
var values = partitionPath.split('/')
if (isHiveStyledPartitioning) {
values = values.map(_.split('=')(1))
}
(table.partitionColumnNames.zip(values).toMap, new Path(root, partitionPath))
})

val droppedAmount = if (enableDropPartitions) {
dropPartitions(catalog, partitionSpecsAndLocs)
} else 0
val addedAmount = if (enableAddPartitions) {
val total = partitionSpecsAndLocs.length
val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
HoodieSqlCommonUtils.getFilesInPartitions(spark, table, partitionSpecsAndLocs
.map(_._2.toString))
.mapValues(statuses => PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
} else {
Map.empty[String, PartitionStatistics]
}
logInfo(s"Finished to gather the fast stats for all $total partitions.")
addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
total
} else 0
// Updates the table to indicate that its partition metadata is stored in the Hive metastore.
// This is always the case for Hive format tables, but is not true for Datasource tables created
// before Spark 2.1 unless they are converted via `msck repair table`.
spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
try {
spark.catalog.refreshTable(tableIdentWithDB)
} catch {
case NonFatal(e) =>
logError(s"Cannot refresh the table '$tableIdentWithDB'. A query of the table " +
"might return wrong result if the table was cached. To avoid such issue, you should " +
"uncache the table manually via the UNCACHE TABLE command after table recovering will " +
"complete fully.", e)
}
logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).")
Seq.empty[Row]
}

private def addPartitions(spark: SparkSession,
table: CatalogTable,
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)],
partitionStats: Map[String, PartitionStatistics]): Unit = {
val total = partitionSpecsAndLocs.length
var done = 0L
// The Hive metastore may not have enough memory to handle millions of partitions in a single
// RPC, so we split them into smaller batches. Since the Hive client is not thread safe, we
// cannot do this in parallel.
val batchSize = spark.sparkContext.conf.getInt("spark.sql.addPartitionInBatch.size", 100)
partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
val parts = batch.map { case (spec, location) =>
val params = partitionStats.get(location.toString).map {
case PartitionStatistics(numFiles, totalSize) =>
// These two fast stats prevent the Hive metastore from having to list the files again.
Map(NUM_FILES -> numFiles.toString,
TOTAL_SIZE -> totalSize.toString,
// Work around a bug in HiveMetastore that tries to mutate read-only parameters.
// See metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
DDL_TIME -> now.toString)
}.getOrElse(Map.empty)
// inherit table storage format (possibly except for location)
CatalogTablePartition(
spec,
table.storage.copy(locationUri = Some(location.toUri)),
params)
}
spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
done += parts.length
logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
}
}

// Drops the partitions that do not exist in partitionSpecsAndLocs
private def dropPartitions(catalog: SessionCatalog,
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)]): Int = {
val dropPartSpecs = ThreadUtils.parmap(
catalog.listPartitions(tableName),
"RepairTableCommand: non-existing partitions",
maxThreads = 8) { partition =>
partition.storage.locationUri.flatMap { uri =>
if (partitionSpecsAndLocs.map(_._2).contains(new Path(uri))) None else Some(partition.spec)
}
}.flatten
catalog.dropPartitions(
tableName,
dropPartSpecs,
ignoreIfNotExists = true,
purge = false,
// Since we have already checked that partition directories do not exist, we can avoid
// additional calls to the file system at the catalog side by setting this flag.
retainData = true)
dropPartSpecs.length
}
}
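
To make the partition-spec derivation in run() concrete, here is a standalone sketch of the same parsing logic, with made-up partition columns and paths:

// Sketch only: the partition columns and path below are assumptions for illustration.
// Hive-style "dt=2023-04-05/hh=01" yields Map(dt -> 2023-04-05, hh -> 01);
// non-hive-style paths are matched to partition columns purely by position.
val partitionColumnNames = Seq("dt", "hh")
val partitionPath = "dt=2023-04-05/hh=01"
val isHiveStyledPartitioning = true

var values = partitionPath.split('/')
if (isHiveStyledPartitioning) {
  values = values.map(_.split('=')(1))
}
val spec = partitionColumnNames.zip(values).toMap  // Map("dt" -> "2023-04-05", "hh" -> "01")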
@@ -610,6 +610,14 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
case TruncateTableCommand(tableName, partitionSpec)
if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
TruncateHoodieTableCommand(tableName, partitionSpec)
// Rewrite RepairTableCommand to RepairHoodieTableCommand
case r if sparkAdapter.getCatalystPlanUtils.isRepairTable(r) =>
val (tableName, enableAddPartitions, enableDropPartitions, cmd) = sparkAdapter.getCatalystPlanUtils.getRepairTableChildren(r).get
if (sparkAdapter.isHoodieTable(tableName, sparkSession)) {
RepairHoodieTableCommand(tableName, enableAddPartitions, enableDropPartitions, cmd)
} else {
r
}
case _ => plan
}
}
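
End to end, with the rule above installed through the Hudi Spark session extension, an MSCK REPAIR TABLE statement is rewritten to RepairHoodieTableCommand, and partitions are listed through Hudi's metadata utilities rather than a raw file-system scan. A hedged usage sketch (database/table names are illustrative; the ADD/DROP PARTITIONS variants assume Spark 3.2+ grammar):

import org.apache.spark.sql.SparkSession

// Sketch only: session config and table names are assumptions for illustration.
val spark = SparkSession.builder()
  .appName("hudi-msck-repair-example")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .enableHiveSupport()
  .getOrCreate()

// Parsed by Spark, then rewritten by HoodiePostAnalysisRule into RepairHoodieTableCommand.
spark.sql("MSCK REPAIR TABLE hudi_db.hudi_tbl")

// Add-only / drop-only repair maps to enableAddPartitions / enableDropPartitions.
spark.sql("MSCK REPAIR TABLE hudi_db.hudi_tbl ADD PARTITIONS")
spark.sql("MSCK REPAIR TABLE hudi_db.hudi_tbl DROP PARTITIONS")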
