29 changes: 15 additions & 14 deletions docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
@@ -305,7 +305,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
@@ -321,7 +321,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
@@ -399,7 +399,7 @@ Heap state backend additionally stores a Java object that includes the user state and a timestamp
import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.disableCleanupInBackground()
.build();
```
@@ -409,7 +409,7 @@ StateTtlConfig ttlConfig = StateTtlConfig
import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.disableCleanupInBackground
.build
```
@@ -441,7 +441,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.cleanupFullSnapshot()
.build();
```
@@ -452,7 +452,7 @@ import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.cleanupFullSnapshot
.build
```
@@ -487,7 +487,7 @@ ttl_config = StateTtlConfig \
```java
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.cleanupIncrementally(10, true)
.build();
```
@@ -496,7 +496,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;
```scala
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.cleanupIncrementally(10, true)
.build
```
@@ -537,8 +537,8 @@ The RocksDB compaction filter provided by Flink filters out expired state entries at compaction time
import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
- .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+ .newBuilder(Duration.ofSeconds(1))
+ .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build();
```
{{< /tab >}}
@@ -547,19 +547,20 @@ StateTtlConfig ttlConfig = StateTtlConfig
import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
- .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+ .newBuilder(Duration.ofSeconds(1))
+ .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build
```
{{< /tab >}}
{{< tab "Python" >}}
```python
+ from pyflink.common import Duration
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
.new_builder(Time.seconds(1)) \
- .cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \
+ .cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \
.build()
```
{{< /tab >}}
@@ -573,7 +574,7 @@ The default background cleanup strategy of the RocksDB backend runs once for every 1000 entries processed
Periodic compaction can speed up the cleanup of expired state entries, especially entries that are rarely accessed.
Files older than this value will be picked up for compaction and re-written to the same level as they were before.
This ensures that files pass through the compaction filter periodically.
- You can set the periodic compaction time via the `StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)`
+ You can set the periodic compaction time via the `StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)`
method.
The default periodic compaction time is 30 days.
You can set it to 0 to turn off periodic compaction, or set a smaller value to speed up the cleanup of expired state entries, though this will trigger more compactions.
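The paragraph above mentions passing 0 to turn periodic compaction off, but no snippet shows it; a minimal sketch, assuming `Duration.ZERO` is how a 0-second period is expressed with the new parameter type:

```java
import java.time.Duration;

import org.apache.flink.api.common.state.StateTtlConfig;

// Duration.ZERO yields a period of 0 seconds, which the strategy's
// documentation describes as turning periodic compaction off.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    .cleanupInRocksdbCompactFilter(1000, Duration.ZERO)
    .build();
```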
29 changes: 15 additions & 14 deletions docs/content/docs/dev/datastream/fault-tolerance/state.md
@@ -344,7 +344,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
@@ -360,7 +360,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
@@ -447,7 +447,7 @@ in the background if supported by the configured state backend. Background clean
```java
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.disableCleanupInBackground()
.build();
```
@@ -456,7 +456,7 @@ StateTtlConfig ttlConfig = StateTtlConfig
```scala
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.disableCleanupInBackground
.build
```
@@ -491,7 +491,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.cleanupFullSnapshot()
.build();
```
@@ -502,7 +502,7 @@ import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.cleanupFullSnapshot
.build
```
@@ -543,7 +543,7 @@ This feature can be configured in `StateTtlConfig`:
```java
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.cleanupIncrementally(10, true)
.build();
```
@@ -552,7 +552,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;
```scala
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
+ .newBuilder(Duration.ofSeconds(1))
.cleanupIncrementally(10, true)
.build
```
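The two arguments to `cleanupIncrementally` are not glossed in the visible hunk; going by the accessors exercised in the test further down (`getCleanupSize()`, `runCleanupForEveryRecord()`), a brief annotated sketch:

```java
import java.time.Duration;

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    // 10: number of entries checked per state access; true: also run
    // the incremental cleanup for every processed record.
    .cleanupIncrementally(10, true)
    .build();
```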
@@ -600,8 +600,8 @@ This feature can be configured in `StateTtlConfig`:
import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
- .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+ .newBuilder(Duration.ofSeconds(1))
+ .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build();
```
{{< /tab >}}
@@ -610,19 +610,20 @@ StateTtlConfig ttlConfig = StateTtlConfig
import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
- .newBuilder(Time.seconds(1))
- .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+ .newBuilder(Duration.ofSeconds(1))
+ .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build
```
{{< /tab >}}
{{< tab "Python" >}}
```python
+ from pyflink.common import Duration
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
.new_builder(Time.seconds(1)) \
- .cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \
+ .cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \
.build()
```
{{< /tab >}}
@@ -640,7 +641,7 @@ Periodic compaction could speed up expired state entries cleanup, especially for
Files older than this value will be picked up for compaction, and re-written to the same level as they were before.
It makes sure a file goes through compaction filters periodically.
You can change it and pass a custom value to
- `StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)` method.
+ `StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)` method.
The default periodic compaction interval is 30 days.
You can set it to 0 to turn off periodic compaction, or set a small value to speed up the cleanup of expired state entries, though this will trigger more compactions.
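Since the hunk above also shows a single-argument `cleanupInRocksdbCompactFilter(long)` overload, the 30-day default and an explicit `Duration.ofDays(30)` should be interchangeable; a sketch under that assumption:

```java
import java.time.Duration;

import org.apache.flink.api.common.state.StateTtlConfig;

// Leaves the periodic compaction time at its documented 30-day default.
StateTtlConfig byDefault = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();

// Spells out the same period explicitly with the new Duration parameter.
StateTtlConfig explicitPeriod = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    .cleanupInRocksdbCompactFilter(1000, Duration.ofDays(30))
    .build();
```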
@@ -341,7 +341,7 @@ public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) {
*/
@Nonnull
public Builder cleanupInRocksdbCompactFilter(
-         long queryTimeAfterNumEntries, Time periodicCompactionTime) {
+         long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
strategies.put(
CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER,
new RocksdbCompactFilterCleanupStrategy(
@@ -354,7 +354,7 @@ public Builder cleanupInRocksdbCompactFilter(
*
* <p>If some specific cleanup is configured, e.g. {@link #cleanupIncrementally(int,
* boolean)} or {@link #cleanupInRocksdbCompactFilter(long)} or {@link
-  * #cleanupInRocksdbCompactFilter(long, Time)} , this setting does not disable it.
+  * #cleanupInRocksdbCompactFilter(long, Duration)} , this setting does not disable it.
*/
@Nonnull
public Builder disableCleanupInBackground() {
@@ -497,7 +497,7 @@ public static class RocksdbCompactFilterCleanupStrategy
* Default value is 30 days so that every file goes through the compaction process at least
* once every 30 days if not compacted sooner.
*/
- static final Time DEFAULT_PERIODIC_COMPACTION_TIME = Time.days(30);
+ static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30);

static final RocksdbCompactFilterCleanupStrategy
DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY =
@@ -515,14 +515,14 @@ public static class RocksdbCompactFilterCleanupStrategy
* and re-written to the same level as they were before. It makes sure a file goes through
* compaction filters periodically. 0 means turning off periodic compaction.
*/
- private final Time periodicCompactionTime;
+ private final Duration periodicCompactionTime;

private RocksdbCompactFilterCleanupStrategy(long queryTimeAfterNumEntries) {
this(queryTimeAfterNumEntries, DEFAULT_PERIODIC_COMPACTION_TIME);
}

private RocksdbCompactFilterCleanupStrategy(
-         long queryTimeAfterNumEntries, Time periodicCompactionTime) {
+         long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
this.periodicCompactionTime = periodicCompactionTime;
}
@@ -531,7 +531,7 @@ public long getQueryTimeAfterNumEntries() {
return queryTimeAfterNumEntries;
}

- public Time getPeriodicCompactionTime() {
+ public Duration getPeriodicCompactionTime() {
return periodicCompactionTime;
}
}
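A small end-to-end sketch of the changed signatures; the `getCleanupStrategies().inRocksdbCompactFilter()` accessor chain is an assumption — this diff does not show how the strategy object is obtained:

```java
import java.time.Duration;

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
    .build();

// Read the configured period back through the getter changed above.
// (Accessor chain assumed, not shown in this diff.)
Duration period = ttlConfig
    .getCleanupStrategies()
    .inRocksdbCompactFilter()
    .getPeriodicCompactionTime(); // PT1H
```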
@@ -24,6 +24,7 @@

import org.junit.jupiter.api.Test;

+ import java.time.Duration;
import java.util.Arrays;
import java.util.List;

@@ -71,7 +72,8 @@ void testStateTtlConfigBuildWithCleanupInBackground() {
assertThat(incrementalCleanupStrategy.getCleanupSize()).isEqualTo(5);
assertThat(incrementalCleanupStrategy.runCleanupForEveryRecord()).isFalse();
assertThat(rocksdbCleanupStrategy.getQueryTimeAfterNumEntries()).isEqualTo(1000L);
- assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime()).isEqualTo(Time.days(30));
+ assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime())
+         .isEqualTo(Duration.ofDays(30));
}

@Test
14 changes: 8 additions & 6 deletions flink-python/pyflink/datastream/state.py
@@ -19,7 +19,7 @@
from enum import Enum
from typing import TypeVar, Generic, Iterable, List, Iterator, Dict, Tuple, Optional

- from pyflink.common.time import Time
+ from pyflink.common.time import Duration, Time
from pyflink.common.typeinfo import TypeInformation, Types

__all__ = [
@@ -809,7 +809,7 @@ def cleanup_incrementally(self,
def cleanup_in_rocksdb_compact_filter(
self,
query_time_after_num_entries,
-         periodic_compaction_time=Time.days(30)) -> \
+         periodic_compaction_time=None) -> \
'StateTtlConfig.Builder':
"""
Cleanup expired state while Rocksdb compaction is running.
@@ -833,7 +833,8 @@ def cleanup_in_rocksdb_compact_filter(
self._strategies[
StateTtlConfig.CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER] = \
StateTtlConfig.CleanupStrategies.RocksdbCompactFilterCleanupStrategy(
-                 query_time_after_num_entries, periodic_compaction_time)
+                 query_time_after_num_entries,
+                 periodic_compaction_time if periodic_compaction_time else Duration.of_days(30))
return self

def disable_cleanup_in_background(self) -> 'StateTtlConfig.Builder':
@@ -925,14 +926,15 @@ class RocksdbCompactFilterCleanupStrategy(CleanupStrategy):

def __init__(self,
query_time_after_num_entries: int,
-              periodic_compaction_time=Time.days(30)):
+              periodic_compaction_time=None):
self._query_time_after_num_entries = query_time_after_num_entries
- self._periodic_compaction_time = periodic_compaction_time
+ self._periodic_compaction_time = periodic_compaction_time \
+     if periodic_compaction_time else Duration.of_days(30)

def get_query_time_after_num_entries(self) -> int:
return self._query_time_after_num_entries

- def get_periodic_compaction_time(self) -> Time:
+ def get_periodic_compaction_time(self) -> Duration:
return self._periodic_compaction_time

EMPTY_STRATEGY = EmptyCleanupStrategy()
3 changes: 1 addition & 2 deletions flink-python/pyflink/fn_execution/embedded/java_utils.py
@@ -174,8 +174,7 @@ def to_java_state_ttl_config(ttl_config: StateTtlConfig):

if rocksdb_compact_filter_cleanup_strategy:
j_ttl_config_builder.cleanupInRocksdbCompactFilter(
-             rocksdb_compact_filter_cleanup_strategy.get_query_time_after_num_entries(),
-             rocksdb_compact_filter_cleanup_strategy.get_periodic_compaction_time())
+             rocksdb_compact_filter_cleanup_strategy.get_query_time_after_num_entries())

return j_ttl_config_builder.build()

@@ -25,6 +25,7 @@

import org.junit.jupiter.api.Test;

+ import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
@@ -118,6 +119,6 @@ void testParseStateTtlConfigFromProto() {
assertThat(rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries())
.isEqualTo(1000);
assertThat(rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime())
-         .isEqualTo(Time.days(30));
+         .isEqualTo(Duration.ofDays(30));
}
}
@@ -126,7 +126,7 @@ public void configCompactFilter(
columnFamilyOptionsMap.get(stateDesc.getName());
Preconditions.checkNotNull(columnFamilyOptions);
columnFamilyOptions.setPeriodicCompactionSeconds(
-         rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().toSeconds());
+         rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().getSeconds());

long queryTimeAfterNumEntries =
rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
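One behavioral note on the last hunk: `java.time.Duration.getSeconds()` returns the whole-second component and truncates any sub-second remainder, so it matches the old conversion for the day- and hour-granularity periods used by this feature. A tiny sketch:

```java
import java.time.Duration;

// Whole-second periods convert exactly...
long thirtyDays = Duration.ofDays(30).getSeconds();    // 2592000
// ...while sub-second remainders are truncated, not rounded.
long truncated = Duration.ofMillis(1500).getSeconds(); // 1
```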