From 2d1ce8271a9afc8d1d833b0b675f17e91e434ca9 Mon Sep 17 00:00:00 2001 From: StreamingFlames Date: Mon, 27 Feb 2023 19:08:27 +0800 Subject: [PATCH 1/9] [HUDI-5823] RFC for Partition TTL Management --- rfc/rfc-65/rfc-65.md | 110 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 rfc/rfc-65/rfc-65.md diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md new file mode 100644 index 000000000000..93d9b0821955 --- /dev/null +++ b/rfc/rfc-65/rfc-65.md @@ -0,0 +1,110 @@ +## Proposers +- @stream2000 +- @hujincalrin +- @huberylee +- @YuweiXiao +## Approvers +## Status +JIRA: [HUDI-5823](https://issues.apache.org/jira/browse/HUDI-5823) +## Abstract +In some classic hudi use cases, users partition hudi data by time and are only interested in data from a recent period of time. The outdated data is useless and costly, we need a TTL(Time-To-Live) management mechanism to prevent the dataset from growing infinitely. +This proposal introduces Partition TTL Management policies to hudi, people can config the policies by table config directly or by call commands. With proper configs set, Hudi can find out which partitions are outdated and delete them. +## Background +TTL management mechanism is an important feature for databases. Hudi already provides a delete_partition interface to delete outdated partitions. However, users still need to detect which partitions are outdated and call `delete_partition` manually, which means that users need to define and implement some kind of TTL policies and maintain proper statistics to find expired partitions by themself. As the scale of installations grew, it's more important to implement a user-friendly TTL management mechanism for hudi. +## Implementation +There are 3 components to implement Partition TTL Management + +- TTL policy definition & storage +- Partition statistics for TTL management +- Appling policies +### TTL Policy Definition +We have three main considerations when designing TTL policy: + +1. User hopes to manage partition TTL not only by expired time but also by sub-partitions count and sub-partitions size. So we need to support the following three different TTL policy types. + 1. **KEEP_BY_TIME**. Partitions will expire N days after their last modified time. + 2. **KEEP_BY_COUNT**. Keep N sub-partitions for a high-level partition. When sub partition count exceeds, delete the partitions with smaller partition values until the sub-partition count meets the policy configuration. + 3. **KEEP_BY_SIZE**. Similar to KEEP_BY_COUNT, but to ensure that the sum of the data size of all sub-partitions does not exceed the policy configuration. +2. User need to set different policies for different partitions. For example, the hudi table is partitioned by two fields (user_id, ts). For partition(user_id='1'), we set the policy to keep 100G data for all sub-partitions, and for partition(user_id='2') we set the policy that all sub-partitions will expire 10 days after their last modified time. +3. It's possible that there are a lot of high-level partitions in the user's table, and they don't want to set TTL policies for all the high-level partitions. So we need to provide a default policy mechanism so that users can set a default policy for all high-level partitions and add some explicit policies for some of them if needed. Explicit policies will override the default policy. 
+ +So here we have the TTL policy definition: +```java +public class HoodiePartitionTTLPolicy { + public enum TTLPolicy { + KEEP_BY_TIME, KEEP_BY_SIZE, KEEP_BY_COUNT + } + + // Partition spec for which the policy takes effect + private String partitionSpec; + + private TTLPolicy policy; + + private long policyValue; +} +``` + +### User Interface for TTL policy +Users can config partition TTL management policies through SparkSQL Call Command and through table config directly. Assume that the user has a hudi table partitioned by two fields(user_id, ts), he can config partition TTL policies as follows. + +```sql +-- Set default policy for all user_id, which keeps the data for 30 days. +call add_ttl_policy(table => 'test', partitionSpec => 'user_id=*/', policy => 'KEEP_BY_TIME', policyValue => '30'); + +--For partition user_id=1/, keep 10 sub partitions. +call add_ttl_policy(table => 'test', partitionSpec => 'user_id=1/', policy => 'KEEP_BY_COUNT', policyValue => '10'); + +--For partition user_id=2/, keep 100GB data in total +call add_ttl_policy(table => 'test', partitionSpec => 'user_id=2/', policy => 'KEEP_BY_SIZE', policyValue => '107374182400'); + +--For partition user_id=3/, keep the data for 7 day. +call add_ttl_policy(table => 'test', partitionSpec => 'user_id=3/', policy => 'KEEP_BY_TIME', policyValue => '7'); + +-- Show all the TTL policies including default and explicit policies +call show_ttl_policies(table => 'test'); +user_id=*/ KEEP_BY_TIME 30 +user_id=1/ KEEP_BY_COUNT 10 +user_id=2/ KEEP_BY_SIZE 107374182400 +user_id=3/ KEEP_BY_TIME 7 +``` + +### Storage for TTL policy +The partition TTL policies will be stored in `hoodie.properties`since it is part of table metadata. The policy configs in `hoodie.properties`are defined as follows. Explicit policies are defined using a JSON array while default policy is defined using separate configs. + +```properties +# Default TTL policy definition +hoodie.partition.ttl.management.default.policy=KEEP_BY_TIME +hoodie.partition.ttl.management.default.fields=user_id +hoodie.partition.ttl.management.default.policy.value=30 + +# Explicit TTL policy definition +hoodie.partition.ttl.policies=[{"partitionSpec"\:"user_id\=2/","policy"\:"KEEP_BY_SIZE","policyValue"\:107374182400},{"partitionSpec"\:"user_id\=1/","policy"\:"KEEP_BY_COUNT","policyValue"\:10},{"partitionSpec"\:"user_id\=3/","policy"\:"KEEP_BY_TIME","policyValue"\:7}] +``` + +### Partition Statistics +#### Partition Statistics Entity +We need need to maintain a partition stat for every partition in the hudi table. The stat will contain three fields: + +- RelativePath. Relative path to the base path, or we can call it PartitionPath. +- LastModifiedTime. Last instant time that modified the partition. We need this information to support the `KEEP_BY_TIME` policy. +- PartitionTotalFileSize. The sum of all valid data file sizes in the partition, to support the `KEEP_BY_SIZE`policy. We calculate only the latest file slices in the partition currently. +#### Gathering Partition Statistics +The simplest way to get partition statistics is that reading information from the metadata table. However, the write and compaction of the metadata table will severely affect the throughput and latency of data ingestion. So we design an asynchronous partition statistics as follows. + +- At the first time we gather the partitions statistics, we list all partitions in the hudi table and calculate `PartitionTotalFileSize`and `LastModifiedTime`for each partition. 
Then we store all the partition stats in persistent storage along with the instant time we do the stats. For example, we can directly store all partition stats in a JSON file whose file name contains the instant time. +- After initializing the partition statistics, we can list only affected partitions by reading timeline metadata and store the new partition statistics back to the storage with new instant time. +- Note that deleted partition will be deleted from partition statistics too. If a partition was deleted before and have no new data, we will remove it from partition statistics so it won't calculated as expired partition again. +### Appling Policies +Once we have defined the TTL policies and have proper partition statistics, it's easy to apply the policies and delete expired partitions. + +1. Gather partitions statistics. +2. Apply each TTL policy. For explicit policies, find sub-partitions matched `PartitionSpec`defined in the policy and check if there are expired partitions according to the policy type and size. For default policy, find partitions that do not match any explicit policy and check if they are expired. +3. For all expired partitions, call delete_partition to delete them, which will generate a replace commit and mark all files in those partitions as replaced. For pending clustering and compaction that affect the target partition to delete, [HUDI-5553](https://issues.apache.org/jira/browse/HUDI-5553) introduces a pre-check that will throw an exception, and further improvement could be discussed in [HUDI-5663](https://issues.apache.org/jira/browse/HUDI-5663). +4. Clean then will delete all replaced file groups. +## Rollout/Adoption Plan +This will be updated once we decide on one of the approaches proposed above. +## Test Plan +This will be updated once we decide on one of the approaches proposed above. + + + + From f2533397175067f565e551bd443abb9b8691b6fa Mon Sep 17 00:00:00 2001 From: stream2000 <18889897088@163.com> Date: Wed, 31 May 2023 15:26:32 +0800 Subject: [PATCH 2/9] address comments and add more details --- rfc/rfc-65/rfc-65.md | 159 ++++++++++++++++++++++++++++--------------- 1 file changed, 105 insertions(+), 54 deletions(-) diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md index 93d9b0821955..8494e5d2ee6f 100644 --- a/rfc/rfc-65/rfc-65.md +++ b/rfc/rfc-65/rfc-65.md @@ -1,40 +1,72 @@ ## Proposers + - @stream2000 - @hujincalrin - @huberylee - @YuweiXiao + ## Approvers + ## Status + JIRA: [HUDI-5823](https://issues.apache.org/jira/browse/HUDI-5823) + ## Abstract -In some classic hudi use cases, users partition hudi data by time and are only interested in data from a recent period of time. The outdated data is useless and costly, we need a TTL(Time-To-Live) management mechanism to prevent the dataset from growing infinitely. -This proposal introduces Partition TTL Management policies to hudi, people can config the policies by table config directly or by call commands. With proper configs set, Hudi can find out which partitions are outdated and delete them. + +In some classic hudi use cases, users partition hudi data by time and are only interested in data from a recent period +of time. The outdated data is useless and costly, we need a TTL(Time-To-Live) management mechanism to prevent the +dataset from growing infinitely. +This proposal introduces Partition TTL Management policies to hudi, people can config the policies by table config +directly or by call commands. With proper configs set, Hudi can find out which partitions are outdated and delete them. 
+ ## Background -TTL management mechanism is an important feature for databases. Hudi already provides a delete_partition interface to delete outdated partitions. However, users still need to detect which partitions are outdated and call `delete_partition` manually, which means that users need to define and implement some kind of TTL policies and maintain proper statistics to find expired partitions by themself. As the scale of installations grew, it's more important to implement a user-friendly TTL management mechanism for hudi. + +TTL management mechanism is an important feature for databases. Hudi already provides a delete_partition interface to +delete outdated partitions. However, users still need to detect which partitions are outdated and +call `delete_partition` manually, which means that users need to define and implement some kind of TTL policies and +maintain proper statistics to find expired partitions by themselves. As the scale of installations grew, it's more +important to implement a user-friendly TTL management mechanism for hudi. + ## Implementation + There are 3 components to implement Partition TTL Management - TTL policy definition & storage -- Partition statistics for TTL management -- Appling policies +- Gathering proper partition statistics for TTL management +- Executing TTL management + ### TTL Policy Definition -We have three main considerations when designing TTL policy: -1. User hopes to manage partition TTL not only by expired time but also by sub-partitions count and sub-partitions size. So we need to support the following three different TTL policy types. - 1. **KEEP_BY_TIME**. Partitions will expire N days after their last modified time. - 2. **KEEP_BY_COUNT**. Keep N sub-partitions for a high-level partition. When sub partition count exceeds, delete the partitions with smaller partition values until the sub-partition count meets the policy configuration. - 3. **KEEP_BY_SIZE**. Similar to KEEP_BY_COUNT, but to ensure that the sum of the data size of all sub-partitions does not exceed the policy configuration. -2. User need to set different policies for different partitions. For example, the hudi table is partitioned by two fields (user_id, ts). For partition(user_id='1'), we set the policy to keep 100G data for all sub-partitions, and for partition(user_id='2') we set the policy that all sub-partitions will expire 10 days after their last modified time. -3. It's possible that there are a lot of high-level partitions in the user's table, and they don't want to set TTL policies for all the high-level partitions. So we need to provide a default policy mechanism so that users can set a default policy for all high-level partitions and add some explicit policies for some of them if needed. Explicit policies will override the default policy. +We have four main considerations when designing TTL policy: + +1. Usually user hopes to manage partition TTL by expired time, so we support the policy `KEEP_BY_TIME` that partitions + will expire N days after their last modified time. Note that the last modified time here means any inserts/updates to + the partition. Also, we will make it easy to extend new policies so that users can implement their own polices to + decide which partitions are expired. +2. User need to set different policies for different partitions. For example, the hudi table is partitioned + by `user_id`. 
For partition(user_id='1'), we set the policy that this partition will expire 7 days after its last + modified time, while for + partition(user_id='2') we can set the policy that it will expire 30 days after its last modified time. +3. It's possible that users have multi-fields partitioning, and they don't want set policy for all partition. So we need + to support partition regex when defining + partition TTL policies. For example, for a hudi table partitioned by (user_id, ts), we can first set a default + policy(which policy value contains `*`) + that for all partitions matched (user_id=*/) will expire in 7 days, and explicitly set a policy that for partition + matched user_id=3 will expire in 30 days. Explicit policy will override default polices, which means default policies + only takes effects on partitions that do not match any explicit ttl policies. +4. We may need to support record level ttl policy in the future. So partition TTL policy should be an implementation of + HoodieTTLPolicy. + +So we have the TTL policy definition like this (may change when implementing): -So here we have the TTL policy definition: ```java -public class HoodiePartitionTTLPolicy { +public class HoodiePartitionTTLPolicy implements HoodieTTLPolicy { public enum TTLPolicy { - KEEP_BY_TIME, KEEP_BY_SIZE, KEEP_BY_COUNT + // We supports keep by last modified time at the first version + KEEP_BY_TIME } - // Partition spec for which the policy takes effect + // Partition spec for which the policy takes effect, could be a regex or a static partition value private String partitionSpec; private TTLPolicy policy; @@ -43,18 +75,15 @@ public class HoodiePartitionTTLPolicy { } ``` -### User Interface for TTL policy -Users can config partition TTL management policies through SparkSQL Call Command and through table config directly. Assume that the user has a hudi table partitioned by two fields(user_id, ts), he can config partition TTL policies as follows. +### User Interface for TTL policy Definition -```sql --- Set default policy for all user_id, which keeps the data for 30 days. -call add_ttl_policy(table => 'test', partitionSpec => 'user_id=*/', policy => 'KEEP_BY_TIME', policyValue => '30'); - ---For partition user_id=1/, keep 10 sub partitions. -call add_ttl_policy(table => 'test', partitionSpec => 'user_id=1/', policy => 'KEEP_BY_COUNT', policyValue => '10'); +Users can config partition TTL management policies through SparkSQL Call Command and through table config directly. +Assume that the user has a hudi table partitioned by two fields(user_id, ts), he can config partition TTL policies as +follows. ---For partition user_id=2/, keep 100GB data in total -call add_ttl_policy(table => 'test', partitionSpec => 'user_id=2/', policy => 'KEEP_BY_SIZE', policyValue => '107374182400'); +```sparksql +-- Set policy for all user_id using partition regex, which keeps the data for 30 days. +call add_ttl_policy(table => 'test', partitionSpec => 'user_id=*/', policy => 'KEEP_BY_TIME', policyValue => '30'); --For partition user_id=3/, keep the data for 7 day. 
call add_ttl_policy(table => 'test', partitionSpec => 'user_id=3/', policy => 'KEEP_BY_TIME', policyValue => '7'); @@ -62,47 +91,69 @@ call add_ttl_policy(table => 'test', partitionSpec => 'user_id=3/', policy => 'K -- Show all the TTL policies including default and explicit policies call show_ttl_policies(table => 'test'); user_id=*/ KEEP_BY_TIME 30 -user_id=1/ KEEP_BY_COUNT 10 -user_id=2/ KEEP_BY_SIZE 107374182400 user_id=3/ KEEP_BY_TIME 7 ``` ### Storage for TTL policy -The partition TTL policies will be stored in `hoodie.properties`since it is part of table metadata. The policy configs in `hoodie.properties`are defined as follows. Explicit policies are defined using a JSON array while default policy is defined using separate configs. -```properties -# Default TTL policy definition -hoodie.partition.ttl.management.default.policy=KEEP_BY_TIME -hoodie.partition.ttl.management.default.fields=user_id -hoodie.partition.ttl.management.default.policy.value=30 +We need persistent partition ttt policies in hudi table config and users should interact with hudi only by +setting/getting ttl policies. Hudi then will takes care of doing the ttl management in an async table service. + +The partition TTL policies will be stored in `hoodie.properties` and they are defined as follows. Explicit policies are +defined using a JSON array, note we support regex here. -# Explicit TTL policy definition -hoodie.partition.ttl.policies=[{"partitionSpec"\:"user_id\=2/","policy"\:"KEEP_BY_SIZE","policyValue"\:107374182400},{"partitionSpec"\:"user_id\=1/","policy"\:"KEEP_BY_COUNT","policyValue"\:10},{"partitionSpec"\:"user_id\=3/","policy"\:"KEEP_BY_TIME","policyValue"\:7}] +```properties +# TTL policy definition +hoodie.partition.ttl.policies=[{"partitionSpec"\:"user_id\=*/","policy"\:"KEEP_BY_TIME","policyValue"\:7},{"partitionSpec"\:"user_id\=3/","policy"\:"KEEP_BY_TIME","policyValue"\:30}] ``` ### Partition Statistics -#### Partition Statistics Entity -We need need to maintain a partition stat for every partition in the hudi table. The stat will contain three fields: - -- RelativePath. Relative path to the base path, or we can call it PartitionPath. -- LastModifiedTime. Last instant time that modified the partition. We need this information to support the `KEEP_BY_TIME` policy. -- PartitionTotalFileSize. The sum of all valid data file sizes in the partition, to support the `KEEP_BY_SIZE`policy. We calculate only the latest file slices in the partition currently. -#### Gathering Partition Statistics -The simplest way to get partition statistics is that reading information from the metadata table. However, the write and compaction of the metadata table will severely affect the throughput and latency of data ingestion. So we design an asynchronous partition statistics as follows. - -- At the first time we gather the partitions statistics, we list all partitions in the hudi table and calculate `PartitionTotalFileSize`and `LastModifiedTime`for each partition. Then we store all the partition stats in persistent storage along with the instant time we do the stats. For example, we can directly store all partition stats in a JSON file whose file name contains the instant time. -- After initializing the partition statistics, we can list only affected partitions by reading timeline metadata and store the new partition statistics back to the storage with new instant time. -- Note that deleted partition will be deleted from partition statistics too. 
If a partition was deleted before and have no new data, we will remove it from partition statistics so it won't calculated as expired partition again. -### Appling Policies -Once we have defined the TTL policies and have proper partition statistics, it's easy to apply the policies and delete expired partitions. - -1. Gather partitions statistics. -2. Apply each TTL policy. For explicit policies, find sub-partitions matched `PartitionSpec`defined in the policy and check if there are expired partitions according to the policy type and size. For default policy, find partitions that do not match any explicit policy and check if they are expired. -3. For all expired partitions, call delete_partition to delete them, which will generate a replace commit and mark all files in those partitions as replaced. For pending clustering and compaction that affect the target partition to delete, [HUDI-5553](https://issues.apache.org/jira/browse/HUDI-5553) introduces a pre-check that will throw an exception, and further improvement could be discussed in [HUDI-5663](https://issues.apache.org/jira/browse/HUDI-5663). + +We need to gather proper partition statistics for different kinds of partition ttl policies. Specifically, for +KEEP_BY_TIME policy that we want to support first, we need to know the last modified time for every partition in hudi +table. + +To simplify the design, we decide to use the largest commit time of committed file groups in the partition as partition +last modified time. But for file groups generated by replace commit, it may not reveal the real insert/update time for +the file group. We can also introduce other mechanism to get last modified time of the partition, like add a new kind of +partition metadata or add a new field in metadata table. Open to ideas for this design choice. + +### Executing TTL management + +Once we have defined the TTL policies and have proper partition statistics, it's easy to apply the policies and delete +expired partitions. The process of TTL management is very similar to other table services in hudi, we will provide a new +method called `managePartitionTTL` in `BaseHoodieTableServiceClient` and we can invoke this method in async table +service, SparkSQL Call Command, cli, JAVA code etc. + +We will provide an Async Table Service as the default interface to the users that want to do partition TTL management +and we can add more interfaces in the future. + +User will config the async ttl management service as follows: + +```properties +hoodie.partition.ttl.management.async=true +``` + +The process of manage partition TTL is as follows: + +1. Gather partitions statistics, list all partitions and the largest commit time of committed + file groups of each partition as their last modified time. +2. Apply each TTL policy. For explicit policies, find sub-partitions matched `PartitionSpec` defined in the policy and + check if there are expired partitions according to the policy type and size. For default policy, find partitions that + do not match any explicit policy and check if they are expired. +3. For all expired partitions, call delete_partition to delete them, which will generate a replace commit and mark all + files in those partitions as replaced. For pending clustering and compaction that affect the target partition to + delete, [HUDI-5553](https://issues.apache.org/jira/browse/HUDI-5553) introduces a pre-check that will throw an + exception, and further improvement could be discussed + in [HUDI-5663](https://issues.apache.org/jira/browse/HUDI-5663). 4. 
Clean then will delete all replaced file groups. + ## Rollout/Adoption Plan + This will be updated once we decide on one of the approaches proposed above. + ## Test Plan + This will be updated once we decide on one of the approaches proposed above. From c0d09e1e1eb3ccee0eed2a73d427b52c32c5e50c Mon Sep 17 00:00:00 2001 From: stream2000 <18889897088@163.com> Date: Wed, 31 May 2023 15:53:43 +0800 Subject: [PATCH 3/9] address comments and add more details --- rfc/rfc-65/rfc-65.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md index 8494e5d2ee6f..b1a461fc8952 100644 --- a/rfc/rfc-65/rfc-65.md +++ b/rfc/rfc-65/rfc-65.md @@ -128,10 +128,12 @@ service, SparkSQL Call Command, cli, JAVA code etc. We will provide an Async Table Service as the default interface to the users that want to do partition TTL management and we can add more interfaces in the future. -User will config the async ttl management service as follows: +User will config the async ttl management service as follows. TTL management will be trigger N commits after last +TTL replace commit. ```properties hoodie.partition.ttl.management.async=true +hoodie.partition.ttl.management.min.commits=10 ``` The process of manage partition TTL is as follows: From 7a6f928b7df25cb63451048b84d38adb9ecfb526 Mon Sep 17 00:00:00 2001 From: stream2000 <18889897088@163.com> Date: Fri, 15 Sep 2023 16:01:46 +0800 Subject: [PATCH 4/9] Update rfc --- rfc/rfc-65/rfc-65.md | 197 +++++++++++++++++++++---------------------- 1 file changed, 97 insertions(+), 100 deletions(-) diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md index b1a461fc8952..56dc37e957a3 100644 --- a/rfc/rfc-65/rfc-65.md +++ b/rfc/rfc-65/rfc-65.md @@ -16,148 +16,145 @@ JIRA: [HUDI-5823](https://issues.apache.org/jira/browse/HUDI-5823) In some classic hudi use cases, users partition hudi data by time and are only interested in data from a recent period of time. The outdated data is useless and costly, we need a TTL(Time-To-Live) management mechanism to prevent the dataset from growing infinitely. -This proposal introduces Partition TTL Management policies to hudi, people can config the policies by table config +This proposal introduces Partition TTL Management strategies to hudi, people can config the strategies by table config directly or by call commands. With proper configs set, Hudi can find out which partitions are outdated and delete them. + +This proposal introduces Partition TTL Management service to hudi. TTL management is like other table services such as Clean/Compaction/Clustering. +The user can config their ttl strategies through write configs and Hudi will help users find expired partitions and delete them automatically. + ## Background -TTL management mechanism is an important feature for databases. Hudi already provides a delete_partition interface to +TTL management mechanism is an important feature for databases. Hudi already provides a `delete_partition` interface to delete outdated partitions. However, users still need to detect which partitions are outdated and -call `delete_partition` manually, which means that users need to define and implement some kind of TTL policies and -maintain proper statistics to find expired partitions by themselves. As the scale of installations grew, it's more -important to implement a user-friendly TTL management mechanism for hudi. 
+call `delete_partition` manually, which means that users need to define and implement some kind of TTL strategies, find expired partitions and call call `delete_partition` by themselves. As the scale of installations grew, it is becoming increasingly important to implement a user-friendly TTL management mechanism for hudi. ## Implementation -There are 3 components to implement Partition TTL Management +Our main goals are as follows: -- TTL policy definition & storage -- Gathering proper partition statistics for TTL management -- Executing TTL management +* Providing an extensible framework for partition TTL management. +* Implement a simple KEEP_BY_TIME strategy, which can be executed through independent Spark job, synchronous or asynchronous table services. -### TTL Policy Definition +### Strategy Definition -We have four main considerations when designing TTL policy: +The TTL strategies is similar to existing table service strategies. We can define TTL strategies like defining a clustering/clean/compaction strategy: -1. Usually user hopes to manage partition TTL by expired time, so we support the policy `KEEP_BY_TIME` that partitions - will expire N days after their last modified time. Note that the last modified time here means any inserts/updates to - the partition. Also, we will make it easy to extend new policies so that users can implement their own polices to - decide which partitions are expired. -2. User need to set different policies for different partitions. For example, the hudi table is partitioned - by `user_id`. For partition(user_id='1'), we set the policy that this partition will expire 7 days after its last - modified time, while for - partition(user_id='2') we can set the policy that it will expire 30 days after its last modified time. -3. It's possible that users have multi-fields partitioning, and they don't want set policy for all partition. So we need - to support partition regex when defining - partition TTL policies. For example, for a hudi table partitioned by (user_id, ts), we can first set a default - policy(which policy value contains `*`) - that for all partitions matched (user_id=*/) will expire in 7 days, and explicitly set a policy that for partition - matched user_id=3 will expire in 30 days. Explicit policy will override default polices, which means default policies - only takes effects on partitions that do not match any explicit ttl policies. -4. We may need to support record level ttl policy in the future. So partition TTL policy should be an implementation of - HoodieTTLPolicy. +```properties +hoodie.partition.ttl.management.strategy=KEEP_BY_TIME +hoodie.partition.ttl.management.strategy.class=org.apache.hudi.table.action.ttl.strategy.KeepByTimePartitionTTLManagementStrategy +hoodie.partition.ttl.days.retain=10 +``` -So we have the TTL policy definition like this (may change when implementing): +The config `hoodie.partition.ttl.management.strategy.class` is to provide a strategy class (subclass of `PartitionTTLManagementStrategy`) to get expired partition paths to delete. And `hoodie.partition.ttl.days.retain` is the strategy value used by `KeepByTimePartitionTTLManagementStrategy` which means that we will expire partitions that haven't been modified for this strategy value set. We will cover the `KeepByTimeTTLManagementStrategy` strategy in detail in the next section. 
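To make the retention rule concrete: with `KEEP_BY_TIME` and `hoodie.partition.ttl.days.retain=10`, a partition expires once more than 10 days have passed since its last modified instant. Below is a minimal, self-contained sketch of that check in plain Java; the instant format matches Hudi's `yyyyMMddHHmmss` commit instants, but the class, method and parameters are illustrative assumptions, not Hudi APIs.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class TtlExpirationCheckSketch {
  // Hudi commit instants use the yyyyMMddHHmmss format (newer releases append milliseconds).
  private static final DateTimeFormatter INSTANT_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

  /**
   * A partition is expired when more than `daysRetain` days have passed since its
   * last modified instant, where `daysRetain` is hoodie.partition.ttl.days.retain.
   */
  static boolean isExpired(String lastModifiedInstant, long daysRetain, LocalDateTime now) {
    LocalDateTime lastModified = LocalDateTime.parse(lastModifiedInstant, INSTANT_FORMAT);
    return ChronoUnit.DAYS.between(lastModified, now) > daysRetain;
  }

  public static void main(String[] args) {
    LocalDateTime now = LocalDateTime.of(2023, 9, 15, 0, 0);
    // Partition last touched 2023-09-01: expired with a 10 day retention, kept with a 30 day retention.
    System.out.println(isExpired("20230901120000", 10, now)); // true
    System.out.println(isExpired("20230901120000", 30, now)); // false
  }
}
```

A real strategy would obtain the last modified instant from the table's timeline or file system view rather than taking it as a parameter.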
+ +The core definition of `PartitionTTLManagementStrategy` looks like this: ```java -public class HoodiePartitionTTLPolicy implements HoodieTTLPolicy { - public enum TTLPolicy { - // We supports keep by last modified time at the first version - KEEP_BY_TIME - } +/** + * Strategy for partition-level TTL management. + */ +public abstract class PartitionTTLManagementStrategy { + /** + * Get expired partition paths for a specific partition ttl management strategy. + * + * @return Expired partition paths. + */ + public abstract List getExpiredPartitionPaths(); +} +``` - // Partition spec for which the policy takes effect, could be a regex or a static partition value - private String partitionSpec; +Users can provide their own implementation of `PartitionTTLManagementStrategy` and hudi will help delete the expired partitions. - private TTLPolicy policy; +### KeepByTimeTTLManagementStrategy - private long policyValue; -} -``` +We will provide a strategy call `KeepByTimePartitionTTLManagementStrategy` in the first version of partition TTL management implementation. -### User Interface for TTL policy Definition +The `KeepByTimePartitionTTLManagementStrategy` will calculate the 'lastModifiedTime' for each input partitions. If duration between now and 'lastModifiedTime' for the partition is larger than what `hoodie.partition.ttl.days.retain` configured, `KeepByTimePartitionTTLManagementStrategy` will return this partition as an expired partition. -Users can config partition TTL management policies through SparkSQL Call Command and through table config directly. -Assume that the user has a hudi table partitioned by two fields(user_id, ts), he can config partition TTL policies as -follows. +we will to use the largest commit time of committed file groups in the partition as the partition's +'lastModifiedTime'. So any write(including normal DMLs, clustering etc.) with larger instant time will change the `lastModifiedTime`. -```sparksql --- Set policy for all user_id using partition regex, which keeps the data for 30 days. -call add_ttl_policy(table => 'test', partitionSpec => 'user_id=*/', policy => 'KEEP_BY_TIME', policyValue => '30'); +For file groups generated by replace commit, it may not reveal the real insert/update time for +the file group. However, we can assume that we won't do clustering for a partition without new writes for a long time when using the strategy. And in the future, we may introduce a more accurate mechanism to get `lastModifiedTime` of a partition, for example using metadata table. ---For partition user_id=3/, keep the data for 7 day. -call add_ttl_policy(table => 'test', partitionSpec => 'user_id=3/', policy => 'KEEP_BY_TIME', policyValue => '7'); --- Show all the TTL policies including default and explicit policies -call show_ttl_policies(table => 'test'); -user_id=*/ KEEP_BY_TIME 30 -user_id=3/ KEEP_BY_TIME 7 -``` +### Apply different strategies for different partitions -### Storage for TTL policy +For some specific users, they may want to apply different strategies for different partitions. For example, they may have multi partition fileds(productId, day). For partitions under `product=1` they want to keep for 30 days while for partitions under `product=2` they want to keep for 7 days only. -We need persistent partition ttt policies in hudi table config and users should interact with hudi only by -setting/getting ttl policies. Hudi then will takes care of doing the ttl management in an async table service. 
+For the first version of TTL management, we do not plan to implement a complicated strategy (For example, use an array to store strategies, introduce partition regex etc.). Instead, we add a new abstract method `getPartitionPathsForTTLManagement` in `PartitionTTLManagementStrategy` and provides a new config `hoodie.partition.ttl.management.partition.selected`. -The partition TTL policies will be stored in `hoodie.properties` and they are defined as follows. Explicit policies are -defined using a JSON array, note we support regex here. +If `hoodie.partition.ttl.management.partition.selected` is set, `getPartitionPathsForTTLManagement` will return partitions provided by this config. If not, `getPartitionPathsForTTLManagement` will return all partitions in the hudi table. -```properties -# TTL policy definition -hoodie.partition.ttl.policies=[{"partitionSpec"\:"user_id\=*/","policy"\:"KEEP_BY_TIME","policyValue"\:7},{"partitionSpec"\:"user_id\=3/","policy"\:"KEEP_BY_TIME","policyValue"\:30}] -``` +TTL management strategies will only be applied for partitions return by `getPartitionPathsForTTLManagement`. -### Partition Statistics +Thus, if users want to apply different strategies for different partitions, they can do the TTL management multiple times, selecting different partitions and apply corresponding strategies. And we may provide a batch interface in the future to simplify this. -We need to gather proper partition statistics for different kinds of partition ttl policies. Specifically, for -KEEP_BY_TIME policy that we want to support first, we need to know the last modified time for every partition in hudi -table. +The `getPartitionPathsForTTLManagement` method will look like this: -To simplify the design, we decide to use the largest commit time of committed file groups in the partition as partition -last modified time. But for file groups generated by replace commit, it may not reveal the real insert/update time for -the file group. We can also introduce other mechanism to get last modified time of the partition, like add a new kind of -partition metadata or add a new field in metadata table. Open to ideas for this design choice. +```java +/** + * Strategy for partition-level TTL management. + */ +public abstract class PartitionTTLManagementStrategy { + /** + * Scan and list all partitions for partition TTL management. + * + * @return all partitions paths for the dataset. + */ + protected List getPartitionPathsForTTLManagement() { + if (StringUtils.isNullOrEmpty(config.getTTLManagementPartitionSelected())) { + return getMatchedPartitions(); + } else { + // Return All partition paths + return FSUtils.getAllPartitionPaths(hoodieTable.getContext(), config.getMetadataConfig(), config.getBasePath()); + } + } +} +``` ### Executing TTL management -Once we have defined the TTL policies and have proper partition statistics, it's easy to apply the policies and delete -expired partitions. The process of TTL management is very similar to other table services in hudi, we will provide a new -method called `managePartitionTTL` in `BaseHoodieTableServiceClient` and we can invoke this method in async table -service, SparkSQL Call Command, cli, JAVA code etc. +Once we already have a proper `PartitionTTLManagementStrategy` implementation, it's easy to execute the ttl management. -We will provide an Async Table Service as the default interface to the users that want to do partition TTL management -and we can add more interfaces in the future. 
+```java +public class SparkTTLManagementActionExecutor extends BaseSparkCommitActionExecutor { + @Override + public HoodieWriteMetadata> execute() { + // Construct PartitionTTLManagementStrategy + PartitionTTLManagementStrategy strategy = (PartitionTTLManagementStrategy) ReflectionUtils.loadClass( + PartitionTTLManagementStrategy.checkAndGetPartitionTTLManagementStrategy(config), + new Class[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config); + + // Get expired partition paths + List expiredPartitions = strategy.getExpiredPartitionPaths(); + + // Delete them reusing SparkDeletePartitionCommitActionExecutor + return new SparkDeletePartitionCommitActionExecutor<>(context, config, table, instantTime, expiredPartitions).execute(); + } +} +``` -User will config the async ttl management service as follows. TTL management will be trigger N commits after last -TTL replace commit. +We will add a new method `managePartitionTTL` in `HoodieTable` and `HoodieSparkCopyOnWriteTable` can implement it like this: -```properties -hoodie.partition.ttl.management.async=true -hoodie.partition.ttl.management.min.commits=10 +```java +@Override +public HoodieWriteMetadata> managePartitionTTL(HoodieEngineContext context, String instantTime) { + return new SparkTTLManagementActionExecutor<>(context, config, this, instantTime).execute(); +} ``` -The process of manage partition TTL is as follows: +We can call `hoodieTable.managePartitionTTL` in independent flink/spark job, in async/sync inline table services like clustering/compaction/clean etc. -1. Gather partitions statistics, list all partitions and the largest commit time of committed - file groups of each partition as their last modified time. -2. Apply each TTL policy. For explicit policies, find sub-partitions matched `PartitionSpec` defined in the policy and - check if there are expired partitions according to the policy type and size. For default policy, find partitions that - do not match any explicit policy and check if they are expired. -3. For all expired partitions, call delete_partition to delete them, which will generate a replace commit and mark all - files in those partitions as replaced. For pending clustering and compaction that affect the target partition to - delete, [HUDI-5553](https://issues.apache.org/jira/browse/HUDI-5553) introduces a pre-check that will throw an - exception, and further improvement could be discussed - in [HUDI-5663](https://issues.apache.org/jira/browse/HUDI-5663). -4. Clean then will delete all replaced file groups. +### Future plan ## Rollout/Adoption Plan -This will be updated once we decide on one of the approaches proposed above. - -## Test Plan - -This will be updated once we decide on one of the approaches proposed above. - +Hoodie Partition TTL Management V1 will support a simple KEEP_BY_TIME strategy at first, and others can implement their own `PartitionTTLManagementStrategy`. +Add this feature won't affect existing functions. +## Test Plan +Will add UTs and ITs to test this. 
From d27023e41b83544d1f5c7366def8cf46f2b63c5a Mon Sep 17 00:00:00 2001 From: stream2000 <18889897088@163.com> Date: Fri, 15 Sep 2023 16:53:47 +0800 Subject: [PATCH 5/9] Add sections about how to run partition TTL management --- rfc/rfc-65/rfc-65.md | 53 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md index 56dc37e957a3..8f9fe6ad6ee3 100644 --- a/rfc/rfc-65/rfc-65.md +++ b/rfc/rfc-65/rfc-65.md @@ -101,7 +101,7 @@ public abstract class PartitionTTLManagementStrategy { /** * Scan and list all partitions for partition TTL management. * - * @return all partitions paths for the dataset. + * @return Partitions to apply TTL management strategy */ protected List getPartitionPathsForTTLManagement() { if (StringUtils.isNullOrEmpty(config.getTTLManagementPartitionSelected())) { @@ -147,8 +147,59 @@ public HoodieWriteMetadata> managePartitionTTL(HoodieEng We can call `hoodieTable.managePartitionTTL` in independent flink/spark job, in async/sync inline table services like clustering/compaction/clean etc. + +### User interface for Partition TTL Management + +We can do partition TTL management inline with streaming ingestion job or do it with a independent batch job, for both spark and flink engine. + +#### Run inline with Streaming Ingestion + +Since we can run clustering inline with streaming ingestion job through the following config: + +```properties +hoodie.clustering.async.enabled=true +hoodie.clustering.async.max.commits=5 +``` + +We can do similar thing for partition TTL management. The config for async ttl management are: + +| Config key | Remarks | Default | +|-----------------------------------------|--------------------------------------------------------------------------------------------------------------------|---------| +| hoodie.ttl.management.async.enabled | Enable running of TTL management service, asynchronously as writes happen on the table. | False | +| hoodie.ttl.management.async.max.commits | Control frequency of async TTL management by specifying after how many commits TTL management should be triggered. | 4 | + + +We can easily implement async ttl management for both spark and flink engine since we only need to call `hoodieTable.managePartitionTTL`. And we can support synchronized ttl management if we want. + +#### Run by Independent Job + +Deleting a large number of partitions is a heavy operation so we may want to run TTL management through a independent job. 
We will provide a SparkSQL Call Command to run TTL management and it may look like this: + +```sql +call managePartitionTTL(table => 'hudi_table', strategy => 'KEEP_BY_TIME', daysRetain => '10', predicate => 'productid = 1'); +``` + +The params are as follows: + +| Param name | Remarks | Default | +|------------|-------------------------------------------------------------------------------------------------------------------|--------------| +| table | The hoodie table to run partition TTL management | empty string | +| basePath | The hoodie table path to run partition TTL management | empty string | +| strategy | The partition TTL management strategy, corresponding to a implementation of `PartitionTTLManagementStrategy` | KEEP_BY_TIME | +| predicate | Partition predicate for TTL management, will only apply ttl strategy on the partitions selected by this predicate | empty string | + + +And we can support run TTL with a spark jar like running clustering by`HoodieClusteringJob`, and run TTL with a flink job like `HoodieFlinkClusteringJob` in the future. + ### Future plan +We can do a lot of things about TTL management in the future: + +* Support record level TTL management +* Move the partitions to be cleaned up to cold/cheaper storage in objects stores instead of delete them forever +* Stash the partitions to be cleaned up in .stashedForDeletion folder (at .hoodie level) and introduce some recover mechanism, for data security. +* ... + ## Rollout/Adoption Plan Hoodie Partition TTL Management V1 will support a simple KEEP_BY_TIME strategy at first, and others can implement their own `PartitionTTLManagementStrategy`. From 17d36c49932b86bebc76e5f17332b2893622d169 Mon Sep 17 00:00:00 2001 From: stream2000 <18889897088@163.com> Date: Fri, 15 Sep 2023 17:06:24 +0800 Subject: [PATCH 6/9] udpate rfc --- rfc/rfc-65/rfc-65.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md index 8f9fe6ad6ee3..7b3106ab8ba3 100644 --- a/rfc/rfc-65/rfc-65.md +++ b/rfc/rfc-65/rfc-65.md @@ -70,14 +70,12 @@ Users can provide their own implementation of `PartitionTTLManagementStrategy` a We will provide a strategy call `KeepByTimePartitionTTLManagementStrategy` in the first version of partition TTL management implementation. -The `KeepByTimePartitionTTLManagementStrategy` will calculate the 'lastModifiedTime' for each input partitions. If duration between now and 'lastModifiedTime' for the partition is larger than what `hoodie.partition.ttl.days.retain` configured, `KeepByTimePartitionTTLManagementStrategy` will return this partition as an expired partition. +The `KeepByTimePartitionTTLManagementStrategy` will calculate the `lastModifiedTime` for each input partitions. If duration between now and 'lastModifiedTime' for the partition is larger than what `hoodie.partition.ttl.days.retain` configured, `KeepByTimePartitionTTLManagementStrategy` will mark this partition as an expired partition. We use day as the unit of expired time since it is very common-used for datalakes. Open to ideas for this. we will to use the largest commit time of committed file groups in the partition as the partition's -'lastModifiedTime'. So any write(including normal DMLs, clustering etc.) with larger instant time will change the `lastModifiedTime`. - -For file groups generated by replace commit, it may not reveal the real insert/update time for -the file group. 
However, we can assume that we won't do clustering for a partition without new writes for a long time when using the strategy. And in the future, we may introduce a more accurate mechanism to get `lastModifiedTime` of a partition, for example using metadata table. +`lastModifiedTime`. So any write (including normal DMLs, clustering etc.) with larger instant time will change the partition's `lastModifiedTime`. +For file groups generated by replace commit, it may not reveal the real insert/update time for the file group. However, we can assume that we won't do clustering for a partition without new writes for a long time when using the strategy. And in the future, we may introduce a more accurate mechanism to get `lastModifiedTime` of a partition, for example using metadata table. ### Apply different strategies for different partitions @@ -189,7 +187,7 @@ The params are as follows: | predicate | Partition predicate for TTL management, will only apply ttl strategy on the partitions selected by this predicate | empty string | -And we can support run TTL with a spark jar like running clustering by`HoodieClusteringJob`, and run TTL with a flink job like `HoodieFlinkClusteringJob` in the future. +Besides SparkSQL call commands, we can support run TTL management with a spark jar like running clustering by `HoodieClusteringJob` and run TTL with a flink job like `HoodieFlinkClusteringJob` in the future. ### Future plan From 409fbddc3034fd8e38e0b2c9e3e27e31568a96fe Mon Sep 17 00:00:00 2001 From: stream2000 <18889897088@163.com> Date: Thu, 30 Nov 2023 15:19:34 +0800 Subject: [PATCH 7/9] add new `lastModifiedTime` calculation method for 1.0.0 and later hudi version --- rfc/rfc-65/rfc-65.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md index 7b3106ab8ba3..094dfd69bf25 100644 --- a/rfc/rfc-65/rfc-65.md +++ b/rfc/rfc-65/rfc-65.md @@ -19,7 +19,6 @@ dataset from growing infinitely. This proposal introduces Partition TTL Management strategies to hudi, people can config the strategies by table config directly or by call commands. With proper configs set, Hudi can find out which partitions are outdated and delete them. - This proposal introduces Partition TTL Management service to hudi. TTL management is like other table services such as Clean/Compaction/Clustering. The user can config their ttl strategies through write configs and Hudi will help users find expired partitions and delete them automatically. @@ -77,6 +76,8 @@ we will to use the largest commit time of committed file groups in the partition For file groups generated by replace commit, it may not reveal the real insert/update time for the file group. However, we can assume that we won't do clustering for a partition without new writes for a long time when using the strategy. And in the future, we may introduce a more accurate mechanism to get `lastModifiedTime` of a partition, for example using metadata table. +For 1.0.0 and later hudi version which supports efficient completion time queries on the timeline(#9565), we can get partition's `lastModifiedTime` by scanning the timeline and get the last write commit for the partition. Also for efficiency, we can store the partitions' last modified time and current completion time in the replace commit metadata. The next time we need to calculate the partitions' last modified time, we can build incrementally from the replace commit metadata of the last ttl management. 
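To illustrate the incremental bookkeeping described above: the refresh step boils down to merging the cached per-partition times from the last TTL run with the partitions touched by commits completed since that run. A rough sketch in plain Java follows; the record, class and method names are hypothetical stand-ins, not Hudi classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch only: incrementally refresh per-partition last-modified times.
 * The cached map would come from the replace commit metadata written by the previous TTL run;
 * the deltas are the partitions written by commits completed after that run's checkpoint time.
 */
public class IncrementalPartitionStatsSketch {

  /** A (partitionPath, instantTime) pair for one commit that touched one partition. */
  public record PartitionTouch(String partitionPath, String instantTime) {}

  public static Map<String, String> merge(Map<String, String> cachedLastModified,
                                          List<PartitionTouch> touchesSinceLastRun) {
    Map<String, String> merged = new HashMap<>(cachedLastModified);
    for (PartitionTouch touch : touchesSinceLastRun) {
      // Keep the larger instant time per partition; same-format instant times sort lexicographically.
      merged.merge(touch.partitionPath(), touch.instantTime(),
          (oldTime, newTime) -> newTime.compareTo(oldTime) > 0 ? newTime : oldTime);
    }
    return merged;
  }
}
```

Partitions deleted since the last run would also need to be dropped from the cached map so they are not repeatedly reported as expired.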
+ ### Apply different strategies for different partitions For some specific users, they may want to apply different strategies for different partitions. For example, they may have multi partition fileds(productId, day). For partitions under `product=1` they want to keep for 30 days while for partitions under `product=2` they want to keep for 7 days only. From 4ae9591786bdddeae2cfc03b677b322fbede3b3c Mon Sep 17 00:00:00 2001 From: stream2000 <18889897088@163.com> Date: Tue, 5 Dec 2023 13:41:34 +0800 Subject: [PATCH 8/9] address comments --- rfc/rfc-65/rfc-65.md | 73 ++++++++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md index 094dfd69bf25..f983eb605dbe 100644 --- a/rfc/rfc-65/rfc-65.md +++ b/rfc/rfc-65/rfc-65.md @@ -40,20 +40,20 @@ Our main goals are as follows: The TTL strategies is similar to existing table service strategies. We can define TTL strategies like defining a clustering/clean/compaction strategy: ```properties -hoodie.partition.ttl.management.strategy=KEEP_BY_TIME -hoodie.partition.ttl.management.strategy.class=org.apache.hudi.table.action.ttl.strategy.KeepByTimePartitionTTLManagementStrategy +hoodie.partition.ttl.strategy=KEEP_BY_TIME +hoodie.partition.ttl.strategy.class=org.apache.hudi.table.action.ttl.strategy.KeepByTimePartitionTTLStrategy hoodie.partition.ttl.days.retain=10 ``` -The config `hoodie.partition.ttl.management.strategy.class` is to provide a strategy class (subclass of `PartitionTTLManagementStrategy`) to get expired partition paths to delete. And `hoodie.partition.ttl.days.retain` is the strategy value used by `KeepByTimePartitionTTLManagementStrategy` which means that we will expire partitions that haven't been modified for this strategy value set. We will cover the `KeepByTimeTTLManagementStrategy` strategy in detail in the next section. +The config `hoodie.partition.ttl.strategy.class` is to provide a strategy class (subclass of `PartitionTTLStrategy`) to get expired partition paths to delete. And `hoodie.partition.ttl.days.retain` is the strategy value used by `KeepByTimePartitionTTLStrategy` which means that we will expire partitions that haven't been modified for this strategy value set. We will cover the `KeepByTimeTTLStrategy` strategy in detail in the next section. -The core definition of `PartitionTTLManagementStrategy` looks like this: +The core definition of `PartitionTTLManagemenStrategy` looks like this: ```java /** * Strategy for partition-level TTL management. */ -public abstract class PartitionTTLManagementStrategy { +public abstract class PartitionTTLStrategy { /** * Get expired partition paths for a specific partition ttl management strategy. * @@ -63,47 +63,47 @@ public abstract class PartitionTTLManagementStrategy { } ``` -Users can provide their own implementation of `PartitionTTLManagementStrategy` and hudi will help delete the expired partitions. +Users can provide their own implementation of `PartitionTTLStrategy` and hudi will help delete the expired partitions. ### KeepByTimeTTLManagementStrategy -We will provide a strategy call `KeepByTimePartitionTTLManagementStrategy` in the first version of partition TTL management implementation. +We will provide a strategy call `KeepByTimePartitionTTLStrategy` in the first version of partition TTL management implementation. -The `KeepByTimePartitionTTLManagementStrategy` will calculate the `lastModifiedTime` for each input partitions. 
If duration between now and 'lastModifiedTime' for the partition is larger than what `hoodie.partition.ttl.days.retain` configured, `KeepByTimePartitionTTLManagementStrategy` will mark this partition as an expired partition. We use day as the unit of expired time since it is very common-used for datalakes. Open to ideas for this.
+The `KeepByTimePartitionTTLStrategy` will calculate the `lastCommitTime` for each input partition. If the duration between now and the `lastCommitTime` of the partition is larger than the configured `hoodie.partition.ttl.days.retain`, `KeepByTimePartitionTTLStrategy` will mark this partition as expired. We use days as the unit of expiration time since that is the most common granularity for data lakes. Open to ideas for this.
 
 We will use the largest commit time of committed file groups in the partition as the partition's
-`lastModifiedTime`. So any write (including normal DMLs, clustering etc.) with larger instant time will change the partition's `lastModifiedTime`.
+`lastCommitTime`. So any write (including normal DMLs, clustering etc.) with a larger instant time will change the partition's `lastCommitTime`.
 
-For file groups generated by replace commit, it may not reveal the real insert/update time for the file group. However, we can assume that we won't do clustering for a partition without new writes for a long time when using the strategy. And in the future, we may introduce a more accurate mechanism to get `lastCommitTime` of a partition, for example using metadata table.
+For file groups generated by a replace commit, the commit time may not reveal the real insert/update time of the file group. However, we can assume that we won't cluster a partition that has had no new writes for a long time when using this strategy. In the future, we may introduce a more accurate mechanism to get the `lastCommitTime` of a partition, for example using the metadata table.
 
-For 1.0.0 and later hudi version which supports efficient completion time queries on the timeline(#9565), we can get partition's `lastModifiedTime` by scanning the timeline and get the last write commit for the partition. Also for efficiency, we can store the partitions' last modified time and current completion time in the replace commit metadata. The next time we need to calculate the partitions' last modified time, we can build incrementally from the replace commit metadata of the last ttl management.
+For Hudi 1.0.0 and later versions, which support efficient completion time queries on the timeline (#9565), we can get a partition's `lastCommitTime` by scanning the timeline for the last write commit that touched the partition. Also, for efficiency, we can store the partitions' last modified times and the current completion time in the replace commit metadata. The next time we need to calculate the partitions' last modified times, we can build them incrementally from the replace commit metadata of the last TTL run.
 
 ### Apply different strategies for different partitions
 
 For some specific users, they may want to apply different strategies for different partitions. For example, they may have multiple partition fields (productId, day). For partitions under `product=1` they want to keep data for 30 days, while for partitions under `product=2` they want to keep data for 7 days only.
 
-For the first version of TTL management, we do not plan to implement a complicated strategy (For example, use an array to store strategies, introduce partition regex etc.). 
+For the first version of TTL management, we do not plan to implement a complicated strategy (for example, using an array to store strategies, introducing partition regexes, etc.). Instead, we add a new abstract method `getPartitionPathsForTTL` in `PartitionTTLStrategy` and provide a new config `hoodie.partition.ttl.partition.selected`.

-If `hoodie.partition.ttl.management.partition.selected` is set, `getPartitionPathsForTTLManagement` will return partitions provided by this config. If not, `getPartitionPathsForTTLManagement` will return all partitions in the hudi table.
+If `hoodie.partition.ttl.partition.selected` is set, `getPartitionPathsForTTL` will return the partitions provided by this config. If not, `getPartitionPathsForTTL` will return all partitions in the hudi table.

-TTL management strategies will only be applied for partitions return by `getPartitionPathsForTTLManagement`.
+TTL strategies will only be applied to partitions returned by `getPartitionPathsForTTL`.

Thus, if users want to apply different strategies for different partitions, they can run the TTL management multiple times, selecting different partitions and applying the corresponding strategies. And we may provide a batch interface in the future to simplify this.

-The `getPartitionPathsForTTLManagement` method will look like this:
+The `getPartitionPathsForTTL` method will look like this:

```java
/**
 * Strategy for partition-level TTL management.
 */
-public abstract class PartitionTTLManagementStrategy {
+public abstract class PartitionTTLManagemenStrategy {
  /**
   * Scan and list all partitions for partition TTL management.
   *
-   * @return Partitions to apply TTL management strategy
+   * @return Partitions to apply TTL strategy
   */
-  protected List getPartitionPathsForTTLManagement() {
-    if (StringUtils.isNullOrEmpty(config.getTTLManagementPartitionSelected())) {
+  protected List<String> getPartitionPathsForTTL() {
+    if (StringUtils.isNullOrEmpty(config.getTTLPartitionSelected())) {
      return getMatchedPartitions();
    } else {
      // Return All partition paths
@@ -113,17 +113,17 @@ public abstract class PartitionTTLManagementStrategy {
 }
```

-### Executing TTL management
-Once we already have a proper `PartitionTTLManagementStrategy` implementation, it's easy to execute the ttl management.
+### Executing TTL
+Once we have a proper `PartitionTTLStrategy` implementation, it's easy to execute the ttl management.
```java
public class SparkTTLManagementActionExecutor extends BaseSparkCommitActionExecutor {
  @Override
  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
-    // Construct PartitionTTLManagementStrategy
-    PartitionTTLManagementStrategy strategy = (PartitionTTLManagementStrategy) ReflectionUtils.loadClass(
-        PartitionTTLManagementStrategy.checkAndGetPartitionTTLManagementStrategy(config),
+    // Construct PartitionTTLStrategy
+    PartitionTTLStrategy strategy = (PartitionTTLStrategy) ReflectionUtils.loadClass(
+        PartitionTTLStrategy.checkAndGetPartitionTTLStrategy(config),
        new Class[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);
    // Get expired partition paths
@@ -140,7 +140,7 @@ We will add a new method `managePartitionTTL` in `HoodieTable` and `HoodieSparkC
```java
 @Override
 public HoodieWriteMetadata<HoodieData<WriteStatus>> managePartitionTTL(HoodieEngineContext context, String instantTime) {
-  return new SparkTTLManagementActionExecutor<>(context, config, this, instantTime).execute();
+  return new SparkTTLActionExecutor<>(context, config, this, instantTime).execute();
 }
```

@@ -162,10 +162,10 @@ hoodie.clustering.async.max.commits=5
We can do similar thing for partition TTL management. The config for async ttl management are:

-| Config key | Remarks | Default |
-|-----------------------------------------|--------------------------------------------------------------------|---------|
-| hoodie.ttl.management.async.enabled | Enable running of TTL management service, asynchronously as writes happen on the table. | False |
-| hoodie.ttl.management.async.max.commits | Control frequency of async TTL management by specifying after how many commits TTL management should be triggered. | 4 |
+| Config key | Remarks | Default |
+|------------------------------|--------------------------------------------------------------------|---------|
+| hoodie.ttl.async.enabled | Enable running of TTL management service, asynchronously as writes happen on the table. | False |
+| hoodie.ttl.async.max.commits | Control frequency of async TTL management by specifying after how many commits TTL management should be triggered. | 4 |

We can easily implement async ttl management for both spark and flink engines since we only need to call `hoodieTable.managePartitionTTL`. And we can support synchronous ttl management if we want.
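For example, asynchronous TTL management could be configured like the sketch below (the key names come from the table above; the values are illustrative, `4` being the proposed default):

```properties
# Sketch: enable asynchronous partition TTL management alongside regular writes
hoodie.ttl.async.enabled=true
# Trigger TTL management after every 4 commits
hoodie.ttl.async.max.commits=4
```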
@@ -180,12 +180,12 @@ call managePartitionTTL(table => 'hudi_table', strategy => 'KEEP_BY_TIME', daysR
The params are as follows:

-| Param name | Remarks | Default |
-|------------|---------------------------------------------------------------------------------------------------|--------------|
-| table | The hoodie table to run partition TTL management | empty string |
-| basePath | The hoodie table path to run partition TTL management | empty string |
-| strategy | The partition TTL management strategy, corresponding to a implementation of `PartitionTTLManagementStrategy` | KEEP_BY_TIME |
-| predicate | Partition predicate for TTL management, will only apply ttl strategy on the partitions selected by this predicate | empty string |
+| Param name | Remarks | Default |
+|------------|---------------------------------------------------------------------------------------------------|--------------|
+| table | The hoodie table to run partition TTL | empty string |
+| basePath | The hoodie table path to run partition TTL | empty string |
+| strategy | The partition TTL strategy, corresponding to an implementation of `PartitionTTLStrategy` | KEEP_BY_TIME |
+| predicate | Partition predicate for TTL, will only apply ttl strategy on the partitions selected by this predicate | empty string |

Besides SparkSQL call commands, we can support running TTL management with a spark jar like running clustering by `HoodieClusteringJob` and running TTL with a flink job like `HoodieFlinkClusteringJob` in the future.

@@ -197,11 +197,12 @@ We can do a lot of things about TTL management in the future:
* Support record level TTL management
* Move the partitions to be cleaned up to cold/cheaper storage in object stores instead of deleting them forever
* Stash the partitions to be cleaned up in .stashedForDeletion folder (at .hoodie level) and introduce some recovery mechanism, for data security.
+* Support advanced ttl policies, for example wildcard partition specs.
* ...

## Rollout/Adoption Plan

-Hoodie Partition TTL Management V1 will support a simple KEEP_BY_TIME strategy at first, and others can implement their own `PartitionTTLManagementStrategy`.
+Hoodie Partition TTL Management V1 will support a simple KEEP_BY_TIME strategy at first, and others can implement their own `PartitionTTLStrategy`.

Adding this feature won't affect existing functions.

From 2fdcbfde234011a1b2880fa7d0ad8b7ccb89fcf4 Mon Sep 17 00:00:00 2001
From: stream2000 <18889897088@163.com>
Date: Thu, 7 Dec 2023 14:32:36 +0800
Subject: [PATCH 9/9] address comments

---
 rfc/rfc-65/rfc-65.md | 26 +++++++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md
index f983eb605dbe..4e750dced1f0 100644
--- a/rfc/rfc-65/rfc-65.md
+++ b/rfc/rfc-65/rfc-65.md
@@ -47,13 +47,22 @@ hoodie.partition.ttl.days.retain=10
The config `hoodie.partition.ttl.strategy.class` is to provide a strategy class (subclass of `PartitionTTLStrategy`) to get expired partition paths to delete. And `hoodie.partition.ttl.days.retain` is the strategy value used by `KeepByTimePartitionTTLStrategy`, which means that we will expire partitions that haven't been modified for more than the configured number of days. We will cover the `KeepByTimeTTLStrategy` strategy in detail in the next section.

-The core definition of `PartitionTTLManagemenStrategy` looks like this:
+The core definition of `PartitionTTLStrategy` looks like this:

```java
/**
 * Strategy for partition-level TTL management.
 */
public abstract class PartitionTTLStrategy {
+
+  protected final HoodieTable hoodieTable;
+  protected final HoodieWriteConfig writeConfig;
+
+  public PartitionTTLStrategy(HoodieTable hoodieTable) {
+    this.writeConfig = hoodieTable.getConfig();
+    this.hoodieTable = hoodieTable;
+  }
+
  /**
   * Get expired partition paths for a specific partition ttl management strategy.
   *
@@ -96,18 +105,26 @@ The `getPartitionPathsForTTL` method will look like this:
/**
 * Strategy for partition-level TTL management.
 */
-public abstract class PartitionTTLManagemenStrategy {
+public abstract class PartitionTTLStrategy {
+  protected final HoodieTable hoodieTable;
+  protected final HoodieWriteConfig writeConfig;
+
+  public PartitionTTLStrategy(HoodieTable hoodieTable) {
+    this.writeConfig = hoodieTable.getConfig();
+    this.hoodieTable = hoodieTable;
+  }
+
  /**
   * Scan and list all partitions for partition TTL management.
   *
   * @return Partitions to apply TTL strategy
   */
  protected List<String> getPartitionPathsForTTL() {
-    if (StringUtils.isNullOrEmpty(config.getTTLPartitionSelected())) {
+    if (StringUtils.isNullOrEmpty(writeConfig.getTTLPartitionSelected())) {
      return getMatchedPartitions();
    } else {
      // Return All partition paths
-      return FSUtils.getAllPartitionPaths(hoodieTable.getContext(), config.getMetadataConfig(), config.getBasePath());
+      return FSUtils.getAllPartitionPaths(hoodieTable.getContext(), writeConfig.getMetadataConfig(), writeConfig.getBasePath());
    }
  }
}
@@ -167,7 +184,6 @@ We can do similar thing for partition TTL management. The config for async ttl m
| hoodie.ttl.async.enabled | Enable running of TTL management service, asynchronously as writes happen on the table. | False |
| hoodie.ttl.async.max.commits | Control frequency of async TTL management by specifying after how many commits TTL management should be triggered. | 4 |
-
We can easily implement async ttl management for both spark and flink engines since we only need to call `hoodieTable.managePartitionTTL`. And we can support synchronous ttl management if we want.

#### Run by Independent Job