From 932376f794fa702293351a41d57ad86d1d3a4b1a Mon Sep 17 00:00:00 2001
From: stream2000 <18889897088@163.com>
Date: Fri, 20 Oct 2023 15:59:37 +0800
Subject: [PATCH] rename ttl to partition lifecycle

---
 rfc/rfc-65/rfc-65.md | 225 +++++++++++++++++++++++++------------------
 1 file changed, 132 insertions(+), 93 deletions(-)

diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md
index 7b3106ab8ba31..4bedb8387c096 100644
--- a/rfc/rfc-65/rfc-65.md
+++ b/rfc/rfc-65/rfc-65.md

JIRA: [HUDI-5823](https://issues.apache.org/jira/browse/HUDI-5823)

## Abstract

In some classic Hudi use cases, users partition Hudi data by time and are only interested in data from a recent
period. The outdated data is useless and costly, so we need a lifecycle management mechanism to prevent the dataset
from growing infinitely.

This proposal introduces partition lifecycle management strategies to Hudi. Users can configure the strategies through
write configs, and with proper configs set, Hudi can find out which partitions are expired and delete them.

Partition lifecycle management is a table service like Clean/Compaction/Clustering: users configure their partition
lifecycle management strategies through write configs, and Hudi finds expired partitions and deletes them
automatically.

## Background

A lifecycle management mechanism is an important feature for databases. Hudi already provides a `delete_partition`
interface to delete outdated partitions. However, users still need to detect which partitions are outdated and
call `delete_partition` manually. In other words, users have to define and implement some kind of partition lifecycle
management strategy, find expired partitions, and call `delete_partition` by themselves. As the scale of installations
grows, it is becoming increasingly important to provide a user-friendly lifecycle management mechanism for Hudi.

## Implementation

Our main goals are as follows:

* Provide an extensible framework for partition lifecycle management.
* Implement a simple KEEP_BY_TIME strategy, which can be executed through an independent Spark job or as a synchronous
  or asynchronous table service.
### Strategy Definition

The lifecycle strategies are similar to existing table service strategies. We can define a lifecycle strategy just like
we define a clustering/clean/compaction strategy:

```properties
hoodie.partition.lifecycle.management.strategy=KEEP_BY_TIME
hoodie.partition.lifecycle.management.strategy.class=org.apache.hudi.table.action.lifecycle.strategy.KeepByTimePartitionLifecycleManagementStrategy
hoodie.partition.lifecycle.days.retain=10
```

The config `hoodie.partition.lifecycle.management.strategy.class` provides a strategy class (a subclass
of `PartitionLifecycleManagementStrategy`) that computes the expired partition paths to delete.
`hoodie.partition.lifecycle.days.retain` is the strategy value used
by `KeepByTimePartitionLifecycleManagementStrategy`: partitions that have not been modified for more than this number
of days are considered expired. We will cover the `KeepByTimePartitionLifecycleManagementStrategy` strategy in detail
in the next section.

The core definition of `PartitionLifecycleManagementStrategy` looks like this:

```java
/**
 * Strategy for partition-level lifecycle management.
 */
public abstract class PartitionLifecycleManagementStrategy {
  /**
   * Get expired partition paths for a specific partition lifecycle management strategy.
   *
   * @return Expired partition paths.
   */
  public abstract List<String> getExpiredPartitionPaths();
}
```

Users can provide their own implementation of `PartitionLifecycleManagementStrategy` and Hudi will help delete the
expired partitions.
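For illustration, a minimal custom strategy could look like the sketch below. It glosses over the constructor/context
wiring proposed elsewhere in this RFC, reuses the `getPartitionPathsForLifecycleManagement` helper introduced in a
later section, and the `yyyy-MM-dd` partition naming is purely an assumption:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical user-defined strategy: expire partitions older than a fixed number of days,
 * assuming partition paths are formatted as 'yyyy-MM-dd'.
 */
public class KeepRecentDaysLifecycleStrategy extends PartitionLifecycleManagementStrategy {

  private static final int DAYS_TO_KEEP = 7; // arbitrary value for the sketch

  @Override
  public List<String> getExpiredPartitionPaths() {
    String cutoff = LocalDate.now().minusDays(DAYS_TO_KEEP).format(DateTimeFormatter.ISO_LOCAL_DATE);
    // Paths named yyyy-MM-dd sort lexicographically, so anything before the cutoff is expired.
    return getPartitionPathsForLifecycleManagement().stream()
        .filter(partitionPath -> partitionPath.compareTo(cutoff) < 0)
        .collect(Collectors.toList());
  }
}
```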
### KeepByTimePartitionLifecycleManagementStrategy

We will provide a strategy called `KeepByTimePartitionLifecycleManagementStrategy` in the first version of the
partition lifecycle management implementation.

The `KeepByTimePartitionLifecycleManagementStrategy` will calculate the `lastModifiedTime` for each input partition. If
the duration between now and `lastModifiedTime` for a partition is larger than the
configured `hoodie.partition.lifecycle.days.retain`, `KeepByTimePartitionLifecycleManagementStrategy` will mark the
partition as expired. We use days as the unit of retention time since it is commonly used for data lakes. Open to
ideas for this.

We will use the largest commit time of committed file groups in the partition as the partition's `lastModifiedTime`,
so any write (including normal DMLs, clustering, etc.) with a larger instant time will change the
partition's `lastModifiedTime`.

For file groups generated by a replace commit, the commit time may not reveal the real insert/update time of the file
group. However, when using this strategy we can assume that we won't cluster a partition that has had no new writes
for a long time. In the future, we may introduce a more accurate mechanism to get the `lastModifiedTime` of a
partition, for example using the metadata table.
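To make the rule concrete, the core check could look like the minimal sketch below; `getLastModifiedInstant` is only a
placeholder for the real lookup of the largest commit time among the partition's committed file groups:

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Minimal sketch of the KEEP_BY_TIME expiration check described above.
 */
public class KeepByTimeExpirationCheck {

  private final long daysRetain; // value of hoodie.partition.lifecycle.days.retain

  public KeepByTimeExpirationCheck(long daysRetain) {
    this.daysRetain = daysRetain;
  }

  /** A partition expires when it has not been modified for more than the retention window. */
  public boolean isExpired(String partitionPath, Instant now) {
    Instant lastModified = getLastModifiedInstant(partitionPath);
    return Duration.between(lastModified, now).toDays() > daysRetain;
  }

  /** Placeholder: the real implementation would use the largest commit time of committed file groups. */
  private Instant getLastModifiedInstant(String partitionPath) {
    throw new UnsupportedOperationException("illustrative placeholder");
  }
}
```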
### Apply different strategies for different partitions

Some users may want to apply different strategies to different partitions. For example, a table may have multiple
partition fields (productId, day): partitions under `product=1` should be kept for 30 days while partitions
under `product=2` should be kept for 7 days only.

For the first version of partition lifecycle management, we do not plan to implement a complicated strategy (for
example, using an array to store strategies, introducing partition regexes, etc.). Instead, we add a new abstract
method `getPartitionPathsForLifecycleManagement` in `PartitionLifecycleManagementStrategy` and provide a new
config `hoodie.partition.lifecycle.management.partition.selected`.

If `hoodie.partition.lifecycle.management.partition.selected` is set, `getPartitionPathsForLifecycleManagement` will
return the partitions provided by this config. If not, `getPartitionPathsForLifecycleManagement` will return all
partitions in the Hudi table.

Lifecycle management strategies will only be applied to partitions returned
by `getPartitionPathsForLifecycleManagement`.

Thus, if users want to apply different strategies to different partitions, they can run partition lifecycle management
multiple times, selecting different partitions and applying the corresponding strategy each time (see the config sketch
after the code below). We may provide a batch interface in the future to simplify this.

The `getPartitionPathsForLifecycleManagement` method will look like this:

```java
/**
 * Strategy for partition-level lifecycle management.
 */
public abstract class PartitionLifecycleManagementStrategy {
  /**
   * Scan and list all partitions for partition lifecycle management.
   *
   * @return Partitions to apply the lifecycle management strategy to.
   */
  protected List<String> getPartitionPathsForLifecycleManagement() {
    if (StringUtils.isNullOrEmpty(config.getLifecycleManagementPartitionSelected())) {
      // No explicit selection: return all partition paths of the table.
      return FSUtils.getAllPartitionPaths(hoodieTable.getContext(), config.getMetadataConfig(), config.getBasePath());
    } else {
      // Only the partitions selected by hoodie.partition.lifecycle.management.partition.selected.
      return getMatchedPartitions();
    }
  }
}
```
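For illustration, the multi-strategy flow described above could be driven by two separate runs with different write
configs. The exact value format of `hoodie.partition.lifecycle.management.partition.selected` (full partition paths,
prefixes, or patterns) is not fixed by this RFC, so the values below are assumptions:

```properties
# Run 1: partitions under product=1, kept for 30 days
hoodie.partition.lifecycle.management.strategy=KEEP_BY_TIME
hoodie.partition.lifecycle.management.partition.selected=product=1
hoodie.partition.lifecycle.days.retain=30

# Run 2 (a separate invocation): partitions under product=2, kept for 7 days
hoodie.partition.lifecycle.management.strategy=KEEP_BY_TIME
hoodie.partition.lifecycle.management.partition.selected=product=2
hoodie.partition.lifecycle.days.retain=7
```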
### Executing partition lifecycle management

Once we have a proper `PartitionLifecycleManagementStrategy` implementation, it is easy to execute partition lifecycle
management:

```java
public class SparkPartitionLifecycleManagementActionExecutor<T> extends BaseSparkCommitActionExecutor<T> {
  @Override
  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
    // Construct the PartitionLifecycleManagementStrategy
    PartitionLifecycleManagementStrategy strategy = (PartitionLifecycleManagementStrategy) ReflectionUtils.loadClass(
        PartitionLifecycleManagementStrategy.checkAndGetPartitionLifecycleManagementStrategy(config),
        new Class[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);

    // Get expired partition paths
    List<String> expiredPartitions = strategy.getExpiredPartitionPaths();

    // Delete them reusing SparkDeletePartitionCommitActionExecutor
    return new SparkDeletePartitionCommitActionExecutor<>(context, config, table, instantTime,
        expiredPartitions).execute();
  }
}
```

We will add a new method `managePartitionLifecycle` to `HoodieTable`, and `HoodieSparkCopyOnWriteTable` can implement
it like this:

```java
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> managePartitionLifecycle(HoodieEngineContext context, String instantTime) {
  return new SparkPartitionLifecycleManagementActionExecutor<>(context, config, this, instantTime).execute();
}
```

We can call `hoodieTable.managePartitionLifecycle` in an independent Flink/Spark job, or in async/sync inline table
services like clustering/compaction/clean.

### User interface for partition lifecycle management

We can run partition lifecycle management inline with a streaming ingestion job, or run it as an independent batch job,
for both the Spark and Flink engines.

#### Run inline with Streaming Ingestion

We can run clustering inline with a streaming ingestion job through the following config:

```properties
hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=5
```

We can do a similar thing for partition lifecycle management.
The configs for async partition lifecycle management are:

| Config key                                               | Remarks                                                                                                                             | Default |
|----------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------|---------|
| hoodie.partition.lifecycle.management.async.enabled      | Enable running of the lifecycle management service asynchronously as writes happen on the table.                                     | false   |
| hoodie.partition.lifecycle.management.async.max.commits  | Control the frequency of async lifecycle management by specifying after how many commits lifecycle management should be triggered.   | 4       |

We can easily implement async partition lifecycle management for both the Spark and Flink engines since we only need to
call `hoodieTable.managePartitionLifecycle`. We can also support synchronous lifecycle management if we want.
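As an illustration, an ingestion job that also expires old partitions every few commits might carry the proposed async
switches together with the strategy configs from the Strategy Definition section (the values here are arbitrary):

```properties
# Proposed async partition lifecycle management switches
hoodie.partition.lifecycle.management.async.enabled=true
hoodie.partition.lifecycle.management.async.max.commits=4

# Strategy configs defined earlier in this RFC
hoodie.partition.lifecycle.management.strategy=KEEP_BY_TIME
hoodie.partition.lifecycle.days.retain=10
```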
#### Run by Independent Job

Deleting a large number of partitions is a heavy operation, so we may want to run partition lifecycle management
through an independent job. We will provide a SparkSQL call command to run partition lifecycle management, and it may
look like this:

```sql
call managePartitionLifecycle(table => 'hudi_table', strategy => 'KEEP_BY_TIME', daysRetain => '10', predicate => 'productid = 1');
```

The params are as follows:

| Param name | Remarks                                                                                                                         | Default      |
|------------|-----------------------------------------------------------------------------------------------------------------------------------|--------------|
| table      | The hoodie table to run partition lifecycle management on                                                                          | empty string |
| basePath   | The hoodie table path to run partition lifecycle management on                                                                     | empty string |
| strategy   | The partition lifecycle management strategy, corresponding to an implementation of `PartitionLifecycleManagementStrategy`          | KEEP_BY_TIME |
| predicate  | Partition predicate for lifecycle management; the strategy is only applied to the partitions selected by this predicate            | empty string |

Besides the SparkSQL call command, in the future we can support running lifecycle management with a Spark jar, similar
to running clustering with `HoodieClusteringJob`, and with a Flink job similar to `HoodieFlinkClusteringJob`.

### Future plan

We can do a lot of things about lifecycle management in the future:

* Support record-level lifecycle management.
* Move the partitions to be cleaned up to cold/cheaper storage in object stores instead of deleting them forever.
* Stash the partitions to be cleaned up in a `.stashedForDeletion` folder (at the `.hoodie` level) and introduce a
  recovery mechanism, for data security.
* ...

## Rollout/Adoption Plan

Hudi partition lifecycle management V1 will support a simple KEEP_BY_TIME strategy at first, and users can implement
their own `PartitionLifecycleManagementStrategy`.

Adding this feature won't affect existing functions.