diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala index e7e529b12545a..efd0eacac7329 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala @@ -90,4 +90,14 @@ trait HoodieCatalystPlansUtils { def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan + /** + * Test if the logical plan is a Repair Table LogicalPlan. + */ + def isRepairTable(plan: LogicalPlan): Boolean + + /** + * Get the member of the Repair Table LogicalPlan. + */ + def getRepairTableChildren(plan: LogicalPlan): + Option[(TableIdentifier, Boolean, Boolean, String)] } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index c3d8e9f8be617..eb5fbe917985c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -322,10 +322,9 @@ public static List getAllPartitionPaths(HoodieEngineContext engineContex public static Map getFilesInPartitions(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String basePathStr, - String[] partitionPaths, - String spillableMapPath) { + String[] partitionPaths) { try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr, - spillableMapPath, true)) { + FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true)) { return tableMetadata.getAllFilesInPartitions(Arrays.asList(partitionPaths)); } catch (Exception ex) { throw new HoodieException("Error get files in partitions: " + String.join(",", partitionPaths), ex); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java index 92396c1820e49..a382954fd21fd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java @@ -138,7 +138,7 @@ public FileStatus[] getFilesInPartitions() { } String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new); FileStatus[] allFileStatus = FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(), - partitions, "/tmp/") + partitions) .values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new); Set candidateFiles = candidateFilesInMetadataTable(allFileStatus); if (candidateFiles == null) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala index 025a224373aed..aff65672c5203 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hudi import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig} import org.apache.hudi.common.fs.FSUtils @@ -78,6 +78,20 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport { FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala } + def getFilesInPartitions(spark: SparkSession, + table: CatalogTable, + partitionPaths: Seq[String]): Map[String, Array[FileStatus]] = { + val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) + val metadataConfig = { + val properties = new Properties() + properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++ + table.properties).asJava) + HoodieMetadataConfig.newBuilder.fromProperties(properties).build() + } + FSUtils.getFilesInPartitions(sparkEngine, metadataConfig, getTableLocation(table, spark), + partitionPaths.toArray).asScala.toMap + } + /** * This method is used to compatible with the old non-hive-styled partition table. * By default we enable the "hoodie.datasource.write.hive_style_partitioning" diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/RepairHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/RepairHoodieTableCommand.scala new file mode 100644 index 0000000000000..cc25318c6a4b4 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/RepairHoodieTableCommand.scala @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command + +import org.apache.hadoop.fs.Path + +import org.apache.hudi.common.table.HoodieTableConfig + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.execution.command.PartitionStatistics +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.util.ThreadUtils + +import java.util.concurrent.TimeUnit.MILLISECONDS +import scala.util.control.NonFatal + +/** + * Command for repair hudi table's partitions. + * Use the methods in HoodieSqlCommonUtils to obtain partitions and stats + * instead of scanning the file system. + */ +case class RepairHoodieTableCommand(tableName: TableIdentifier, + enableAddPartitions: Boolean, + enableDropPartitions: Boolean, + cmd: String = "MSCK REPAIR TABLE") extends HoodieLeafRunnableCommand { + + // These are list of statistics that can be collected quickly without requiring a scan of the data + // see https://github.com/apache/hive/blob/master/ + // common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java + val NUM_FILES = "numFiles" + val TOTAL_SIZE = "totalSize" + val DDL_TIME = "transient_lastDdlTime" + + override def run(spark: SparkSession): Seq[Row] = { + val catalog = spark.sessionState.catalog + val table = catalog.getTableMetadata(tableName) + val tableIdentWithDB = table.identifier.quotedString + if (table.partitionColumnNames.isEmpty) { + throw new AnalysisException( + s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB") + } + + if (table.storage.locationUri.isEmpty) { + throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " + + s"location provided: $tableIdentWithDB") + } + + val root = new Path(table.location) + logInfo(s"Recover all the partitions in $root") + + val hoodieCatalogTable = HoodieCatalogTable(spark, table.identifier) + val isHiveStyledPartitioning = hoodieCatalogTable.catalogProperties. + getOrElse(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key, "true").toBoolean + + val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] = hoodieCatalogTable. + getPartitionPaths.map(partitionPath => { + var values = partitionPath.split('/') + if (isHiveStyledPartitioning) { + values = values.map(_.split('=')(1)) + } + (table.partitionColumnNames.zip(values).toMap, new Path(root, partitionPath)) + }) + + val droppedAmount = if (enableDropPartitions) { + dropPartitions(catalog, partitionSpecsAndLocs) + } else 0 + val addedAmount = if (enableAddPartitions) { + val total = partitionSpecsAndLocs.length + val partitionStats = if (spark.sqlContext.conf.gatherFastStats) { + HoodieSqlCommonUtils.getFilesInPartitions(spark, table, partitionSpecsAndLocs + .map(_._2.toString)) + .mapValues(statuses => PartitionStatistics(statuses.length, statuses.map(_.getLen).sum)) + } else { + Map.empty[String, PartitionStatistics] + } + logInfo(s"Finished to gather the fast stats for all $total partitions.") + addPartitions(spark, table, partitionSpecsAndLocs, partitionStats) + total + } else 0 + // Updates the table to indicate that its partition metadata is stored in the Hive metastore. + // This is always the case for Hive format tables, but is not true for Datasource tables created + // before Spark 2.1 unless they are converted via `msck repair table`. + spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true)) + try { + spark.catalog.refreshTable(tableIdentWithDB) + } catch { + case NonFatal(e) => + logError(s"Cannot refresh the table '$tableIdentWithDB'. A query of the table " + + "might return wrong result if the table was cached. To avoid such issue, you should " + + "uncache the table manually via the UNCACHE TABLE command after table recovering will " + + "complete fully.", e) + } + logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).") + Seq.empty[Row] + } + + private def addPartitions(spark: SparkSession, + table: CatalogTable, + partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)], + partitionStats: Map[String, PartitionStatistics]): Unit = { + val total = partitionSpecsAndLocs.length + var done = 0L + // Hive metastore may not have enough memory to handle millions of partitions in single RPC, + // we should split them into smaller batches. Since Hive client is not thread safe, we cannot + // do this in parallel. + val batchSize = spark.sparkContext.conf.getInt("spark.sql.addPartitionInBatch.size", 100) + partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch => + val now = MILLISECONDS.toSeconds(System.currentTimeMillis()) + val parts = batch.map { case (spec, location) => + val params = partitionStats.get(location.toString).map { + case PartitionStatistics(numFiles, totalSize) => + // This two fast stat could prevent Hive metastore to list the files again. + Map(NUM_FILES -> numFiles.toString, + TOTAL_SIZE -> totalSize.toString, + // Workaround a bug in HiveMetastore that try to mutate a read-only parameters. + // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java + DDL_TIME -> now.toString) + }.getOrElse(Map.empty) + // inherit table storage format (possibly except for location) + CatalogTablePartition( + spec, + table.storage.copy(locationUri = Some(location.toUri)), + params) + } + spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true) + done += parts.length + logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)") + } + } + + // Drops the partitions that do not exist in partitionSpecsAndLocs + private def dropPartitions(catalog: SessionCatalog, + partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)]): Int = { + val dropPartSpecs = ThreadUtils.parmap( + catalog.listPartitions(tableName), + "RepairTableCommand: non-existing partitions", + maxThreads = 8) { partition => + partition.storage.locationUri.flatMap { uri => + if (partitionSpecsAndLocs.map(_._2).contains(new Path(uri))) None else Some(partition.spec) + } + }.flatten + catalog.dropPartitions( + tableName, + dropPartSpecs, + ignoreIfNotExists = true, + purge = false, + // Since we have already checked that partition directories do not exist, we can avoid + // additional calls to the file system at the catalog side by setting this flag. + retainData = true) + dropPartSpecs.length + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index c5688965d7d29..5962ac867c41f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -610,6 +610,14 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic case TruncateTableCommand(tableName, partitionSpec) if sparkAdapter.isHoodieTable(tableName, sparkSession) => TruncateHoodieTableCommand(tableName, partitionSpec) + // Rewrite RepairTableCommand to RepairHoodieTableCommand + case r if sparkAdapter.getCatalystPlanUtils.isRepairTable(r) => + val (tableName, enableAddPartitions, enableDropPartitions, cmd) = sparkAdapter.getCatalystPlanUtils.getRepairTableChildren(r).get + if (sparkAdapter.isHoodieTable(tableName, sparkSession)) { + RepairHoodieTableCommand(tableName, enableAddPartitions, enableDropPartitions, cmd) + } else { + r + } case _ => plan } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala new file mode 100644 index 0000000000000..498121e1ab4a3 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.HoodieSparkUtils +import org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE +import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME + +import org.apache.spark.sql.SaveMode + +class TestRepairTable extends HoodieSparkSqlTestBase { + + test("Test msck repair non-partitioned table") { + Seq("true", "false").foreach { hiveStylePartitionEnable => + withTempDir { tmp => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | ts long, + | dt string, + | hh string + | ) using hudi + | location '$basePath' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts', + | hoodie.datasource.write.hive_style_partitioning = '$hiveStylePartitionEnable' + | ) + """.stripMargin) + + checkExceptionContain(s"msck repair table $tableName")( + s"Operation not allowed") + } + } + } + + test("Test msck repair partitioned table") { + Seq("true", "false").foreach { hiveStylePartitionEnable => + withTempDir { tmp => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | ts long, + | dt string, + | hh string + | ) using hudi + | partitioned by (dt, hh) + | location '$basePath' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts', + | hoodie.datasource.write.hive_style_partitioning = '$hiveStylePartitionEnable' + | ) + """.stripMargin) + val table = spark.sessionState.sqlParser.parseTableIdentifier(tableName) + + import spark.implicits._ + val df = Seq((1, "a1", 1000, "2022-10-06", "11"), (2, "a2", 1001, "2022-10-06", "12")) + .toDF("id", "name", "ts", "dt", "hh") + df.write.format("hudi") + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "dt, hh") + .option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable) + .mode(SaveMode.Append) + .save(basePath) + + assertResult(Seq())(spark.sessionState.catalog.listPartitionNames(table)) + spark.sql(s"msck repair table $tableName") + assertResult(Seq("dt=2022-10-06/hh=11", "dt=2022-10-06/hh=12"))( + spark.sessionState.catalog.listPartitionNames(table)) + } + } + } + + test("Test msck repair partitioned table [add/drop/sync] partitions") { + if (HoodieSparkUtils.gteqSpark3_2) { + Seq("true", "false").foreach { hiveStylePartitionEnable => + withTempDir { tmp => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | ts long, + | dt string + | ) using hudi + | partitioned by (dt) + | location '$basePath' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts', + | hoodie.datasource.write.hive_style_partitioning = '$hiveStylePartitionEnable' + | ) + """.stripMargin) + val table = spark.sessionState.sqlParser.parseTableIdentifier(tableName) + + // test msck repair table add partitions + import spark.implicits._ + val df1 = Seq((1, "a1", 1000, "2022-10-06")).toDF("id", "name", "ts", "dt") + df1.write.format("hudi") + .option(TBL_NAME.key(), tableName) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "dt") + .option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable) + .mode(SaveMode.Append) + .save(basePath) + + assertResult(Seq())(spark.sessionState.catalog.listPartitionNames(table)) + spark.sql(s"msck repair table $tableName add partitions") + assertResult(Seq("dt=2022-10-06"))(spark.sessionState.catalog.listPartitionNames(table)) + + // test msck repair table drop partitions + val df2 = Seq((2, "a2", 1001, "2022-10-07")).toDF("id", "name", "ts", "dt") + df2.write.format("hudi") + .option(TBL_NAME.key(), tableName) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "dt") + .option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable) + .mode(SaveMode.Overwrite) + .save(basePath) + + assertResult(Seq("dt=2022-10-06"))(spark.sessionState.catalog.listPartitionNames(table)) + spark.sql(s"msck repair table $tableName drop partitions") + assertResult(Seq())(spark.sessionState.catalog.listPartitionNames(table)) + + // test msck repair table sync partitions + spark.sql(s"msck repair table $tableName sync partitions") + assertResult(Seq("dt=2022-10-07"))(spark.sessionState.catalog.listPartitionNames(table)) + } + } + } + } +} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala index 4156198153b4c..2672e2c4cbee3 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedRelatio import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} -import org.apache.spark.sql.execution.command.ExplainCommand +import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, ExplainCommand} import org.apache.spark.sql.internal.SQLConf object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { @@ -74,4 +74,18 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = { throw new IllegalStateException(s"Should not call getRelationTimeTravel for spark2") } + + override def isRepairTable(plan: LogicalPlan): Boolean = { + plan.isInstanceOf[AlterTableRecoverPartitionsCommand] + } + + override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = { + plan match { + // For Spark >= 3.2.x, AlterTableRecoverPartitionsCommand was renamed RepairTableCommand, and added two new + // parameters: enableAddPartitions and enableDropPartitions. By setting them to true and false, can restore + // AlterTableRecoverPartitionsCommand's behavior + case c: AlterTableRecoverPartitionsCommand => + Some((c.tableName, true, false, c.cmd)) + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala index 57864004df834..a4016f18cc614 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand import org.apache.spark.sql.types.StructType object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { @@ -31,4 +33,18 @@ object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { } override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = ProjectionOverSchema(schema) + + override def isRepairTable(plan: LogicalPlan): Boolean = { + plan.isInstanceOf[AlterTableRecoverPartitionsCommand] + } + + override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = { + plan match { + // For Spark >= 3.2.x, AlterTableRecoverPartitionsCommand was renamed RepairTableCommand, and added two new + // parameters: enableAddPartitions and enableDropPartitions. By setting them to true and false, can restore + // AlterTableRecoverPartitionsCommand's behavior + case c: AlterTableRecoverPartitionsCommand => + Some((c.tableName, true, false, c.cmd)) + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala index 19025ce0d5d6c..0548fd47a4db8 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala @@ -20,8 +20,11 @@ package org.apache.spark.sql import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.common.util.ValidationUtils.checkArgument + +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation} +import org.apache.spark.sql.execution.command.RepairTableCommand import org.apache.spark.sql.types.StructType object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { @@ -54,4 +57,17 @@ object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { p.asInstanceOf[ProjectionOverSchema] } + + override def isRepairTable(plan: LogicalPlan): Boolean = { + plan.isInstanceOf[RepairTableCommand] + } + + override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = { + plan match { + case rtc: RepairTableCommand => + Some((rtc.tableName, rtc.enableAddPartitions, rtc.enableDropPartitions, rtc.cmd)) + case _ => + None + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala index 4d4921bc03472..c3642544889ce 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation} +import org.apache.spark.sql.execution.command.RepairTableCommand import org.apache.spark.sql.types.StructType object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { @@ -39,4 +41,17 @@ object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = ProjectionOverSchema(schema, output) + + override def isRepairTable(plan: LogicalPlan): Boolean = { + plan.isInstanceOf[RepairTableCommand] + } + + override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = { + plan match { + case rtc: RepairTableCommand => + Some((rtc.tableName, rtc.enableAddPartitions, rtc.enableDropPartitions, rtc.cmd)) + case _ => + None + } + } }