[HUDI-3503] Support more features to call procedure CleanCommand (#6353)
KnightChess committed Aug 10, 2022
1 parent 43d6710 commit f9eb91a
Showing 2 changed files with 69 additions and 10 deletions.
RunCleanProcedure.scala

@@ -21,6 +21,7 @@ import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.config.HoodieCleanConfig
+import org.apache.hudi.table.action.clean.CleaningTriggerStrategy
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
@@ -33,8 +34,12 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.optional(1, "skip_locking", DataTypes.BooleanType, false),
     ProcedureParameter.optional(2, "schedule_in_line", DataTypes.BooleanType, true),
-    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, None),
-    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 10)
+    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, HoodieCleanConfig.CLEANER_POLICY.defaultValue()),
+    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, HoodieCleanConfig.CLEANER_COMMITS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(5, "hours_retained", DataTypes.IntegerType, HoodieCleanConfig.CLEANER_HOURS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(6, "file_versions_retained", DataTypes.IntegerType, HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(7, "trigger_strategy", DataTypes.StringType, HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.defaultValue()),
+    ProcedureParameter.optional(8, "trigger_max_commits", DataTypes.IntegerType, HoodieCleanConfig.CLEAN_MAX_COMMITS.defaultValue().toInt)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -64,16 +69,16 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
     val skipLocking = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
     val scheduleInLine = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean]
-    val cleanPolicy = getArgValueOrDefault(args, PARAMETERS(3))
-    val retainCommits = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Integer]
     val basePath = getBasePath(tableName, Option.empty)
     val cleanInstantTime = HoodieActiveTimeline.createNewInstantTime()
-    var props: Map[String, String] = Map(
-      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> String.valueOf(retainCommits)
+    val props: Map[String, String] = Map(
+      HoodieCleanConfig.CLEANER_POLICY.key() -> getArgValueOrDefault(args, PARAMETERS(3)).get.toString,
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(4)).get.toString,
+      HoodieCleanConfig.CLEANER_HOURS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(5)).get.toString,
+      HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(6)).get.toString,
+      HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
+      HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, PARAMETERS(8)).get.toString
     )
-    if (cleanPolicy.isDefined) {
-      props += (HoodieCleanConfig.CLEANER_POLICY.key() -> String.valueOf(cleanPolicy.get))
-    }
     val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
     val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)

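Each of the new procedure arguments maps one-to-one onto a HoodieCleanConfig key, so the cleaning policy, the retention thresholds, and the trigger settings can all be set in a single SQL call. A minimal usage sketch, assuming a table named my_table (the argument values are illustrative; KEEP_LATEST_COMMITS is Hudi's default cleaner policy and NUM_COMMITS its default cleaning trigger strategy):

  call run_clean(
    table => 'my_table',
    clean_policy => 'KEEP_LATEST_COMMITS',
    retain_commits => 1,
    trigger_strategy => 'NUM_COMMITS',
    trigger_max_commits => 1
  )
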
TestCleanProcedure.scala

@@ -46,21 +46,75 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase
spark.sql(s"update $tableName set price = 12 where id = 1")
spark.sql(s"update $tableName set price = 13 where id = 1")

// KEEP_LATEST_COMMITS
val result1 = spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
.collect()
.map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), row.getString(3), row.getString(4), row.getInt(5)))

assertResult(1)(result1.length)
assertResult(2)(result1(0)(2))

val result11 = spark.sql(
s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
assertResult(2)(result11.length)

checkAnswer(s"select id, name, price, ts from $tableName order by id") (
Seq(1, "a1", 13, 1000)
)

val result2 = spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
.collect()
assertResult(0)(result2.length)

      // KEEP_LATEST_FILE_VERSIONS
      spark.sql(s"update $tableName set price = 14 where id = 1")

      val result3 = spark.sql(
        s"call run_clean(table => '$tableName', clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 3)").collect()
      assertResult(0)(result3.length)

      val result4 = spark.sql(
        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
      assertResult(3)(result4.length)

      val result5 = spark.sql(
        s"call run_clean(table => '$tableName', clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
      assertResult(1)(result5.length)
      assertResult(2)(result5(0)(2))

      val result6 = spark.sql(
        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
      assertResult(1)(result6.length)

      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
        Seq(1, "a1", 14, 1000)
      )

      // cleaning trigger: with the NUM_COMMITS strategy, a clean fires only
      // after trigger_max_commits new commits have accumulated
      spark.sql(s"update $tableName set price = 15 where id = 1")

      // no clean triggered: only 1 new commit since the last clean, below trigger_max_commits => 2
      val result7 = spark.sql(
        s"call run_clean(table => '$tableName', trigger_max_commits => 2, clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
      assertResult(0)(result7.length)

      val result8 = spark.sql(
        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
      assertResult(2)(result8.length)

      // clean triggered: trigger_max_commits => 1 is reached
      val result9 = spark.sql(
        s"call run_clean(table => '$tableName', trigger_max_commits => 1, clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
      assertResult(1)(result9.length)
      assertResult(1)(result9(0)(2))

      val result10 = spark.sql(
        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
      assertResult(1)(result10.length)

      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
        Seq(1, "a1", 15, 1000)
      )
    }
  }
}
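
Together the three test blocks cover each retention mode end to end: KEEP_LATEST_COMMITS via retain_commits, KEEP_LATEST_FILE_VERSIONS via file_versions_retained, and the NUM_COMMITS trigger via trigger_max_commits. The assertions read run_clean's output as zero rows when nothing was cleaned and, when a clean did run, take column 2 of the result row as the number of files removed, with show_fsview_all confirming how many file slices remain. The hours_retained argument is not exercised here; it pairs with Hudi's KEEP_LATEST_BY_HOURS cleaning policy, so a hedged sketch of that variant (table name illustrative) would be:

  call run_clean(
    table => 'my_table',
    clean_policy => 'KEEP_LATEST_BY_HOURS',
    hours_retained => 24
  )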
