Skip to content

Commit

Permalink
[FLINK-30535] Use customized TtlTimeProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly authored and Myasuka committed Jan 4, 2024
1 parent 9da39ce commit 9fb3482
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateBackend;

import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
Expand Down Expand Up @@ -62,6 +63,10 @@ public class StateBenchmarkBase extends BenchmarkBase {
protected KeyedStateBackend<Long> keyedStateBackend;

protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception {
return createKeyedStateBackend(TtlTimeProvider.DEFAULT);
}

protected KeyedStateBackend<Long> createKeyedStateBackend(TtlTimeProvider ttlTimeProvider) throws Exception {
Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
String stateDataDirPath = benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
File dataDir = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@
/** Implementation for list state benchmark testing. */
public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
private final String STATE_NAME = "listState";
private final ListStateDescriptor<Long> STATE_DESC =
configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
private ListStateDescriptor<Long> stateDesc;
private ListState<Long> listState;
private List<Long> dummyLists;

Expand All @@ -64,7 +63,8 @@ public static void main(String[] args) throws RunnerException {
@Setup
public void setUp() throws Exception {
keyedStateBackend = createKeyedStateBackend();
listState = getListState(keyedStateBackend, STATE_DESC);
stateDesc = configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
listState = getListState(keyedStateBackend, stateDesc);
dummyLists = new ArrayList<>(listValueCount);
for (int i = 0; i < listValueCount; ++i) {
dummyLists.add(random.nextLong());
Expand All @@ -76,6 +76,7 @@ public void setUp() throws Exception {
public void setUpPerIteration() throws Exception {
for (int i = 0; i < setupKeyCount; ++i) {
keyedStateBackend.setCurrentKey((long) i);
setTtlWhenInitialization();
listState.add(random.nextLong());
}
// make sure only one sst file left, so all get invocation will access this single file,
Expand All @@ -84,15 +85,16 @@ public void setUpPerIteration() throws Exception {
if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
(RocksDBKeyedStateBackend<Long>) keyedStateBackend;
compactState(rocksDBKeyedStateBackend, STATE_DESC);
compactState(rocksDBKeyedStateBackend, stateDesc);
}
advanceTimePerIteration();
}

@TearDown(Level.Iteration)
public void tearDownPerIteration() throws Exception {
applyToAllKeys(
keyedStateBackend,
STATE_DESC,
stateDesc,
(k, state) -> {
keyedStateBackend.setCurrentKey(k);
state.clear();
Expand All @@ -101,7 +103,7 @@ public void tearDownPerIteration() throws Exception {
if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
(RocksDBKeyedStateBackend<Long>) keyedStateBackend;
compactState(rocksDBKeyedStateBackend, STATE_DESC);
compactState(rocksDBKeyedStateBackend, stateDesc);
} else {
System.gc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.state.benchmark.StateBenchmarkBase;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.infra.Blackhole;
Expand Down Expand Up @@ -70,12 +71,18 @@ public void setUp() throws Exception {
for (int i = 0; i < setupKeyCount; ++i) {
keyedStateBackend.setCurrentKey((long) i);
for (int j = 0; j < mapKeyCount; j++) {
setTtlWhenInitialization();
mapState.put(mapKeys.get(j), random.nextDouble());
}
}
keyIndex = new AtomicInteger();
}

@Setup(Level.Iteration)
public void setUpPerIteration() throws Exception {
advanceTimePerIteration();
}

@Benchmark
public void mapUpdate(StateBenchmarkBase.KeyValue keyValue) throws Exception {
keyedStateBackend.setCurrentKey(keyValue.setUpKey);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.benchmark.ttl;

import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.benchmark.StateBenchmarkBase;
import org.openjdk.jmh.annotations.Param;

Expand All @@ -11,38 +31,78 @@
/** The base class for state tests with ttl. */
public class TtlStateBenchmarkBase extends StateBenchmarkBase {

private static final long initialTime = 1000000;

/** The expired time of ttl. */
public enum ExpiredTimeOptions {

/** 5 seconds. */
Seconds5(5000),
/** Expire 3 percent of the initial keys per iteration. */
Expire3PercentPerIteration(3),

/** never expired but enable the ttl. */
MaxTime(Long.MAX_VALUE);
NeverExpired(0);

private Time time;
ExpiredTimeOptions(long mills) {
time = Time.of(mills, TimeUnit.MILLISECONDS);
public long advanceTimePerIteration;
ExpiredTimeOptions(int expirePercentPerIteration) {
this.advanceTimePerIteration = initialTime * expirePercentPerIteration / 100;
}
}

@Param({"Seconds5", "MaxTime"})
protected ExpiredTimeOptions expiredTime;
@Param({"Expire3PercentPerIteration", "NeverExpired"})
protected ExpiredTimeOptions expiredOption;

@Param({"OnCreateAndWrite", "OnReadAndWrite"})
protected StateTtlConfig.UpdateType updateType;

@Param({"ReturnExpiredIfNotCleanedUp", "NeverReturnExpired"})
@Param({"NeverReturnExpired", "ReturnExpiredIfNotCleanedUp"})
protected StateTtlConfig.StateVisibility stateVisibility;

protected ControllableTtlTimeProvider timeProvider;

/** Configure the state descriptor with ttl. */
protected <T extends StateDescriptor<?, ?>> T configTtl(T stateDescriptor) {
StateTtlConfig ttlConfig =
new StateTtlConfig.Builder(expiredTime.time)
new StateTtlConfig.Builder(Time.of(initialTime, TimeUnit.MILLISECONDS))
.setUpdateType(updateType)
.setStateVisibility(stateVisibility)
.build();
stateDescriptor.enableTimeToLive(ttlConfig);
return stateDescriptor;
}

@Override
protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception {
timeProvider = new ControllableTtlTimeProvider();
return createKeyedStateBackend(timeProvider);
}

protected void setTtlWhenInitialization() {
timeProvider.setCurrentTimestamp(random.nextLong(initialTime + 1));
}

protected void finishInitialization() {
timeProvider.setCurrentTimestamp(initialTime);
}

protected void advanceTimePerIteration() {
timeProvider.advanceTimestamp(expiredOption.advanceTimePerIteration);
}

static class ControllableTtlTimeProvider implements TtlTimeProvider {

long current = 0L;

@Override
public long currentTimestamp() {
return current;
}

public void setCurrentTimestamp(long value) {
current = value;
}

public void advanceTimestamp(long value) {
current += value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
Expand Down Expand Up @@ -53,10 +54,17 @@ public void setUp() throws Exception {
keyedStateBackend = createKeyedStateBackend();
valueState = getValueState(keyedStateBackend, configTtl(new ValueStateDescriptor<>("kvState", Long.class)));
for (int i = 0; i < setupKeyCount; ++i) {
setTtlWhenInitialization();
keyedStateBackend.setCurrentKey((long) i);
valueState.update(random.nextLong());
}
keyIndex = new AtomicInteger();
finishInitialization();
}

@Setup(Level.Iteration)
public void setUpPerIteration() throws Exception {
advanceTimePerIteration();
}

@Benchmark
Expand Down

0 comments on commit 9fb3482

Please sign in to comment.