diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java index 840fe007ebf84..9ab37d942003e 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java @@ -154,7 +154,7 @@ public StateChangeInfo add(RowData row, long timestamp) throws Exceptio final Long highSqn = highSqnAndSize == null ? null : highSqnAndSize.highSqn; final long oldSize = highSqnAndSize == null ? 0 : highSqnAndSize.size; final RowSqnInfo rowSqnInfo = rowToSqnState.get(key); - final Long rowSqn = rowSqnInfo == null ? null : rowToSqnState.get(key).firstSqn; + final Long rowSqn = rowSqnInfo == null ? null : rowSqnInfo.firstSqn; final boolean isNewRowKey = rowSqn == null; // it's a 1st such record 'row' final boolean isNewContextKey = highSqn == null; // 1st a record for current context key @@ -185,8 +185,8 @@ public StateChangeInfo add(RowData row, long timestamp) throws Exceptio isNewRowKey ? new Node(row, newSqn, highSqn, null, null, timestamp) : sqnToNodeState.get(oldSqn).withRow(row, timestamp)); - highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)); if (isNewRowKey) { + highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)); rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn)); if (!isNewContextKey) { sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn)); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java index f4a690dbf5bd5..99bb6a03f2f79 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java @@ -115,8 +115,10 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( RowDataKeySerializerSnapshot old = (RowDataKeySerializerSnapshot) oldSerializerSnapshot; TypeSerializerSchemaCompatibility compatibility = - old.restoredRowDataSerializerSnapshot.resolveSchemaCompatibility( - old.serializer.serializer.snapshotConfiguration()); + serializer + .serializer + .snapshotConfiguration() + .resolveSchemaCompatibility(old.restoredRowDataSerializerSnapshot); return mapToOuterCompatibility( compatibility, diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java index b68638115cfb7..38f3c826d2156 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java @@ -229,6 +229,34 @@ public void testRemove() throws Exception { }); } + /** Test that replacing a non-tail row preserves the ability to add new rows afterwards. */ + @TestTemplate + public void testAddAfterReplacingNonTail() throws Exception { + runTest( + state -> { + state.add(row("k1", "v1"), 1L); + state.add(row("k2", "v2"), 2L); + state.add(row("k3", "v3"), 3L); + + // replace k1 (not the tail) - should not corrupt highSqn + state.add(row("k1", "v1-updated"), 4L); + assertStateContents( + state, + Tuple2.of(row("k1", "v1-updated"), 4L), + Tuple2.of(row("k2", "v2"), 2L), + Tuple2.of(row("k3", "v3"), 3L)); + + // adding a new key after the replace should work correctly + state.add(row("k4", "v4"), 5L); + assertStateContents( + state, + Tuple2.of(row("k1", "v1-updated"), 4L), + Tuple2.of(row("k2", "v2"), 2L), + Tuple2.of(row("k3", "v3"), 3L), + Tuple2.of(row("k4", "v4"), 5L)); + }); + } + @TestTemplate public void testAddAfterRemovingTail() throws Exception { runTest( diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java index f78faf9f34317..b59e13a0ded09 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java @@ -20,7 +20,12 @@ import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedHashFunction; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; @@ -29,9 +34,16 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.StreamRecordUtils; import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.util.Objects; +import static org.assertj.core.api.Assertions.assertThat; + /** Test for {@link RowDataKeySerializer}. */ public class RowDataKeySerializerTest extends SerializerTestBase { @@ -62,6 +74,51 @@ protected RowDataKey[] getTestData() { return new RowDataKey[] {new RowDataKey(StreamRecordUtils.row(123), equaliser, equaliser)}; } + @Test + void testResolveSchemaCompatibilityWithDifferentSchema() throws Exception { + // Create a snapshot from the "old" serializer (IntType only) and serialize it + RowDataKeySerializer oldSerializer = + new RowDataKeySerializer( + new RowDataSerializer(new IntType()), + equaliser, + equaliser, + EQUALISER, + HASH_FUNCTION); + + byte[] serializedSnapshot; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot( + new DataOutputViewStreamWrapper(out), oldSerializer.snapshotConfiguration()); + serializedSnapshot = out.toByteArray(); + } + + // Restore the "old" snapshot + TypeSerializerSnapshot restoredOldSnapshot; + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedSnapshot)) { + restoredOldSnapshot = + TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot( + new DataInputViewStreamWrapper(in), + Thread.currentThread().getContextClassLoader()); + } + + // Create a "new" serializer with a different schema (IntType + VarCharType) + RowDataKeySerializer newSerializer = + new RowDataKeySerializer( + new RowDataSerializer(new IntType(), new VarCharType()), + equaliser, + equaliser, + EQUALISER, + HASH_FUNCTION); + + // The new snapshot should detect incompatibility with the old snapshot + TypeSerializerSchemaCompatibility compatibility = + newSerializer + .snapshotConfiguration() + .resolveSchemaCompatibility(restoredOldSnapshot); + + assertThat(compatibility.isIncompatible()).isTrue(); + } + static final GeneratedRecordEqualiser EQUALISER = new GeneratedRecordEqualiser("", "", new Object[0]) {