Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
Expand All @@ -35,6 +34,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
Expand All @@ -54,8 +54,6 @@
import java.util.Comparator;
import java.util.List;

import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;

/**
* The AbstractNonTimeUnboundedPrecedingOver class is a specialized implementation for processing
* unbounded OVER window aggregations, particularly for non-time-based range queries in Apache
Expand Down Expand Up @@ -97,14 +95,12 @@
* recalculated, which could be inefficient for large windows
*/
public abstract class AbstractNonTimeUnboundedPrecedingOver<K>
extends KeyedProcessFunction<K, RowData, RowData> {
extends KeyedProcessFunctionWithCleanupState<K, RowData, RowData> {
private static final long serialVersionUID = 1L;

private static final Logger LOG =
LoggerFactory.getLogger(AbstractNonTimeUnboundedPrecedingOver.class);

private final long stateRetentionTime;

private final GeneratedAggsHandleFunction generatedAggsHandler;
private final GeneratedRecordEqualiser generatedRecordEqualiser;
private final GeneratedRecordEqualiser generatedSortKeyEqualiser;
Expand Down Expand Up @@ -170,7 +166,7 @@ public AbstractNonTimeUnboundedPrecedingOver(
LogicalType[] sortKeyTypes,
RowDataKeySelector sortKeySelector,
InternalTypeInfo<RowData> accKeyRowTypeInfo) {
this.stateRetentionTime = stateRetentionTime;
super(stateRetentionTime, stateRetentionTime);
this.generatedAggsHandler = genAggsHandler;
this.generatedRecordEqualiser = genRecordEqualiser;
this.generatedSortKeyEqualiser = genSortKeyEqualiser;
Expand Down Expand Up @@ -204,13 +200,8 @@ public void open(OpenContext openContext) throws Exception {
generatedSortKeyComparator.newInstance(
getRuntimeContext().getUserCodeClassLoader());

StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);

// Initialize state to maintain id counter
idStateDescriptor = new ValueStateDescriptor<Long>("idState", Long.class);
if (ttlConfig.isEnabled()) {
idStateDescriptor.enableTimeToLive(ttlConfig);
}
idState = getRuntimeContext().getState(idStateDescriptor);

// Input elements are all binary rows as they came from network
Expand All @@ -224,30 +215,24 @@ public void open(OpenContext openContext) throws Exception {
sortedListStateDescriptor =
new ValueStateDescriptor<List<Tuple2<RowData, List<Long>>>>(
"sortedListState", listTypeInfo);
if (ttlConfig.isEnabled()) {
sortedListStateDescriptor.enableTimeToLive(ttlConfig);
}
sortedListState = getRuntimeContext().getState(sortedListStateDescriptor);

// Initialize state which maintains the actual row
valueStateDescriptor =
new MapStateDescriptor<Long, RowData>(
"valueMapState", Types.LONG, inputRowTypeInfo);
if (ttlConfig.isEnabled()) {
valueStateDescriptor.enableTimeToLive(ttlConfig);
}
valueMapState = getRuntimeContext().getMapState(valueStateDescriptor);

// Initialize accumulator state per row
InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(accTypes);
accStateDescriptor =
new MapStateDescriptor<RowData, RowData>(
"accMapState", accKeyRowTypeInfo, accTypeInfo);
if (ttlConfig.isEnabled()) {
accStateDescriptor.enableTimeToLive(ttlConfig);
}

accMapState = getRuntimeContext().getMapState(accStateDescriptor);

initCleanupTimeState("NonTimeUnboundedPrecedingOverCleanupTime");

// metrics
this.numOfIdsNotFound =
getRuntimeContext().getMetricGroup().counter(IDS_NOT_FOUND_METRIC_NAME);
Expand All @@ -273,6 +258,9 @@ public void processElement(
KeyedProcessFunction<K, RowData, RowData>.Context ctx,
Collector<RowData> out)
throws Exception {
// register state-cleanup timer
registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());

RowKind rowKind = input.getRowKind();

switch (rowKind) {
Expand All @@ -288,6 +276,26 @@ public void processElement(
}

// Reset acc state since we can have out of order inserts in the ordered list
resetAndCleanupAggFuncs();
}

/**
 * Handles a fired timer by expiring everything kept for the current key.
 *
 * <p>Nothing happens unless state cleaning is enabled, the timer passes the
 * {@code isProcessingTimeTimer} check, and {@code needToCleanupState(timestamp)} confirms that
 * this timestamp is due for cleanup. When all three hold, {@code idState},
 * {@code valueMapState}, {@code accMapState} and {@code sortedListState} are handed to
 * {@code cleanupState} in a single call, and the aggregate functions are then reset and cleaned
 * up. Driving every state from the same timer keeps them from expiring independently of one
 * another.
 *
 * @param timestamp the timestamp of the timer that fired
 * @param ctx context of the fired timer, passed to {@code isProcessingTimeTimer}
 * @param out collector for output rows; not used by this method
 * @throws Exception if clearing the state or resetting the aggregate functions fails
 */
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<RowData> out)
        throws Exception {
    // Guard clauses are evaluated in the same order as the original short-circuit condition.
    if (!stateCleaningEnabled) {
        return;
    }
    if (!isProcessingTimeTimer(ctx)) {
        return;
    }
    if (!needToCleanupState(timestamp)) {
        return;
    }

    cleanupState(idState, valueMapState, accMapState, sortedListState);
    resetAndCleanupAggFuncs();
}

/**
 * Resets the accumulators held by {@code aggFuncs} and then invokes its {@code cleanup()}.
 *
 * <p>Called at the end of {@code processElement}, where the accumulators have to be reset
 * because the ordered list can receive out-of-order inserts, and from {@code onTimer} once the
 * keyed state has been cleared.
 *
 * @throws Exception if either call on {@code aggFuncs} fails
 */
private void resetAndCleanupAggFuncs() throws Exception {
    this.aggFuncs.resetAccumulators();
    this.aggFuncs.cleanup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,18 +643,27 @@ public void testInsertWithStateTTLExpiration() throws Exception {
testHarness.processElement(insertRecord("key1", 2L, 201L));
validateState(function, thirdRecord, 1, 2, 1, 2, 2, 3, true);

// Output should contain 5 records till now
List<RowData> actualRows = testHarness.extractOutputValues();
assertThat(actualRows.size()).isEqualTo(5);

// expire the state
testHarness.setStateTtlProcessingTime(stateTtlTime.toMillis() + 1);
testHarness.setProcessingTime(stateTtlTime.toMillis() + 1);

// After insertion of the following record, there should be only 1 record in state
GenericRowData fourthRecord = GenericRowData.of("key1", 5L, 500L);
testHarness.processElement(insertRecord("key1", 5L, 500L));
validateState(function, fourthRecord, 0, 1, 0, 1, 0, 1, true);

List<RowData> actualRows = testHarness.extractOutputValues();
// Verify only one new output record (i.e. 6th record) is emitted after state TTL expiry
actualRows = testHarness.extractOutputValues();
assertThat(actualRows.size()).isEqualTo(6);

// Aggregated value should be based on only the last inserted record
// The inserted record after ttl should be treated as the first record for that key
RowData expectedRowAfterStateTTLExpiry = outputRecord(RowKind.INSERT, "key1", 5L, 500L, 5L);
assertThat(actualRows.get(actualRows.size() - 1)).isEqualTo(expectedRowAfterStateTTLExpiry);

assertThat(function.getNumOfSortKeysNotFound().getCount()).isEqualTo(0L);
assertThat(function.getNumOfIdsNotFound().getCount()).isEqualTo(0L);
}
Expand Down Expand Up @@ -693,23 +702,34 @@ public void testInsertAndRetractWithStateTTLExpiration() throws Exception {
testHarness.processElement(insertRecord("key1", 5L, 502L));
validateState(function, fifthRecord, 2, 3, 1, 2, 4, 5, true);

// Output should contain 9 records till now
List<RowData> actualRows = testHarness.extractOutputValues();
assertThat(actualRows.size()).isEqualTo(9);

assertThat(function.getNumOfSortKeysNotFound().getCount()).isEqualTo(0L);
assertThat(function.getNumOfIdsNotFound().getCount()).isEqualTo(0L);

// expire the state
testHarness.setStateTtlProcessingTime(stateTtlTime.toMillis() + 1);
testHarness.setProcessingTime(stateTtlTime.toMillis() + 1);

// Retract a non-existent record due to state ttl expiration
testHarness.processElement(updateBeforeRecord("key1", 5L, 502L));

// Ensure state is null/empty
Long idValue = function.getRuntimeContext().getState(function.idStateDescriptor).value();
assertThat(idValue).isNull();
List<Tuple2<RowData, List<Long>>> sortedList =
function.getRuntimeContext().getState(function.sortedListStateDescriptor).value();
assertThat(sortedList).isNull();
MapState<RowData, RowData> mapState =
MapState<RowData, RowData> accMapState =
function.getRuntimeContext().getMapState(function.accStateDescriptor);
assertThat(mapState.isEmpty()).isTrue();
Long idValue = function.getRuntimeContext().getState(function.idStateDescriptor).value();
assertThat(idValue).isNull();
assertThat(accMapState.isEmpty()).isTrue();
MapState<Long, RowData> valueMapState =
function.getRuntimeContext().getMapState(function.valueStateDescriptor);
assertThat(valueMapState.isEmpty()).isTrue();

List<RowData> actualRows = testHarness.extractOutputValues();
// No new records should be emitted after retraction of non-existent record
actualRows = testHarness.extractOutputValues();
assertThat(actualRows.size()).isEqualTo(9);

assertThat(function.getNumOfSortKeysNotFound().getCount()).isEqualTo(1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,18 +590,28 @@ public void testInsertWithStateTTLExpiration() throws Exception {
testHarness.processElement(insertRecord("key1", 2L, 201L));
validateState(function, thirdRecord, 1, 2, 1, 2, 2, 3, true);

// Output should contain 3 records till now
List<RowData> actualRows = testHarness.extractOutputValues();
assertThat(actualRows.size()).isEqualTo(3);

// expire the state
testHarness.setStateTtlProcessingTime(stateTtlTime.toMillis() + 1);
testHarness.setProcessingTime(stateTtlTime.toMillis() + 1);

// After insertion of the following record, there should be only 1 record in state due to
// state TTL expiration
GenericRowData fourthRecord = GenericRowData.of("key1", 5L, 500L);
testHarness.processElement(insertRecord("key1", 5L, 500L));
validateState(function, fourthRecord, 0, 1, 0, 1, 0, 1, true);

List<RowData> actualRows = testHarness.extractOutputValues();
// Verify only one new output record (i.e. 4th record) is emitted after state TTL expiry
actualRows = testHarness.extractOutputValues();
assertThat(actualRows.size()).isEqualTo(4);

// Aggregated value should be based on only the last inserted record
// The inserted record after ttl should be treated as the first record for that key
RowData expectedRowAfterStateTTLExpiry = outputRecord(RowKind.INSERT, "key1", 5L, 500L, 5L);
assertThat(actualRows.get(actualRows.size() - 1)).isEqualTo(expectedRowAfterStateTTLExpiry);

assertThat(function.getNumOfSortKeysNotFound().getCount()).isEqualTo(0L);
assertThat(function.getNumOfIdsNotFound().getCount()).isEqualTo(0L);
}
Expand Down Expand Up @@ -640,23 +650,34 @@ public void testInsertAndRetractWithStateTTLExpiration() throws Exception {
testHarness.processElement(insertRecord("key1", 5L, 502L));
validateState(function, fifthRecord, 2, 3, 1, 2, 4, 5, true);

// Output should contain 5 records till now
List<RowData> actualRows = testHarness.extractOutputValues();
assertThat(actualRows.size()).isEqualTo(5);

assertThat(function.getNumOfSortKeysNotFound().getCount()).isEqualTo(0L);
assertThat(function.getNumOfIdsNotFound().getCount()).isEqualTo(0L);

// expire the state
testHarness.setStateTtlProcessingTime(stateTtlTime.toMillis() + 1);
testHarness.setProcessingTime(stateTtlTime.toMillis() + 1);

// Retract a non-existent record due to state ttl expiration
testHarness.processElement(updateBeforeRecord("key1", 5L, 502L));

// Ensure state is null/empty
Long idValue = function.getRuntimeContext().getState(function.idStateDescriptor).value();
assertThat(idValue).isNull();
List<Tuple2<RowData, List<Long>>> sortedList =
function.getRuntimeContext().getState(function.sortedListStateDescriptor).value();
assertThat(sortedList).isNull();
MapState<RowData, RowData> mapState =
MapState<RowData, RowData> accMapState =
function.getRuntimeContext().getMapState(function.accStateDescriptor);
assertThat(mapState.isEmpty()).isTrue();
Long idValue = function.getRuntimeContext().getState(function.idStateDescriptor).value();
assertThat(idValue).isNull();
assertThat(accMapState.isEmpty()).isTrue();
MapState<Long, RowData> valueMapState =
function.getRuntimeContext().getMapState(function.valueStateDescriptor);
assertThat(valueMapState.isEmpty()).isTrue();

List<RowData> actualRows = testHarness.extractOutputValues();
// No new records should be emitted after retraction of non-existent record
actualRows = testHarness.extractOutputValues();
assertThat(actualRows.size()).isEqualTo(5);

assertThat(function.getNumOfSortKeysNotFound().getCount()).isEqualTo(1L);
Expand Down