diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java index 60acace17b89f..d240483f2b444 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/RegisteredKeyValueStateBackendMetaInfo.java @@ -138,6 +138,13 @@ public TypeSerializerSchemaCompatibility updateStateSerializer( return stateSerializerProvider.registerNewSerializerForRestoredState(newStateSerializer); } + @Nonnull + public TypeSerializerSchemaCompatibility updateNamespaceSerializer( + TypeSerializer newNamespaceSerializer) { + return namespaceSerializerProvider.registerNewSerializerForRestoredState( + newNamespaceSerializer); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 65eb52907d487..635d29b8c4194 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -445,6 +445,22 @@ private RegisteredKeyValueStateBackendMetaInfo updateRestored RegisteredKeyValueStateBackendMetaInfo restoredKvStateMetaInfo = oldStateInfo.f1; + // fetch current namespace serializer now because if it is incompatible, we can't access + // it anymore to improve the error message + TypeSerializer previousNamespaceSerializer = + restoredKvStateMetaInfo.getNamespaceSerializer(); + + TypeSerializerSchemaCompatibility s = + restoredKvStateMetaInfo.updateNamespaceSerializer(namespaceSerializer); + if (s.isCompatibleAfterMigration() || s.isIncompatible()) { + throw new StateMigrationException( + "The new namespace serializer (" + + namespaceSerializer + + ") must be compatible with the old namespace serializer (" + + previousNamespaceSerializer + + ")."); + } + restoredKvStateMetaInfo.checkStateMetaInfo(stateDesc); // fetch current serializer now because if it is incompatible, we can't access it anymore to diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java index 48f431b220949..32841170a127b 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java @@ -265,4 +265,56 @@ void testKryoRestoreResilienceWithDifferentRegistrationOrder(@TempDir File newTm } snapshot1.get().discardState(); } + + @Test + void testStateNamespaceSerializerChanged() throws Exception { + MapStateDescriptor descriptor = + new MapStateDescriptor<>( + "testState", IntSerializer.INSTANCE, StringSerializer.INSTANCE); + + // set the old namespace serializer to IntSerializer.INSTANCE + MapState mapState = + keyedBackend.createState(1, IntSerializer.INSTANCE, descriptor); + + RunnableFuture> snapshot = + keyedBackend.snapshot( + 1L, + System.currentTimeMillis(), + env.getCheckpointStorageAccess() + .resolveCheckpointStorageLocation( + 1L, CheckpointStorageLocationReference.getDefault()), + CheckpointOptions.forCheckpointWithDefaultLocation()); + + if (!snapshot.isDone()) { + snapshot.run(); + } + SnapshotResult snapshotResult = snapshot.get(); + KeyedStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); + IOUtils.closeQuietly(keyedBackend); + keyedBackend.dispose(); + + FileSystem.initialize(new Configuration(), null); + Configuration configuration = new Configuration(); + ForStStateBackend forStStateBackend = + new ForStStateBackend().configure(configuration, null); + keyedBackend = + createKeyedStateBackend( + forStStateBackend, + env, + StringSerializer.INSTANCE, + Collections.singletonList(stateHandle)); + keyedBackend.setup(aec); + try { + + // change new NSSerializer to StringSerializer + keyedBackend.createState("String", StringSerializer.INSTANCE, descriptor); + fail("Expected a state migration exception."); + } catch (Exception e) { + if (CommonTestUtils.containsCause(e, StateMigrationException.class)) { + // StateMigrationException expected + } else { + throw e; + } + } + } }