rkhachatryan
Contributor

@rkhachatryan rkhachatryan commented Sep 30, 2025

Introduce OrderedMultiSetState and 3 implementations (map, value, adaptive) to be used in SinkUpsertMaterializerV2.

Test coverage is currently provided on the operator level (#27070).

I'm planning to add lower-level unit tests later to this PR.

@flinkbot
Collaborator

flinkbot commented Sep 30, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@rkhachatryan rkhachatryan marked this pull request as ready for review October 1, 2025 08:40
@rkhachatryan rkhachatryan requested a review from pnowojski October 1, 2025 08:40

/**
* Remove the given element. If there are multiple instances of the same element, remove the
* first one in insertion order.
Contributor

I am curious: should we allow the user to choose LIFO or FIFO for the remove?
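
To make the FIFO/LIFO question concrete, here is a minimal sketch on a plain `java.util.List`. `RemovalOrderSketch` and both method names are hypothetical and not part of the PR; matching uses `equals`, which is an assumption (the PR may match rows differently).

```java
import java.util.List;

/** Hypothetical helper, not part of the PR: removes one occurrence from an insertion-ordered list. */
final class RemovalOrderSketch {

    /** FIFO removal: drops the oldest matching element, as the Javadoc above describes. */
    static <T> boolean removeFirst(List<T> elements, T element) {
        int idx = elements.indexOf(element);
        if (idx < 0) {
            return false; // nothing matched
        }
        elements.remove(idx); // int argument, so this is remove(int index)
        return true;
    }

    /** LIFO removal: drops the most recently added matching element instead. */
    static <T> boolean removeLast(List<T> elements, T element) {
        int idx = elements.lastIndexOf(element);
        if (idx < 0) {
            return false; // nothing matched
        }
        elements.remove(idx);
        return true;
    }
}
```

With equal elements the two policies differ only in which position is vacated; once elements carry timestamps, they also differ in which timestamp survives.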

* The most recently added element was removed. The result will contain the element added
* before it.
*/
REMOVED_LAST_ADDED,
Contributor

I don't see tests checking these removal result types.

* An element was removed, it was not the most recently added, there are more elements. The
* result will not contain any elements
*/
REMOVED_OTHER
Contributor

I am curious why nothing is returned in this case; this seems inconsistent with REMOVED_LAST_ADDED, which will return the element added before it.
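
For reference, one possible reading of the two documented result types, sketched on a plain list. Only `REMOVED_LAST_ADDED` and `REMOVED_OTHER` appear in the diff; `NOTHING_REMOVED`, `ALL_REMOVED`, the `Result` class, and the empty payload for `ALL_REMOVED` are assumptions made for illustration.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical illustration of the removal result types; not the PR's implementation. */
final class RemovalResultSketch {

    // NOTHING_REMOVED and ALL_REMOVED are assumed names; the other two come from the diff.
    enum Type { NOTHING_REMOVED, ALL_REMOVED, REMOVED_LAST_ADDED, REMOVED_OTHER }

    static final class Result<T> {
        final Type type;
        final Optional<T> payload;

        Result(Type type, Optional<T> payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    /** Removes the first matching element in insertion order and classifies the outcome. */
    static <T> Result<T> remove(List<T> elements, T element) {
        int idx = elements.indexOf(element);
        if (idx < 0) {
            return new Result<>(Type.NOTHING_REMOVED, Optional.empty());
        }
        boolean wasLastAdded = idx == elements.size() - 1;
        elements.remove(idx);
        if (elements.isEmpty()) {
            // Assumption: no payload here; the PR may return something else.
            return new Result<>(Type.ALL_REMOVED, Optional.empty());
        }
        if (wasLastAdded) {
            // The element added before the removed one becomes the newest.
            T previous = elements.get(elements.size() - 1);
            return new Result<>(Type.REMOVED_LAST_ADDED, Optional.of(previous));
        }
        // The newest element is unchanged, so there is nothing new to report.
        return new Result<>(Type.REMOVED_OTHER, Optional.empty());
    }
}
```

Under this reading the asymmetry is deliberate: the payload is only needed when the newest element changes. Whether that is the author's intent is for the PR to confirm.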

SizeChangeInfo append(T element, long timestamp) throws Exception;

/** Get iterator over all remaining elements and their timestamps, in order of insertion. */
Iterator<Tuple2<T, Long>> iterator() throws Exception;
Contributor

@davidradl davidradl Oct 2, 2025

If the multiset changes (i.e. there is a removal) under the iterator, what will happen? A unit test for this would be good. It would also be useful to understand any locking that has been considered or is in place.
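
As a point of comparison only, this is how `java.util.ArrayList` behaves when the collection is modified under a live iterator: the iterator fails fast. The sketch says nothing about what the state-backed iterator in this PR does; `IteratorInvalidationSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

/** Hypothetical analogy using ArrayList; not a statement about OrderedMultiSetState. */
final class IteratorInvalidationSketch {

    /** Returns true if the iterator fails fast after a removal made behind its back. */
    static boolean failsFastOnRemoval() {
        List<String> elements = new ArrayList<>(Arrays.asList("a", "b", "c"));
        Iterator<String> it = elements.iterator();
        it.next();            // consume "a"
        elements.remove("c"); // structural change that bypasses the iterator
        try {
            it.next();        // ArrayList's iterator checks its modification count here
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```

A unit test for the new interface could follow the same shape and assert whichever contract the Javadoc ends up promising: fail fast, iterate a snapshot, or leave the behaviour undefined.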

Contributor

@pnowojski pnowojski left a comment

(partial review, I haven't yet reviewed linked variant and tests).

Comment on lines +50 to +51
@Internal
@Experimental
Contributor

🤔 Aren't those two mutually exclusive? A class can either be Internal, and not intended to be used outside of Flink's repo, or Experimental/PublicEvolving/Public, depending on the stability of said API.

Comment on lines +44 to +45
private final OrderedMultiSetState<RowData> smallState;
private final OrderedMultiSetState<RowData> largeState;
Contributor

nit: Code would be a bit more self-documenting if you used here:

    private final ValueStateMultiSetState<RowData> smallState;
    private final LinkedMultiSetState<RowData> largeState;

And it would also fit the Javadoc description then. So either change the types to concrete classes, or rephrase the Javadoc?

}

private boolean isEmptyCaching(OrderedMultiSetState<RowData> state) throws IOException {
state.loadCache();
Contributor

nit: I would move this loadCache call out of here, to the top of the execute method.

} else {
list.set(idx, toAdd);
}
valuesState.update(list);
Contributor

Would it make sense to cache writes? Or does SinkUpsertMaterializer never do more than a single write per input record?
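
A minimal sketch of what write caching could look like, assuming the list is buffered in memory and written back once. `CountingStore` is a hypothetical stand-in for the state handle (it is not Flink's `ValueState`), and `CachedList` is likewise invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical write-caching illustration; none of these types exist in the PR. */
final class WriteCachingSketch {

    /** Stand-in for a state handle that counts how often it is written. */
    static final class CountingStore<T> {
        private List<T> stored = new ArrayList<>();
        int writes = 0;

        List<T> read() {
            return new ArrayList<>(stored); // defensive copy
        }

        void write(List<T> value) {
            stored = new ArrayList<>(value);
            writes++;
        }
    }

    /** Buffers mutations in memory and writes them back in a single call to flush(). */
    static final class CachedList<T> {
        private final CountingStore<T> store;
        private List<T> cache;
        private boolean dirty;

        CachedList(CountingStore<T> store) {
            this.store = store;
        }

        void add(T element) {
            if (cache == null) {
                cache = store.read(); // lazy load on first access
            }
            cache.add(element);
            dirty = true;
        }

        void flush() {
            if (dirty) {
                store.write(cache);
                dirty = false;
            }
        }
    }
}
```

If the operator really performs at most one write per input record, as the question suggests it might, this buffering would add complexity without saving any state access.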

/**
* Simple implementation of {@link OrderedMultiSetState} based on plain {@code ValueState<List>}.
*/
class ValueStateMultiSetState implements OrderedMultiSetState<RowData> {
Contributor

Did you benchmark the current SinkUpsertMaterializer vs new one using ValueStateMultiSetState?

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Oct 3, 2025
*/
@Internal
@Experimental
public interface OrderedMultiSetState<T> {
Contributor

Maybe this should be actually named SequencedMultiSetState? Take a look at SequencedSet and SequencedCollection?

/**
* Add row, replacing any matching existing ones.
*
* @return RowKind.UPDATE_AFTER if an existing row was replaced; INSERT otherwise
Contributor

outdated doc?

Comment on lines +181 to +182
final boolean append = rowSqn == null;
final boolean existed = highSqn != null;
Contributor

What do append and existed mean? Either rename them or add a comment?


final Long oldSqn = append ? null : rowSqn;
final long newSqn = append ? (existed ? highSqn + 1 : 0) : oldSqn;
final long newSize = existed ? (append ? oldSize + 1 : oldSize) : 1;
Contributor

Does !append mean the newly added record overwrote a previous record? Via the upsert key?

Those three lines above are not very readable; maybe they are lacking some comments? Maybe it would be better to express it via

if (append) {
  if (existed) {
    // comment explaining
    oldSqn = ...
    newSqn = ...
    newSize = ...
  } else {
    // comment explaining
    ...
  }
} else {
  ...
}

or

if (append && existed) {
    // comment explaining
    oldSqn = ...
    newSqn = ...
    newSize = ...
} else if (...) {
    ...
}

?
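
A sketch of the branch-based form, next to the ternary form copied from the diff so the two can be compared. The class, the `Update` holder, and the comments are assumptions: in particular, reading `rowSqn` as "sequence number of the matching row, or null if none" and `highSqn` as "highest sequence number in state, or null if empty" is a guess from the variable names, not something the diff states.

```java
/** Hypothetical rewrite of the three ternary lines; the interpretation in comments is assumed. */
final class SqnUpdateSketch {

    static final class Update {
        final Long oldSqn;
        final long newSqn;
        final long newSize;

        Update(Long oldSqn, long newSqn, long newSize) {
            this.oldSqn = oldSqn;
            this.newSqn = newSqn;
            this.newSize = newSize;
        }
    }

    /** The ternary form as it appears in the diff. */
    static Update ternary(Long rowSqn, Long highSqn, long oldSize) {
        final boolean append = rowSqn == null;
        final boolean existed = highSqn != null;
        final Long oldSqn = append ? null : rowSqn;
        final long newSqn = append ? (existed ? highSqn + 1 : 0) : oldSqn;
        final long newSize = existed ? (append ? oldSize + 1 : oldSize) : 1;
        return new Update(oldSqn, newSqn, newSize);
    }

    /** The same results, with one branch per case. */
    static Update branches(Long rowSqn, Long highSqn, long oldSize) {
        if (rowSqn == null) {
            if (highSqn != null) {
                // No matching row, state not empty: take the next sequence number, size grows.
                return new Update(null, highSqn + 1, oldSize + 1);
            }
            // No matching row, empty state: start numbering at 0 with a single element.
            return new Update(null, 0, 1);
        }
        if (highSqn != null) {
            // Matching row found: reuse its sequence number, size is unchanged.
            return new Update(rowSqn, rowSqn, oldSize);
        }
        // Matching row but no highest sequence number: the ternaries yield size 1 here.
        // This combination looks unreachable, which the branch form makes visible.
        return new Update(rowSqn, rowSqn, 1);
    }
}
```

Spelling out the fourth branch is arguably the main benefit: it raises the question of whether that state can occur at all, and whether it should be an error instead.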

Comment on lines +54 to +61
/**
* Add the given element using a normal (non-multi) set semantics: if a matching element exists
* already, replace it (the timestamp is updated).
*/
SizeChangeInfo add(T element, long timestamp) throws Exception;

/** Add the given element using a multi-set semantics, i.e. append. */
SizeChangeInfo append(T element, long timestamp) throws Exception;
Contributor

Why do we need both? 🤔
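
For readers following along, the difference between the two methods as their Javadoc describes it can be sketched on a plain list. `AddVsAppendSketch` and `Entry` are hypothetical, matching uses `equals` (an assumption), and replacing in place follows the `list.set(idx, toAdd)` line quoted earlier in the diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of set-style add versus multiset-style append. */
final class AddVsAppendSketch<T> {

    static final class Entry<T> {
        final T element;
        final long timestamp;

        Entry(T element, long timestamp) {
            this.element = element;
            this.timestamp = timestamp;
        }
    }

    private final List<Entry<T>> entries = new ArrayList<>();

    /** Set semantics: a matching element is replaced in place and its timestamp refreshed. */
    void add(T element, long timestamp) {
        for (int i = 0; i < entries.size(); i++) {
            if (entries.get(i).element.equals(element)) {
                entries.set(i, new Entry<>(element, timestamp));
                return;
            }
        }
        entries.add(new Entry<>(element, timestamp));
    }

    /** Multiset semantics: always adds a new entry, even if an equal element exists. */
    void append(T element, long timestamp) {
        entries.add(new Entry<>(element, timestamp));
    }

    int size() {
        return entries.size();
    }

    long timestampAt(int index) {
        return entries.get(index).timestamp;
    }
}
```

The sketch shows what differs, not why both are needed; that still depends on which callers in the materializer require which behaviour.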

* @see org.apache.flink.api.common.state.ValueState
*/
@Internal
public class LinkedMultiSetState implements OrderedMultiSetState<RowData> {
Contributor

I think we should have test coverage for this quite complex class. Ditto for Value and Adaptive versions, so probably one test suite to be executed on different implementations with adaptive configured in a couple of different ways?

Plus some dedicated tests for switchover in Adaptive version?
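
The "one suite, several implementations" idea could look roughly like the sketch below, written in plain Java to stay self-contained. `MiniMultiSet`, `ListBacked`, and `checkContract` are hypothetical stand-ins; in the PR this would more likely be a parameterized test over the real Value, Linked, and Adaptive implementations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical shared contract check, run against several implementations. */
final class SharedContractSketch {

    /** Tiny stand-in for the real interface. */
    interface MiniMultiSet {
        void append(String element);

        boolean remove(String element);

        List<String> snapshot();
    }

    /** One implementation class, parameterized by its backing list. */
    static final class ListBacked implements MiniMultiSet {
        private final List<String> elements;

        ListBacked(List<String> backing) {
            this.elements = backing;
        }

        public void append(String element) {
            elements.add(element);
        }

        public boolean remove(String element) {
            return elements.remove(element); // removes the first occurrence
        }

        public List<String> snapshot() {
            return new ArrayList<>(elements);
        }
    }

    /** The contract every implementation must satisfy; throws on violation. */
    static void checkContract(Supplier<MiniMultiSet> factory) {
        MiniMultiSet set = factory.get();
        set.append("a");
        set.append("b");
        set.append("a");
        expect(set.remove("a"), "removing an existing element succeeds");
        expect(set.snapshot().equals(Arrays.asList("b", "a")), "first occurrence is removed");
        expect(!set.remove("missing"), "removing an absent element reports false");
    }

    /** The implementations under test; the real suite would list the PR's classes here. */
    static List<Supplier<MiniMultiSet>> implementations() {
        return Arrays.asList(
                () -> new ListBacked(new ArrayList<>()),
                () -> new ListBacked(new LinkedList<>()));
    }

    private static void expect(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```

The switchover tests for the adaptive version would sit outside such a shared suite, since they exercise behaviour the other implementations do not have.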

4 participants