Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,15 @@ private PrecomputedParameters(
this.stateful = stateful;
}

static PrecomputedParameters precompute(
public static PrecomputedParameters precompute(
boolean immutableTargetType, TypeSerializer<Object>[] fieldSerializers) {
return precompute(immutableTargetType, false, fieldSerializers);
}

public static PrecomputedParameters precompute(
boolean immutableTargetType,
boolean forceFieldsImmutable,
TypeSerializer<Object>[] fieldSerializers) {
Preconditions.checkNotNull(fieldSerializers);
int totalLength = 0;
boolean fieldsImmutable = true;
Expand All @@ -239,7 +246,7 @@ static PrecomputedParameters precompute(
if (fieldSerializer != fieldSerializer.duplicate()) {
stateful = true;
}
if (!fieldSerializer.isImmutableType()) {
if (!forceFieldsImmutable && !fieldSerializer.isImmutableType()) {
fieldsImmutable = false;
}
if (fieldSerializer.getLength() < 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.orderedmultisetstate;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.orderedmultisetstate.linked.LinkedMultiSetState;
import org.apache.flink.util.function.FunctionWithException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
* An {@link OrderedMultiSetState} that switches dynamically between {@link ValueStateMultiSetState}
* and {@link LinkedMultiSetState} based on the number of elements.
*/
class AdaptiveOrderedMultiSetState implements OrderedMultiSetState<RowData> {
private static final Logger LOG = LoggerFactory.getLogger(AdaptiveOrderedMultiSetState.class);

// State that operations are routed to by default, including when both states are empty.
private final OrderedMultiSetState<RowData> smallState;
// State that operations are routed to only while smallState is empty and this one is not.
private final OrderedMultiSetState<RowData> largeState;
Comment on lines +44 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Code would be a bit more self-documenting if you used here:

    private final ValueStateMultiSetState<RowData> smallState;
    private final LinkedMultiSetState<RowData> largeState;
    ```
And it would also fit the Java doc description then. So either change the types to concrete classes, or rephrase the java doc?
    

// While smallState is in use: a size >= this value after an operation moves everything to
// largeState.
private final long switchToLargeThreshold;
// While largeState is in use: a size <= this value after an operation moves everything back to
// smallState.
private final long switchToSmallThreshold;

/**
 * Creates an adaptive state that delegates to one of the two given states.
 *
 * @param smallState state used by default, including when both states are empty
 * @param largeState state used once the size reaches {@code switchToLargeThreshold}
 * @param switchToLargeThreshold size at or above which elements are moved from {@code
 *     smallState} to {@code largeState}
 * @param switchToSmallThreshold size at or below which elements are moved from {@code
 *     largeState} back to {@code smallState}
 * @throws IllegalArgumentException if {@code switchToLargeThreshold} is not strictly greater
 *     than {@code switchToSmallThreshold}
 */
AdaptiveOrderedMultiSetState(
        OrderedMultiSetState<RowData> smallState,
        OrderedMultiSetState<RowData> largeState,
        long switchToLargeThreshold,
        long switchToSmallThreshold) {
    // The gap between the two thresholds keeps a single add/remove pair from switching the
    // state back and forth; report both values so a misconfiguration is easy to diagnose.
    checkArgument(
            switchToLargeThreshold > switchToSmallThreshold,
            "switchToLargeThreshold (%s) must be greater than switchToSmallThreshold (%s)",
            switchToLargeThreshold,
            switchToSmallThreshold);
    this.smallState = smallState;
    this.largeState = largeState;
    this.switchToLargeThreshold = switchToLargeThreshold;
    this.switchToSmallThreshold = switchToSmallThreshold;
    LOG.info(
            "Created {} with thresholds: {}=>large, {}=>small",
            this.getClass().getSimpleName(),
            switchToLargeThreshold,
            switchToSmallThreshold);
}

/**
 * Adds {@code element} with set semantics to whichever underlying state is currently in use;
 * may trigger a switch to the other state depending on the resulting size.
 */
@Override
public SizeChangeInfo add(RowData element, long timestamp) throws Exception {
    final FunctionWithException<OrderedMultiSetState<RowData>, SizeChangeInfo, Exception>
            addOp = target -> target.add(element, timestamp);
    return execute(addOp, Function.identity(), "add");
}

/**
 * Appends {@code element} with multi-set semantics to whichever underlying state is currently
 * in use; may trigger a switch to the other state depending on the resulting size.
 */
@Override
public SizeChangeInfo append(RowData element, long timestamp) throws Exception {
    final FunctionWithException<OrderedMultiSetState<RowData>, SizeChangeInfo, Exception>
            appendOp = target -> target.append(element, timestamp);
    return execute(appendOp, Function.identity(), "append");
}

/**
 * Iterates over the large state when the small state is empty, and over the small state
 * otherwise.
 */
@Override
public Iterator<Tuple2<RowData, Long>> iterator() throws Exception {
    final OrderedMultiSetState<RowData> source =
            smallState.isEmpty() ? largeState : smallState;
    return source.iterator();
}

/**
 * Tells whether this adaptive state holds no elements.
 *
 * <p>Elements live in either the small or the large state, so the adaptive state is empty only
 * if <b>both</b> underlying states are empty. (Combining the two checks with a logical OR would
 * report "empty" whenever at least one of them is empty, i.e. even while data is present in the
 * other one.)
 *
 * @return {@code true} if neither underlying state contains elements
 * @throws IOException if an underlying state fails the emptiness check
 */
@Override
public boolean isEmpty() throws IOException {
    // large state check is faster
    return largeState.isEmpty() && smallState.isEmpty();
}

/**
 * Removes {@code element} from whichever underlying state is currently in use; the size
 * information in the third tuple field decides whether to switch to the other state.
 */
@Override
public Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo> remove(RowData element)
        throws Exception {
    final FunctionWithException<
                    OrderedMultiSetState<RowData>,
                    Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo>,
                    Exception>
            removeOp = target -> target.remove(element);
    return execute(removeOp, removal -> removal.f2, "remove");
}

/** Clears the caches of both underlying states first, then clears both states themselves. */
@Override
public void clear() {
clearCache();
smallState.clear();
largeState.clear();
}

/**
 * Loads the cache of the small state and then of the large state.
 *
 * @throws IOException propagated from either underlying {@code loadCache()} call
 */
@Override
public void loadCache() throws IOException {
smallState.loadCache();
largeState.loadCache();
}

/** Clears the cache of the small state and then of the large state. */
@Override
public void clearCache() {
smallState.clearCache();
largeState.clearCache();
}

/**
 * Runs {@code stateOp} against the state currently in use and, if the size reported for the
 * operation crosses the relevant threshold, moves all elements to the other state. Caches of
 * both states are cleared before returning normally.
 *
 * @param stateOp the operation to apply to the state currently in use
 * @param getSizeChangeInfo extracts the size change from the operation's result
 * @param action short name of the operation, used only for logging
 * @return whatever {@code stateOp} returned
 * @throws Exception propagated from the operation, the emptiness checks or the state switch
 */
private <T> T execute(
        FunctionWithException<OrderedMultiSetState<RowData>, T, Exception> stateOp,
        Function<T, SizeChangeInfo> getSizeChangeInfo,
        String action)
        throws Exception {

    // The large state is in use only if the small one is empty and the large one is not, so
    // the small state is chosen when both are empty. The && short-circuits: the large state is
    // probed (via isEmptyCaching) only when the small state turned out to be empty.
    final boolean largeInUse = isEmptyCaching(smallState) && !isEmptyCaching(largeState);

    final OrderedMultiSetState<RowData> active;
    final OrderedMultiSetState<RowData> standby;
    if (largeInUse) {
        active = largeState;
        standby = smallState;
    } else {
        active = smallState;
        standby = largeState;
    }

    final T outcome = stateOp.apply(active);
    final SizeChangeInfo sizes = getSizeChangeInfo.apply(outcome);

    final boolean mustSwitch;
    if (largeInUse) {
        mustSwitch = sizes.sizeAfter <= switchToSmallThreshold;
    } else {
        mustSwitch = sizes.sizeAfter >= switchToLargeThreshold;
    }

    if (mustSwitch) {
        LOG.debug(
                "Switch {} -> {} because '{}' resulted in state size change {} -> {}",
                active.getClass().getSimpleName(),
                standby.getClass().getSimpleName(),
                action,
                sizes.sizeBefore,
                sizes.sizeAfter);
        switchState(active, standby);
    }

    clearCache();
    return outcome;
}

private boolean isEmptyCaching(OrderedMultiSetState<RowData> state) throws IOException {
state.loadCache();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would move this loadCache call out of here, to the top of the execute method.

return state.isEmpty();
}

/**
 * Copies every element (with its timestamp) from {@code src} to {@code dst} in iteration order
 * using {@code append}, then clears {@code src}.
 */
private void switchState(OrderedMultiSetState<RowData> src, OrderedMultiSetState<RowData> dst)
        throws Exception {
    for (Iterator<Tuple2<RowData, Long>> entries = src.iterator(); entries.hasNext(); ) {
        final Tuple2<RowData, Long> entry = entries.next();
        dst.append(entry.f0, entry.f1);
    }
    src.clear();
}

/**
 * Builds an adaptive state over the given small and large states.
 *
 * <p>Each threshold is taken from the config override if present; otherwise a built-in default
 * is used, chosen by whether {@code backendTypeIdentifier} names a heap backend.
 */
public static AdaptiveOrderedMultiSetState create(
        OrderedMultiSetStateConfig orderedMultiSetStateConfig,
        String backendTypeIdentifier,
        OrderedMultiSetState<RowData> smallState,
        OrderedMultiSetState<RowData> largeState) {
    final boolean heapBackend = isHeap(backendTypeIdentifier);

    final long highThreshold =
            orderedMultiSetStateConfig
                    .getAdaptiveHighThresholdOverride()
                    .orElse(
                            heapBackend
                                    ? ADAPTIVE_HEAP_HIGH_THRESHOLD
                                    : ADAPTIVE_ROCKSDB_HIGH_THRESHOLD);
    final long lowThreshold =
            orderedMultiSetStateConfig
                    .getAdaptiveLowThresholdOverride()
                    .orElse(
                            heapBackend
                                    ? ADAPTIVE_HEAP_LOW_THRESHOLD
                                    : ADAPTIVE_ROCKSDB_LOW_THRESHOLD);

    return new AdaptiveOrderedMultiSetState(
            smallState, largeState, highThreshold, lowThreshold);
}

// Default thresholds used by create(...) when the config supplies no override.
// HIGH is passed as switchToLargeThreshold, LOW as switchToSmallThreshold.
// The HEAP pair applies when isHeap(backend) is true; the ROCKSDB pair applies to every other
// backend identifier.
private static final long ADAPTIVE_HEAP_HIGH_THRESHOLD = 400;
private static final long ADAPTIVE_HEAP_LOW_THRESHOLD = 300;
private static final long ADAPTIVE_ROCKSDB_HIGH_THRESHOLD = 50;
private static final long ADAPTIVE_ROCKSDB_LOW_THRESHOLD = 40;

/**
 * Tells whether the given backend identifier, ignoring case and surrounding whitespace, is
 * {@code "hashmap"} or {@code "heap"}.
 */
private static boolean isHeap(String stateBackend) {
    final String normalized = stateBackend.trim();
    if (normalized.equalsIgnoreCase("hashmap")) {
        return true;
    }
    return normalized.equalsIgnoreCase("heap");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.orderedmultisetstate;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.orderedmultisetstate.linked.LinkedMultiSetState;

import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;

/**
* An interface for managing an ordered multi-set state in Apache Flink. It provides methods to
* add, append, and remove elements while maintaining insertion order.
*
* <p>The state supports two types of semantics for adding elements:
*
* <ul>
* <li><b>Normal Set Semantics:</b> Replaces an existing matching element with the new one.
* <li><b>Multi-Set Semantics:</b> Appends the new element, allowing duplicates.
* </ul>
*
* <p>Removal operations are supported with different result types, indicating the outcome of the
* removal process, such as whether all elements were removed, the last added element was removed,
* or no elements were removed.
*
* @param <T> The type of elements stored in the state.
*/
@Internal
@Experimental
Comment on lines +50 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 aren't those two mutually exclusive? Class can either be Internal and not intended to be used outside of the Flink's repo, or Experimental/PublicEvolving/Public depending on the stability of the said api

public interface OrderedMultiSetState<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be actually named SequencedMultiSetState? Take a look at SequencedSet and SequencedCollection?


/**
 * Add the given element using a normal (non-multi) set semantics: if a matching element exists
 * already, replace it (the timestamp is updated).
 *
 * @param element the element to add
 * @param timestamp the timestamp to store with the element
 * @return the size of the state before and after this operation
 */
SizeChangeInfo add(T element, long timestamp) throws Exception;

/**
 * Add the given element using a multi-set semantics, i.e. append.
 *
 * @param element the element to append
 * @param timestamp the timestamp to store with the element
 * @return the size of the state before and after this operation
 */
SizeChangeInfo append(T element, long timestamp) throws Exception;
Comment on lines +54 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need both? 🤔


/**
 * Get iterator over all remaining elements and their timestamps, in order of insertion.
 *
 * @return an iterator of (element, timestamp) pairs
 */
Iterator<Tuple2<T, Long>> iterator() throws Exception;
Copy link
Contributor

@davidradl davidradl Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the multi set changes (i.e. there is a removal) under the iterator what will happen? AI unit test for this would be good. It would be useful to understand any locking that has been considered or is in place.


/**
 * Tells whether any state exists (in the given key context).
 *
 * @return {@code true} if no state exists, {@code false} otherwise
 */
boolean isEmpty() throws IOException;

/**
* Remove the given element. If there are multiple instances of the same element, remove the
* first one in insertion order.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious : should we allow the user to choose LIFO or FIFO for the remove ?

*/
Tuple3<RemovalResultType, Optional<T>, SizeChangeInfo> remove(T element) throws Exception;

/** Clear the state (in the current key context), removing all of its elements. */
void clear();

/**
 * Load cache.
 *
 * <p>NOTE(review): what is cached, and when callers are expected to invoke this, is
 * implementation-specific and not specified by this interface - confirm against the
 * implementations.
 */
void loadCache() throws IOException;

/**
 * Clear caches.
 *
 * <p>NOTE(review): presumably the counterpart of {@link #loadCache()} - confirm against the
 * implementations.
 */
void clearCache();

/** Removal Result Type. */
enum RemovalResultType {
/**
* Nothing was removed (e.g. as a result of TTL or not matching key), the result will not
* contain any elements.
*/
NOTHING_REMOVED,
/** All elements were removed. The result will contain the last removed element. */
ALL_REMOVED,
/**
* The most recently added element was removed. The result will contain the element added
* before it.
*/
REMOVED_LAST_ADDED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see tests checking these removal result types.

/**
* An element was removed, it was not the most recently added, there are more elements. The
* result will not contain any elements
*/
REMOVED_OTHER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious why nothing is returned in this case, this seems inconsistent with REMOVED_LAST_ADDED which will return the element added before it.

}

/** Selects which implementation the static {@code create} factory of this interface returns. */
enum Strategy {
/** Use {@code ValueStateMultiSetState}. */
VALUE_STATE,
/** Use {@code LinkedMultiSetState}. */
MAP_STATE,
/**
 * Use {@code AdaptiveOrderedMultiSetState}, built on top of one {@code
 * ValueStateMultiSetState} and one {@code LinkedMultiSetState}.
 */
ADAPTIVE
}

/**
 * Size of a multi-set immediately before and immediately after a single modification (for
 * example an add or a remove).
 *
 * <p>Both values are exposed as public final fields:
 *
 * <ul>
 *   <li>{@code sizeBefore}: number of elements before the operation.
 *   <li>{@code sizeAfter}: number of elements after the operation.
 * </ul>
 *
 * <p>Instances are immutable.
 */
class SizeChangeInfo {
    public final long sizeBefore;
    public final long sizeAfter;

    public SizeChangeInfo(long sizeBefore, long sizeAfter) {
        this.sizeBefore = sizeBefore;
        this.sizeAfter = sizeAfter;
    }

    /** Returns {@code true} if the multi-set had no elements before the operation. */
    public boolean wasEmpty() {
        return 0 == sizeBefore;
    }

    /** Returns {@code true} if the multi-set has no elements after the operation. */
    public boolean isEmpty() {
        return 0 == sizeAfter;
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("SizeChangeInfo{");
        text.append("sizeBefore=").append(sizeBefore);
        text.append(", sizeAfter=").append(sizeAfter);
        text.append('}');
        return text.toString();
    }
}

/**
 * Creates the state implementation selected by {@code parameters.config.getStrategy()}.
 *
 * <p>For {@code ADAPTIVE}, a value-state-based and a linked (map-state-based) instance are
 * created and wrapped in an {@code AdaptiveOrderedMultiSetState}; {@code backendTypeIdentifier}
 * is only used in that case.
 *
 * @throws UnsupportedOperationException if the strategy is not one of the handled constants
 */
static OrderedMultiSetState<RowData> create(
        OrderedMultiSetStateContext parameters,
        RuntimeContext ctx,
        String backendTypeIdentifier) {
    switch (parameters.config.getStrategy()) {
        case MAP_STATE:
            {
                return LinkedMultiSetState.create(parameters, ctx);
            }
        case VALUE_STATE:
            {
                return ValueStateMultiSetState.create(parameters, ctx);
            }
        case ADAPTIVE:
            {
                final OrderedMultiSetStateConfig config = parameters.config;
                final OrderedMultiSetState<RowData> small =
                        ValueStateMultiSetState.create(parameters, ctx);
                final OrderedMultiSetState<RowData> large =
                        LinkedMultiSetState.create(parameters, ctx);
                return AdaptiveOrderedMultiSetState.create(
                        config, backendTypeIdentifier, small, large);
            }
        default:
            {
                final String unsupported = parameters.config.getStrategy().name();
                throw new UnsupportedOperationException(unsupported);
            }
    }
}
}
Loading