Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ public StateChangeInfo<RowData> add(RowData row, long timestamp) throws Exceptio
isNewRowKey
? new Node(row, newSqn, highSqn, null, null, timestamp)
: sqnToNodeState.get(oldSqn).withRow(row, timestamp));
highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize));
if (isNewRowKey) {
highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize));
rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn));
if (!isNewContextKey) {
sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,33 @@ public void testAdd() throws Exception {
});
}

@TestTemplate
public void testAddAfterReplacingNonHighestRow() throws Exception {
// Replacing an existing row by key must not regress highestSqnAndSizeState.highSqn.
// Otherwise, the next genuinely new row reuses an SQN already taken by an existing row,
// overwriting its node in sqnToNodeState.
runTest(
state -> {
state.add(row("k1", "v1"), 1L); // SQN 0
state.add(row("k2", "v2"), 2L); // SQN 1
state.add(row("k3", "v3"), 3L); // SQN 2, highSqn = 2

// Replace a non-highest-SQN row. Pre-fix, this would regress highSqn from 2
// to k1's SQN (0).
state.add(row("k1", "v1-updated"), 4L);

// New row must get SQN 3, not collide with k2's SQN 1.
state.add(row("k4", "v4"), 5L);

assertStateContents(
state,
Tuple2.of(row("k1", "v1-updated"), 4L),
Tuple2.of(row("k2", "v2"), 2L),
Tuple2.of(row("k3", "v3"), 3L),
Tuple2.of(row("k4", "v4"), 5L));
});
}

@TestTemplate
public void testRemove() throws Exception {
runTest(
Expand Down