From 63966a3b0fe452528317a70ed4370ef0ed42606e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A1=BE=E7=BF=94?= Date: Fri, 18 Jun 2021 21:45:33 +0800 Subject: [PATCH 1/3] bugfix: if namespace not match namespaceSerializer --- .../runtime/state/CompositeKeySerializationUtils.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java index 8088c19ec68e7..8e1e80e8f6e5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java @@ -68,6 +68,7 @@ public static N readNamespace( return namespace; } + public static void writeNameSpace( N namespace, TypeSerializer namespaceSerializer, @@ -76,8 +77,13 @@ public static void writeNameSpace( throws IOException { int beforeWrite = keySerializationDataOutputView.length(); - namespaceSerializer.serialize(namespace, keySerializationDataOutputView); + // If the passed namespaceSerializer does not match the namespace, it should not serialize and should not write backendState + // Sharing state between trigger and TimeWindow causes inconsistencies in the namespace and NamespaceSerializer + if ((namespaceSerializer instanceof VoidNamespaceSerializer) && !(namespace instanceof VoidNamespace)){ + return; + } + namespaceSerializer.serialize(namespace, keySerializationDataOutputView); if (ambiguousKeyPossible) { // write length of namespace writeLengthFrom(beforeWrite, keySerializationDataOutputView); From 0e07e3bb459a4cc64d49868dae1eef8f8043a68e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A1=BE=E7=BF=94?= Date: Fri, 18 Jun 2021 21:47:14 +0800 Subject: [PATCH 2/3] bugfix: if namespace not match namespaceSerializer --- .../flink/runtime/state/CompositeKeySerializationUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java index 8e1e80e8f6e5d..6876ae4cabaf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.DataOutputView; + import javax.annotation.Nonnull; import java.io.IOException; @@ -68,7 +69,6 @@ public static N readNamespace( return namespace; } - public static void writeNameSpace( N namespace, TypeSerializer namespaceSerializer, From 6085b34b132a9fc6ea7352f8ea911bf8fe8ae4d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A1=BE=E7=BF=94?= Date: Fri, 18 Jun 2021 21:48:27 +0800 Subject: [PATCH 3/3] bugfix: Remove empty lines --- .../flink/runtime/state/CompositeKeySerializationUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java index 6876ae4cabaf2..4d89c2b58e4ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java @@ -23,7 +23,6 @@ import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.DataOutputView; - import javax.annotation.Nonnull; import java.io.IOException;