Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public StateChangeInfo<RowData> add(RowData row, long timestamp) throws Exceptio
final Long highSqn = highSqnAndSize == null ? null : highSqnAndSize.highSqn;
final long oldSize = highSqnAndSize == null ? 0 : highSqnAndSize.size;
final RowSqnInfo rowSqnInfo = rowToSqnState.get(key);
final Long rowSqn = rowSqnInfo == null ? null : rowToSqnState.get(key).firstSqn;
final Long rowSqn = rowSqnInfo == null ? null : rowSqnInfo.firstSqn;
final boolean isNewRowKey = rowSqn == null; // it's a 1st such record 'row'
final boolean isNewContextKey = highSqn == null; // 1st a record for current context key

Expand Down Expand Up @@ -185,8 +185,8 @@ public StateChangeInfo<RowData> add(RowData row, long timestamp) throws Exceptio
isNewRowKey
? new Node(row, newSqn, highSqn, null, null, timestamp)
: sqnToNodeState.get(oldSqn).withRow(row, timestamp));
highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize));
if (isNewRowKey) {
highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize));
rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn));
if (!isNewContextKey) {
sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ public TypeSerializerSchemaCompatibility<RowDataKey> resolveSchemaCompatibility(
RowDataKeySerializerSnapshot old = (RowDataKeySerializerSnapshot) oldSerializerSnapshot;

TypeSerializerSchemaCompatibility<RowData> compatibility =
old.restoredRowDataSerializerSnapshot.resolveSchemaCompatibility(
old.serializer.serializer.snapshotConfiguration());
serializer
.serializer
.snapshotConfiguration()
.resolveSchemaCompatibility(old.restoredRowDataSerializerSnapshot);

return mapToOuterCompatibility(
compatibility,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,34 @@ public void testRemove() throws Exception {
});
}

/** Test that replacing a non-tail row preserves the ability to add new rows afterwards. */
@TestTemplate
public void testAddAfterReplacingNonTail() throws Exception {
runTest(
state -> {
state.add(row("k1", "v1"), 1L);
state.add(row("k2", "v2"), 2L);
state.add(row("k3", "v3"), 3L);

// replace k1 (not the tail) - should not corrupt highSqn
state.add(row("k1", "v1-updated"), 4L);
assertStateContents(
state,
Tuple2.of(row("k1", "v1-updated"), 4L),
Tuple2.of(row("k2", "v2"), 2L),
Tuple2.of(row("k3", "v3"), 3L));

// adding a new key after the replace should work correctly
state.add(row("k4", "v4"), 5L);
assertStateContents(
state,
Tuple2.of(row("k1", "v1-updated"), 4L),
Tuple2.of(row("k2", "v2"), 2L),
Tuple2.of(row("k3", "v3"), 3L),
Tuple2.of(row("k4", "v4"), 5L));
});
}

@TestTemplate
public void testAddAfterRemovingTail() throws Exception {
runTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@

import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
Expand All @@ -29,9 +34,16 @@
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.VarCharType;

import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Objects;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link RowDataKeySerializer}. */
public class RowDataKeySerializerTest extends SerializerTestBase<RowDataKey> {

Expand Down Expand Up @@ -62,6 +74,51 @@ protected RowDataKey[] getTestData() {
return new RowDataKey[] {new RowDataKey(StreamRecordUtils.row(123), equaliser, equaliser)};
}

@Test
void testResolveSchemaCompatibilityWithDifferentSchema() throws Exception {
// Create a snapshot from the "old" serializer (IntType only) and serialize it
RowDataKeySerializer oldSerializer =
new RowDataKeySerializer(
new RowDataSerializer(new IntType()),
equaliser,
equaliser,
EQUALISER,
HASH_FUNCTION);

byte[] serializedSnapshot;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
new DataOutputViewStreamWrapper(out), oldSerializer.snapshotConfiguration());
serializedSnapshot = out.toByteArray();
}

// Restore the "old" snapshot
TypeSerializerSnapshot<RowDataKey> restoredOldSnapshot;
try (ByteArrayInputStream in = new ByteArrayInputStream(serializedSnapshot)) {
restoredOldSnapshot =
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
new DataInputViewStreamWrapper(in),
Thread.currentThread().getContextClassLoader());
}

// Create a "new" serializer with a different schema (IntType + VarCharType)
RowDataKeySerializer newSerializer =
new RowDataKeySerializer(
new RowDataSerializer(new IntType(), new VarCharType()),
equaliser,
equaliser,
EQUALISER,
HASH_FUNCTION);

// The new snapshot should detect incompatibility with the old snapshot
TypeSerializerSchemaCompatibility<RowDataKey> compatibility =
newSerializer
.snapshotConfiguration()
.resolveSchemaCompatibility(restoredOldSnapshot);

assertThat(compatibility.isIncompatible()).isTrue();
}

static final GeneratedRecordEqualiser EQUALISER =
new GeneratedRecordEqualiser("", "", new Object[0]) {

Expand Down