From f9eb91a9ef70f865c9e1496e1474c740649907dc Mon Sep 17 00:00:00 2001
From: KnightChess <981159963@qq.com>
Date: Wed, 10 Aug 2022 16:30:31 +0800
Subject: [PATCH] [HUDI-3503] Support more features in call procedure
 CleanCommand (#6353)

---
 .../procedures/RunCleanProcedure.scala        | 23 +++++---
 .../hudi/procedure/TestCleanProcedure.scala   | 56 ++++++++++++++++++-
 2 files changed, 69 insertions(+), 10 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index 2a0143bafba8..36580176d0f7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -21,6 +21,7 @@ import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.config.HoodieCleanConfig
+import org.apache.hudi.table.action.clean.CleaningTriggerStrategy
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
@@ -33,8 +34,12 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.optional(1, "skip_locking", DataTypes.BooleanType, false),
     ProcedureParameter.optional(2, "schedule_in_line", DataTypes.BooleanType, true),
-    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, None),
-    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 10)
+    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, HoodieCleanConfig.CLEANER_POLICY.defaultValue()),
+    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, HoodieCleanConfig.CLEANER_COMMITS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(5, "hours_retained", DataTypes.IntegerType, HoodieCleanConfig.CLEANER_HOURS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(6, "file_versions_retained", DataTypes.IntegerType, HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(7, "trigger_strategy", DataTypes.StringType, HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.defaultValue()),
+    ProcedureParameter.optional(8, "trigger_max_commits", DataTypes.IntegerType, HoodieCleanConfig.CLEAN_MAX_COMMITS.defaultValue().toInt)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -64,16 +69,16 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
     val skipLocking = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
     val scheduleInLine = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean]
-    val cleanPolicy = getArgValueOrDefault(args, PARAMETERS(3))
-    val retainCommits = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Integer]
     val basePath = getBasePath(tableName, Option.empty)
     val cleanInstantTime = HoodieActiveTimeline.createNewInstantTime()
-    var props: Map[String, String] = Map(
-      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> String.valueOf(retainCommits)
+    val props: Map[String, String] = Map(
+      HoodieCleanConfig.CLEANER_POLICY.key() -> getArgValueOrDefault(args, PARAMETERS(3)).get.toString,
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(4)).get.toString,
+      HoodieCleanConfig.CLEANER_HOURS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(5)).get.toString,
+      HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(6)).get.toString,
+      HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
+      HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, PARAMETERS(8)).get.toString
     )
-    if (cleanPolicy.isDefined) {
-      props += (HoodieCleanConfig.CLEANER_POLICY.key() -> String.valueOf(cleanPolicy.get))
-    }
 
     val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
     val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
index 7986c304c7e5..4dd950e382c7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
@@ -46,6 +46,7 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase {
       spark.sql(s"update $tableName set price = 12 where id = 1")
       spark.sql(s"update $tableName set price = 13 where id = 1")
 
+      // KEEP_LATEST_COMMITS
       val result1 = spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
         .collect()
         .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), row.getString(3), row.getString(4), row.getInt(5)))
@@ -53,6 +54,10 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase {
       assertResult(1)(result1.length)
       assertResult(2)(result1(0)(2))
 
+      val result11 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
+      assertResult(2)(result11.length)
+
       checkAnswer(s"select id, name, price, ts from $tableName order by id")(
         Seq(1, "a1", 13, 1000)
       )
@@ -60,7 +65,56 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase {
       val result2 = spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
         .collect()
       assertResult(0)(result2.length)
+
+      // KEEP_LATEST_FILE_VERSIONS
+      spark.sql(s"update $tableName set price = 14 where id = 1")
+
+      val result3 = spark.sql(
+        s"call run_clean(table => '$tableName', clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 3)").collect()
+      assertResult(0)(result3.length)
+
+      val result4 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
+      assertResult(3)(result4.length)
+
+      val result5 = spark.sql(
+        s"call run_clean(table => '$tableName', clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
+      assertResult(1)(result5.length)
+      assertResult(2)(result5(0)(2))
+
+      val result6 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
+      assertResult(1)(result6.length)
+
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 14, 1000)
+      )
+
+      // trigger time
+      spark.sql(s"update $tableName set price = 15 where id = 1")
+
+      // no trigger, only has 1 commit
+      val result7 = spark.sql(
+        s"call run_clean(table => '$tableName', trigger_max_commits => 2, clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
+      assertResult(0)(result7.length)
+
+      val result8 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
+      assertResult(2)(result8.length)
+
+      // trigger
+      val result9 = spark.sql(
+        s"call run_clean(table => '$tableName', trigger_max_commits => 1, clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
+      assertResult(1)(result9.length)
+      assertResult(1)(result9(0)(2))
+
+      val result10 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
+      assertResult(1)(result10.length)
+
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 15, 1000)
+      )
     }
   }
-
 }
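
For reference, a minimal sketch of invoking the extended procedure from Spark SQL once this patch is applied. It assumes a Hudi-enabled SparkSession; the table name 'hudi_tbl' is hypothetical, the parameter names are the ones declared in the PARAMETERS array above, and the policy and strategy literals ('KEEP_LATEST_FILE_VERSIONS', 'KEEP_LATEST_BY_HOURS', 'NUM_COMMITS') reflect Hudi's HoodieCleaningPolicy and CleaningTriggerStrategy enums, so verify them against your Hudi version.

    // Default policy (KEEP_LATEST_COMMITS): retain the file versions needed
    // by the most recent 10 commits.
    spark.sql("call run_clean(table => 'hudi_tbl', retain_commits => 10)")

    // KEEP_LATEST_FILE_VERSIONS: retain only the latest 2 versions of each file group.
    spark.sql("call run_clean(table => 'hudi_tbl', " +
      "clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 2)")

    // KEEP_LATEST_BY_HOURS: retain file slices written within the last 24 hours.
    spark.sql("call run_clean(table => 'hudi_tbl', " +
      "clean_policy => 'KEEP_LATEST_BY_HOURS', hours_retained => 24)")

    // Trigger control: only run cleaning once at least 5 commits have
    // accumulated since the last clean.
    spark.sql("call run_clean(table => 'hudi_tbl', " +
      "trigger_strategy => 'NUM_COMMITS', trigger_max_commits => 5)")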