[HUDI-5823][RFC-65] RFC for Partition TTL Management #8062

Merged: 9 commits (Dec 8, 2023)

rfc/rfc-65/rfc-65.md: 210 additions, 0 deletions
## Proposers

- @stream2000
- @hujincalrin
- @huberylee
- @YuweiXiao

## Approvers

## Status

JIRA: [HUDI-5823](https://issues.apache.org/jira/browse/HUDI-5823)

## Abstract

In some classic Hudi use cases, users partition Hudi data by time and are only interested in data from a recent period. The outdated data is useless and costly to keep, so we need a TTL (Time-To-Live) management mechanism to prevent the dataset from growing infinitely.
**Contributor:** Guess you are talking about GDPR?

**Contributor:** To clarify: this is very different. The proposal in this doc only yields target partitions; it does not remove the whole partition.

This proposal introduces Partition TTL Management strategies to Hudi. Users can configure the strategies directly via table configs or via call commands. With proper configs set, Hudi can find out which partitions are outdated and delete them.

This proposal introduces a Partition TTL Management service to Hudi. TTL management is like other table services such as Clean/Compaction/Clustering: users can configure their TTL strategies through write configs, and Hudi will find expired partitions and delete them automatically.

## Background

A TTL management mechanism is an important feature for databases. Hudi already provides a `delete_partition` interface to delete outdated partitions. However, users still need to detect which partitions are outdated and call `delete_partition` manually, which means that users have to define and implement some kind of TTL strategy, find expired partitions, and call `delete_partition` by themselves. As the scale of installations grows, it is becoming increasingly important to implement a user-friendly TTL management mechanism for Hudi.
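For context, the manual flow today looks roughly like the sketch below: the user determines the expired partitions themselves and then issues a `delete_partition` write. The Spark datasource option keys used here (`hoodie.datasource.write.operation`, `hoodie.datasource.write.partitions.to.delete`) follow current Hudi releases as far as I know, but treat the exact keys, values, and required configs as illustrative rather than authoritative.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class ManualPartitionExpiration {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("manual-partition-ttl").getOrCreate();

    // The user has to work out by themselves which partitions are outdated.
    String expiredPartitions = "2023/01/01,2023/01/02";

    // An empty dataframe is enough: delete_partition only needs the partition list.
    Dataset<Row> empty = spark.emptyDataFrame();
    empty.write().format("hudi")
        .option("hoodie.table.name", "hudi_table")
        .option("hoodie.datasource.write.operation", "delete_partition")
        .option("hoodie.datasource.write.partitions.to.delete", expiredPartitions)
        .mode(SaveMode.Append)
        .save("/path/to/hudi_table");
  }
}
```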

## Implementation

Our main goals are as follows:

* Provide an extensible framework for partition TTL management.
* Implement a simple KEEP_BY_TIME strategy that can be executed through an independent Spark job or through synchronous/asynchronous table services.

### Strategy Definition

The TTL strategies are similar to existing table service strategies. We can define a TTL strategy just like we define a clustering/clean/compaction strategy:

```properties
hoodie.partition.ttl.management.strategy=KEEP_BY_TIME
```

**Contributor:** Do we really need to consider other policies? It's really confusing that TTL (Time-To-Live) could consider anything other than time or something time-dependent. In your first commit I found that there were KEEP_BY_SIZE and KEEP_BY_COUNT. But KEEP_BY_SIZE looks like we want to delete some huge partition with a lot of data just because it is big, and KEEP_BY_COUNT is not suitable due to Hudi's partitioning model.

**Contributor (author):** We can change the word TTL to lifecycle if we want to support other types of data expiration policies. In the current version of this RFC, we do not plan to support other types of policies like KEEP_BY_SIZE, since they rely on the condition that the partition values are comparable, which is not always true. However, we do have some users with this kind of requirement, so we need to make the policy extensible.

**Contributor:** `hoodie.partition.ttl.strategy`?

**Contributor:** I suppose it's better to implement only one kind of processing for partition TTL, accounting for time only. For KEEP_BY_SIZE, the partition level doesn't look suitable; it's appropriate for record-level processing. So we don't need the `hoodie.partition.ttl.strategy` setting.

**Contributor (author):** It's better to provide the `hoodie.partition.ttl.strategy` abstraction so that users can implement their own strategies and reuse the infrastructure provided by RFC-65 to delete expired partitions. For example, a user may want to add a new strategy that deletes partitions by their own specified logic.

In the current design, we will implement KEEP_BY_TIME only. However, that doesn't mean we won't need any other kind of TTL strategy in the future.

**Contributor:**

> For example, a user may want to add a new strategy that deletes partitions by their own specified logic.

Do you have any example of this logic? Unfortunately, I couldn't come up with any.

```properties
hoodie.partition.ttl.management.strategy.class=org.apache.hudi.table.action.ttl.strategy.KeepByTimePartitionTTLManagementStrategy
```

**Contributor:** `hoodie.partition.ttl.strategy.class`?

**Contributor (author):** Yes, sure. Will change it later.

```properties
hoodie.partition.ttl.days.retain=10
```

The config `hoodie.partition.ttl.management.strategy.class` provides a strategy class (a subclass of `PartitionTTLManagementStrategy`) that computes the expired partition paths to delete, and `hoodie.partition.ttl.days.retain` is the strategy value used by `KeepByTimePartitionTTLManagementStrategy`, meaning that we will expire partitions that haven't been modified for that number of days. We will cover `KeepByTimePartitionTTLManagementStrategy` in detail in the next section.

The core definition of `PartitionTTLManagementStrategy` looks like this:

```java
/**
 * Strategy for partition-level TTL management.
 */
public abstract class PartitionTTLManagementStrategy {

  /**
   * Get expired partition paths for a specific partition TTL management strategy.
   *
   * @return Expired partition paths.
   */
  public abstract List<String> getExpiredPartitionPaths();
}
```
**Contributor:** Do we need to pass around basic info like the table path or meta client so that users can customize more easily? For example, how can a customized class know the table path?

**Contributor (author):** See the PoC code: https://github.com/apache/hudi/pull/9723/files#diff-854a305c2cf1bbe8545bbe78866f53e84a489b60390ea1cfc7f64ed61165a0aaR33

Will provide a constructor like:

    public PartitionTTLManagementStrategy(HoodieTable hoodieTable) {
      this.writeConfig = hoodieTable.getConfig();
      this.hoodieTable = hoodieTable;
    }

**Contributor:** Then make it clear in the pseudocode.

Users can provide their own implementation of `PartitionTTLManagementStrategy`, and Hudi will help delete the expired partitions.
**Contributor:** Sorry to be difficult, but providing TTL functionality through a custom implementation of `PartitionTTLManagementStrategy` is not user friendly. We want to automate detecting outdated partitions and calling `delete_partition`. Could we just allow the user to set a partition path specification with a TTL value, and implement everything internally?

From my point of view, there are two main entities in TTL:

* Object: in our case, it's the partition, which we define using a spec.
* Definition of outdating: it should be time or something time-dependent. In our case, we could compare the difference between the current time and `_hoodie_commit_time` with a user-defined delta value.

This is the main scope for TTL, and we shouldn't allow more flexibility. A customized implementation of `PartitionTTLManagementStrategy` would allow doing anything with partitions. It could still be a `PartitionManagementStrategy`, but then we shouldn't name it with the TTL part.

**Contributor (author):** Thanks for your review~

> providing TTL functionality through a custom implementation of PartitionTTLManagementStrategy is not user friendly.

Common users who want to do TTL management just need to add some write configs to their ingestion job. In the simplest situation they may just add the following configs to enable async TTL management:

    hoodie.partition.ttl.days.retain=10
    hoodie.ttl.management.async.enabled=true

The TTL policy will default to KEEP_BY_TIME, and we can automatically detect expired partitions by their last modified time and delete them.

However, this simple policy may not suit all users; some may need to implement their own partition TTL policy, so we provide an abstraction `PartitionTTLManagementStrategy` that returns expired partitions as users need, and Hudi will help delete them.

> This is the main scope for TTL, and we shouldn't allow more flexibility. A customized implementation of PartitionTTLManagementStrategy would allow doing anything with partitions. It could still be a PartitionManagementStrategy, but then we shouldn't name it with the TTL part.

A customized implementation of `PartitionTTLManagementStrategy` will only provide a list of expired partitions, and Hudi will help delete them. We can rename `PartitionTTLManagementStrategy` to `PartitionLifecycleManagementStrategy`, since we do not always delete partitions by time.

**Contributor:**

> The TTL policy will default to KEEP_BY_TIME, and we can automatically detect expired partitions by their last modified time and delete them.

Unfortunately, I don't see any strategies other than KEEP_BY_TIME for partition-level TTL.

> A customized implementation of PartitionTTLManagementStrategy will only provide a list of expired partitions, and Hudi will help delete them. We can rename PartitionTTLManagementStrategy to PartitionLifecycleManagementStrategy, since we do not always delete partitions by time.

I see a partition on different object levels as:

| level               | time                      | size                                                 |
|---------------------|---------------------------|------------------------------------------------------|
| the whole partition | TTL                       | not applicable, could be a trigger for lower levels  |
| file groups         | managed by other services |                                                      |
| file slices         | managed by other services |                                                      |
| records             | TTL                       | oldest could be removed by partition size threshold  |

So, we don't need anything other than KEEP_BY_TIME at the partition level.

**Contributor (author):**

> Unfortunately, I don't see any strategies other than KEEP_BY_TIME for partition-level TTL.
>
> size: not applicable, could be a trigger for lower levels

We need to make the design extensible and make it easy for users to plug in their own expiration logic. I think it's better to have a strategy-level abstraction.
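To make this extension point concrete, here is a hedged sketch of what a user-defined strategy could look like. Only `PartitionTTLManagementStrategy`, the constructor taking a `HoodieTable` (as discussed above), `getExpiredPartitionPaths()`, and `FSUtils.getAllPartitionPaths` come from this RFC; the class name and the `isExpiredByBusinessRule` rule are purely illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hudi imports (HoodieTable, FSUtils, ...) are omitted for brevity, as in the other snippets in this RFC.

/**
 * Illustrative only: a user-defined strategy that expires partitions matching a custom rule.
 * It only returns expired partition paths; the actual deletion is still performed by Hudi
 * through the delete_partition machinery.
 */
public class BusinessRulePartitionTTLManagementStrategy extends PartitionTTLManagementStrategy {

  public BusinessRulePartitionTTLManagementStrategy(HoodieTable hoodieTable) {
    super(hoodieTable); // constructor shape proposed in the discussion above
  }

  @Override
  public List<String> getExpiredPartitionPaths() {
    // List all partitions of the table, then keep only those the custom rule considers expired.
    List<String> allPartitions = FSUtils.getAllPartitionPaths(
        hoodieTable.getContext(), writeConfig.getMetadataConfig(), writeConfig.getBasePath());
    return allPartitions.stream()
        .filter(this::isExpiredByBusinessRule) // hypothetical user-defined rule
        .collect(Collectors.toList());
  }

  private boolean isExpiredByBusinessRule(String partitionPath) {
    // For example, expire partitions belonging to discontinued products.
    return partitionPath.startsWith("product=discontinued");
  }
}
```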


### KeepByTimeTTLManagementStrategy

We will provide a strategy called `KeepByTimePartitionTTLManagementStrategy` in the first version of the partition TTL management implementation.

`KeepByTimePartitionTTLManagementStrategy` will calculate the `lastModifiedTime` for each input partition. If the duration between now and the partition's `lastModifiedTime` is larger than what `hoodie.partition.ttl.days.retain` configures, `KeepByTimePartitionTTLManagementStrategy` will mark this partition as expired. We use days as the unit of expiration time since it is very commonly used for data lakes. Open to ideas on this.

We will use the largest commit time of the committed file groups in the partition as the partition's `lastModifiedTime`. So any write (including normal DMLs, clustering, etc.) with a larger instant time will change the partition's `lastModifiedTime`.
**Contributor:** In the current implementation, `HoodiePartitionMetadata` provides only the `commitTime` (partition creation commit time) and `partitionDepth` properties. We can add a new `lastModifiedTime` property in `.hoodie_partition_metadata`, which is updated on every commit/deltacommit to the corresponding partition.

We only need to think about migration from a version without partition-level TTL to a new one with this feature.

**Contributor (author):** We should always think twice before making format changes. We have considered using `HoodiePartitionMetadata` to provide the `lastModifiedTime`; however, the current implementation of partition metadata does not support transactions and is never modified after its first creation. Not sure it is a good choice for maintaining the `lastModifiedTime`.

**Contributor:** We have no notion of `lastModifiedTime`; do you mean `lastCommitTime` or `latestCommitTime`?

**Contributor:** Currently, `HoodiePartitionMetadata` (the `.hoodie_partition_metadata` file) provides only the `commitTime` and `partitionDepth` properties. This file is written during partition creation and is not updated later. We can make this file more useful by adding a new property, for instance `lastUpdateTime`, to save the time of the last upsert/delete operation on the partition with each commit/deltacommit/replacecommit, and then use this property for the partition TTL check.

**Contributor (author):** Again, leveraging `.hoodie_partition_metadata` would bring a format change, and it doesn't support any kind of transaction currently. As discussed with @danny0405, in 1.0.0 and later versions, which support efficient completion time queries on the timeline (#9565), we will have a more elegant way to get the `lastCommitTime`. You can see the updated RFC for details.


For file groups generated by a replace commit, the commit time may not reveal the real insert/update time of the data in the file group. However, when using this strategy, we can assume that we won't run clustering on a partition that has had no new writes for a long time. In the future, we may introduce a more accurate mechanism to get the `lastModifiedTime` of a partition, for example using the metadata table.

For Hudi 1.0.0 and later versions, which support efficient completion time queries on the timeline (#9565), we can get a partition's `lastModifiedTime` by scanning the timeline and getting the last write commit for the partition. Also, for efficiency, we can store the partitions' last modified times and the current completion time in the replace commit metadata. The next time we need to calculate the partitions' last modified times, we can build them incrementally from the replace commit metadata of the last TTL management run.
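A rough sketch of the resulting check, assuming the constructor and fields discussed in the review comments above; `retainDays` (read from `hoodie.partition.ttl.days.retain`) and `getLastModifiedTime` (returning, as epoch millis, the largest commit/completion time among the partition's committed file groups) are illustrative stand-ins for the real plumbing:

```java
// Sketch only: a partition expires once it has not been modified for longer than the retention window.
// Requires java.util.List, java.util.concurrent.TimeUnit and java.util.stream.Collectors.
@Override
public List<String> getExpiredPartitionPaths() {
  long nowMillis = System.currentTimeMillis();
  long ttlMillis = TimeUnit.DAYS.toMillis(retainDays); // retainDays ~ hoodie.partition.ttl.days.retain

  List<String> allPartitions = FSUtils.getAllPartitionPaths(
      hoodieTable.getContext(), writeConfig.getMetadataConfig(), writeConfig.getBasePath());

  return allPartitions.stream()
      // getLastModifiedTime(partition): hypothetical helper, see the description above.
      .filter(partition -> nowMillis - getLastModifiedTime(partition) > ttlMillis)
      .collect(Collectors.toList());
}
```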
**Contributor:** We will only support the feature in 1.0, right? Before 1.0, we do not have a good way to know when a log file is committed in a file slice, and that might be a snag when calculating the `lastCommitTime`.

**Contributor (author):** You're right. We should support this feature in 1.0 only, to have an accurate `lastCommitTime`.


### Apply different strategies for different partitions

Some users may want to apply different strategies to different partitions. For example, they may have multiple partition fields (productId, day). For partitions under `product=1` they want to keep data for 30 days, while for partitions under `product=2` they want to keep data for 7 days only.
**Contributor:** We can allow the user to write a specification:

    spec = "*/product=1/*", TTL = 30 DAYS
    spec = "*/product=2/*", TTL = 7 DAYS

It also allows setting a default TTL:

    spec = "*", TTL = 2 MONTHS

In this case we will have multiple policies for some partitions. To resolve conflicts automatically, we could add a `hoodie.ttl.policies.conflict.resolution.rule` property with the available options:

* MIN_TTL: the policy with the minimum TTL value is chosen, used in the "prefer cleaning" case to minimize disk space costs.
* MAX_TTL: the policy with the maximum TTL value is chosen, used in the "prefer to save data" case to prevent deletion of any data in case of ambiguity.

The spec is unique and can be used to drop some policies.

**Contributor (author):** We can implement two independent TTL policies, one simple and another advanced, as I commented below.

**Contributor:** Kind of think it is valuable if the specification could support wildcard matching. We need to handle the lexical parsing of the spec string though.

**Contributor (author):**

> Kind of think it is valuable if the specification could support wildcard matching. We need to handle the lexical parsing of the spec string though.

Yes, it's valuable. However, it will introduce more complexity into the design. In our internal version, which has implemented a wildcard-match TTL policy, we need to deal with things like:

1. Provide a Spark SQL procedure for defining TTL policies, so users can add/delete/update policies on a Hudi table.
2. Policies will be stored in hoodie.properties, so we need to consider the concurrent modification issue for hoodie.properties.
3. We need a default policy, so that partitions that do not explicitly define a TTL policy use the default policy. So when implementing it, we need to first find which partitions have defined TTLs, and then find which partitions should apply the default policy.
4. We need to care about policies defined at different levels. For example, if we have a, b, c, d as partition fields, can we define TTL policies at any level of the partition? (We do not allow this in our internal version.)

However, most of our users just need a simple TTL policy that only cares about the last modified time of each partition, and they do not need the complicated TTL policy (which requires understanding the multi-level TTL policy definition, the procedure to set/unset policies, and so on), so we simplified the design.

In the simplified version of partition lifecycle management, we can define the policy with just a few write configs, and we do not need a default policy or complicated regex rules. Partition lifecycle management is now more like simple syntactic sugar for delete_partition, which is easy for normal users to understand and use. For advanced users, we can provide an advanced policy in the future that takes care of the things I mentioned above.

**Contributor:** We can decrease complexity by not implementing different strategies for TTL processing and implementing only one, which uses time only, and then add processing of policies by wildcards.

**Contributor:**

> We need a default policy, so that partitions that do not explicitly define a TTL policy use the default policy.

But if we have wildcards, then spec=* will be the default policy. And it is really confusing to have some policies in hoodie.properties and one default policy somewhere else. We need to keep them all in one place.

**Contributor:**

> We need to care about policies defined at different levels. For example, if we have a, b, c, d as partition fields, can we define TTL policies at any level of the partition?

If a partition path matches some spec policy, then compare the lastUpdateTime of the partition with the settings of the policy, no matter at which level this partition is. I suppose that is clearer behavior.

**Contributor (author):** You can file an RFC about your design of the wildcard-spec version of the TTL strategy, making it clear how users should use this kind of strategy, the data structure of the strategy, how to store the policies (and the problems around that), etc. We can discuss the issues of this kind of strategy in that RFC.

The implementation of this advanced strategy can be added in the future by implementing a new kind of TTL strategy.

I prefer to provide a simple strategy that we can configure with a few simple and easy-to-understand write configs. @danny0405 What do you think?

**Contributor:** OK, I have written all my ideas up in a separate RFC: #10248. @stream2000, @danny0405, if it wouldn't bother you, could you please take a look at it?


For the first version of TTL management, we do not plan to implement a complicated strategy (for example, using an array to store strategies, introducing partition regexes, etc.). Instead, we add a new abstract method `getPartitionPathsForTTLManagement` in `PartitionTTLManagementStrategy` and provide a new config `hoodie.partition.ttl.management.partition.selected`.

If `hoodie.partition.ttl.management.partition.selected` is set, `getPartitionPathsForTTLManagement` will return the partitions provided by this config. If not, `getPartitionPathsForTTLManagement` will return all partitions in the Hudi table.
**Contributor:** Is it a new Hudi table property? Would it contain the list of all partitions if we want to apply some default TTL?

**Contributor (author):** It will be a write config, and we will only apply the TTL policy to the partitions specified by this config. If this config is not specified, we will apply the TTL policy to all the partitions in the Hudi table.

**Contributor:** Again, it is too complicated. You have some default TTL, but there are a lot of policies in hoodie.properties. And you also have a filter in another place, hoodie.partition.ttl.management.partition.selected, which intersects with all the settings mentioned above.

But all this functionality could be done by specs with wildcards. You don't need anything else, and all policies would be stored in one place.

**Contributor (author):** We can remove hoodie.partition.ttl.management.partition.selected after we implement the advanced strategy in the future.


TTL management strategies will only be applied to partitions returned by `getPartitionPathsForTTLManagement`.

Thus, if users want to apply different strategies to different partitions, they can run TTL management multiple times, selecting different partitions and applying the corresponding strategy each time, as sketched below. We may provide a batch interface in the future to simplify this.
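For illustration, a hedged sketch of what two such runs might look like. The value format of `hoodie.partition.ttl.management.partition.selected` (here a comma-separated list of partition paths) is an assumption, since this RFC does not pin it down:

```properties
# Run 1: apply a 30-day retention to the selected partitions under product=1
hoodie.partition.ttl.management.partition.selected=product=1/day=2023-11-01,product=1/day=2023-11-02
hoodie.partition.ttl.days.retain=30

# Run 2: apply a 7-day retention to the selected partitions under product=2
hoodie.partition.ttl.management.partition.selected=product=2/day=2023-11-01,product=2/day=2023-11-02
hoodie.partition.ttl.days.retain=7
```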
**Contributor:** So, if I want to implement 6 partition policies, then I should prepare 6 `PartitionTTLManagementStrategy`s and call them 6 times? TTL is a simple property, and I don't think it should be so hard to set.

I propose allowing the user to set only spec and TTL values, with support for add, show, and delete operations. We can also implement a dry run for checking the partition list.

Then during the TTL check:

* Iterate through a list of all partitions and compare them to the specs. Later, the listing of all partitions could be optimized by storing it in a separate data structure. But the tough part of working with this data structure would be providing consistency in the case of concurrent writing, when new partitions are created and some old partitions are removed.
* Compare each partition to the list of policies. We can order this list by TTL value in ascending or descending order; the order would be defined by the resolution strategy for conflicts between multiple policies, MIN_TTL or MAX_TTL respectively.
* Then we can compare the current time with the lastModifiedTime. Later, storing this property could be optimized to decrease reads from HDFS, for instance by adding this data to the partition-related data structure mentioned above.

**Contributor (author):**

> I propose allowing the user to set only spec and TTL values, with support for add, show, and delete operations.

> So, if I want to implement 6 partition policies, then I should prepare 6 PartitionTTLManagementStrategys and call them 6 times?

We can implement the advanced TTL management policy in the future to provide the ability to set different policies for different partitions. To achieve this, we can introduce a new hoodie.partition.ttl.management.strategy and implement the logic you mentioned. For most users, a simple policy that expires any partition whose last modified time (the last time data was added/updated) is older than the specified TTL will be enough.

@geserdugarov @nsivabalan What do you think about this?

**Contributor:** I think it's better to start with simple partition TTL. If you want, I can do it. I have ideas for a design with specs supporting wildcards and a simple conflict resolution strategy, and I've already tested the implementation. I've mentioned all my ideas in the corresponding comments.

**Contributor (author):** You can file an RFC about this, and we can discuss your design in detail. It doesn't conflict with the design in RFC-65.


The `getPartitionPathsForTTLManagement` method will look like this:

```java
/**
 * Strategy for partition-level TTL management.
 */
public abstract class PartitionTTLManagementStrategy {

  /**
   * Scan and list all partitions for partition TTL management.
   *
   * @return Partitions to apply the TTL management strategy on.
   */
  protected List<String> getPartitionPathsForTTLManagement() {
    if (StringUtils.isNullOrEmpty(config.getTTLManagementPartitionSelected())) {
      // No partitions explicitly selected: return all partition paths in the table
      return FSUtils.getAllPartitionPaths(hoodieTable.getContext(), config.getMetadataConfig(), config.getBasePath());
    } else {
      // Return only the partitions selected by hoodie.partition.ttl.management.partition.selected
      return getMatchedPartitions();
    }
  }
}
```
**Contributor (@danny0405, Dec 1, 2023):** Where does the `hoodieTable` get instantiated?

**Contributor (author):** In the constructor of `PartitionTTLManagementStrategy`, like:

    protected final HoodieTable hoodieTable;
    protected final HoodieWriteConfig writeConfig;

    public PartitionTTLManagementStrategy(HoodieTable hoodieTable) {
      this.writeConfig = hoodieTable.getConfig();
      this.hoodieTable = hoodieTable;
    }

**Contributor:** Let's make the pseudocode clear.

### Executing TTL management

Once we have a proper `PartitionTTLManagementStrategy` implementation, it's easy to execute the TTL management.

```java
public class SparkTTLManagementActionExecutor<T> extends BaseSparkCommitActionExecutor<T> {

  @Override
  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
    // Construct the configured PartitionTTLManagementStrategy via reflection
    PartitionTTLManagementStrategy strategy = (PartitionTTLManagementStrategy) ReflectionUtils.loadClass(
        PartitionTTLManagementStrategy.checkAndGetPartitionTTLManagementStrategy(config),
        new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);

    // Get expired partition paths
    List<String> expiredPartitions = strategy.getExpiredPartitionPaths();

    // Delete them, reusing SparkDeletePartitionCommitActionExecutor
    return new SparkDeletePartitionCommitActionExecutor<>(context, config, table, instantTime, expiredPartitions).execute();
  }
}
```

We will add a new method `managePartitionTTL` to `HoodieTable`, and `HoodieSparkCopyOnWriteTable` can implement it like this:

```java
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> managePartitionTTL(HoodieEngineContext context, String instantTime) {
  return new SparkTTLManagementActionExecutor<>(context, config, this, instantTime).execute();
}
```

We can call `hoodieTable.managePartitionTTL` in an independent Flink/Spark job, or in async/sync inline table services like clustering/compaction/clean, etc.
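For example, an independent job could trigger it once per run with something like the sketch below; `createHoodieTable` and `newInstantTime` stand in for however the job obtains a `HoodieTable` instance and generates a new instant time, and are not specific Hudi APIs:

```java
// Sketch of an independent TTL job: build the table handle, pick an instant time,
// and run partition TTL management once.
HoodieTable hoodieTable = createHoodieTable(writeConfig, context); // hypothetical helper
String instantTime = newInstantTime();                             // hypothetical helper
HoodieWriteMetadata<HoodieData<WriteStatus>> result =
    hoodieTable.managePartitionTTL(context, instantTime);
```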


### User interface for Partition TTL Management

We can do partition TTL management inline with a streaming ingestion job or with an independent batch job, for both the Spark and Flink engines.

#### Run inline with Streaming Ingestion

Since we can run clustering inline with a streaming ingestion job through the following configs:

```properties
hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=5
```

We can do a similar thing for partition TTL management. The configs for async TTL management are:

| Config key | Remarks | Default |
|-----------------------------------------|--------------------------------------------------------------------------------------------------------------------|---------|
| hoodie.ttl.management.async.enabled | Enable running of TTL management service, asynchronously as writes happen on the table. | False |
| hoodie.ttl.management.async.max.commits | Control frequency of async TTL management by specifying after how many commits TTL management should be triggered. | 4 |


We can easily implement async TTL management for both the Spark and Flink engines since we only need to call `hoodieTable.managePartitionTTL`. We can also support synchronous TTL management if we want.
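For example, a streaming ingestion job could enable asynchronous TTL management by combining the configs above with the strategy configs from the earlier section:

```properties
hoodie.ttl.management.async.enabled=true
hoodie.ttl.management.async.max.commits=4
hoodie.partition.ttl.management.strategy=KEEP_BY_TIME
hoodie.partition.ttl.days.retain=10
```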

#### Run by Independent Job

Deleting a large number of partitions is a heavy operation, so we may want to run TTL management through an independent job. We will provide a Spark SQL call command to run TTL management, and it may look like this:

```sql
call managePartitionTTL(table => 'hudi_table', strategy => 'KEEP_BY_TIME', daysRetain => '10', predicate => 'productid = 1');
```

The params are as follows:

| Param name | Remarks | Default |
|------------|-------------------------------------------------------------------------------------------------------------------|--------------|
| table | The Hudi table on which to run partition TTL management | empty string |
| basePath | The Hudi table path on which to run partition TTL management | empty string |
| strategy | The partition TTL management strategy, corresponding to an implementation of `PartitionTTLManagementStrategy` | KEEP_BY_TIME |
| predicate | Partition predicate for TTL management; the TTL strategy will only be applied to the partitions selected by this predicate | empty string |


Besides Spark SQL call commands, in the future we can support running TTL management with a Spark jar, similar to running clustering with `HoodieClusteringJob`, and with a Flink job similar to `HoodieFlinkClusteringJob`.

### Future plan

We can do a lot more around TTL management in the future:

* Support record-level TTL management.
* Move the partitions to be cleaned up to cold/cheaper storage in object stores instead of deleting them forever.
* Stash the partitions to be cleaned up in a `.stashedForDeletion` folder (at the `.hoodie` level) and introduce some recovery mechanism, for data security.
* ...

## Rollout/Adoption Plan

Hoodie Partition TTL Management V1 will support a simple KEEP_BY_TIME strategy at first, and others can implement their own `PartitionTTLManagementStrategy`.

Adding this feature won't affect existing functionality.

## Test Plan

Will add UTs and ITs to test this.