From cccc1341ec82f5503b8f8aa2aba7dc67a76d2287 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Mon, 18 May 2026 10:54:32 +0300 Subject: [PATCH 1/4] Fix PipelineOptions display-data NPE --- ...stCommit_Python_ValidatesRunner_Flink.json | 2 +- .../sdk/options/PipelineOptionsFactory.java | 10 +++++++--- .../options/PipelineOptionsFactoryTest.java | 19 +++++++++++++++++++ 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json index b08e2bb81150..c2c0a6b5f968 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json @@ -1,5 +1,5 @@ { "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support", "https://github.com/apache/beam/pull/34830": "testing", - "trigger-2026-04-04": "portable_runner expand_sdf opt-in" + "trigger-2026-05-18": "portable_runner expand_sdf opt-in" } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index ac76a57b6b07..10dd0a73b8a1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -1746,7 +1746,7 @@ private static JsonDeserializer computeDeserializerForMethod(Method meth TypeDeserializer typeDeserializer = context.getFactory().findTypeDeserializer(context.getConfig(), prop.getType()); - if (typeDeserializer != null) { + if (typeDeserializer != null && jsonDeserializer != null) { jsonDeserializer = new TypeWrappedDeserializer(typeDeserializer, jsonDeserializer); } @@ -1801,10 +1801,14 @@ static Object deserializeNode(JsonNode node, Method method) throws IOException { return null; } + JsonDeserializer jsonDeserializer = getDeserializerForMethod(method); + if (jsonDeserializer == null) { + JavaType javaType = MAPPER.constructType(method.getGenericReturnType()); + return MAPPER.treeToValue(node, javaType); + } + JsonParser parser = new TreeTraversingParser(node, MAPPER); parser.nextToken(); - - JsonDeserializer jsonDeserializer = getDeserializerForMethod(method); return jsonDeserializer.deserialize(parser, DESERIALIZATION_CONTEXT.copy()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 2755ade271dd..40396ee23707 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -74,6 +74,7 @@ import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.InterceptingUrlClassLoader; import org.apache.beam.sdk.testing.RestoreSystemProperties; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Collections2; @@ -1130,6 +1131,24 @@ public void testPolymorphicType() { assertEquals(PolymorphicTypeTwo.class, options.getObjectValue().get().getClass()); } + /** + * Regression test for display-data population after JSON round-trip. With Jackson 2.18+, {@link + * PipelineOptionsFactory#computeDeserializerForMethod(Method)} can return null for polymorphic + * option types; {@link DisplayData#from} must not NPE when re-deserializing jsonOptions. + */ + @Test + public void testDisplayDataFromDeserializedPolymorphicOption() throws Exception { + PolymorphicTypes options = + PipelineOptionsFactory.fromArgs("--object={\"key\":\"value\",\"@type\":\"one\"}") + .as(PolymorphicTypes.class); + ObjectMapper mapper = + new ObjectMapper() + .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + PipelineOptions deserialized = + mapper.readValue(mapper.writeValueAsString(options), PipelineOptions.class); + DisplayData.from(deserialized); + } + @Test public void testMissingArgument() { String[] args = new String[] {}; From 54a44f37b56e0f2dac839b6c2b225d7a64ae208f Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Mon, 18 May 2026 12:31:29 +0300 Subject: [PATCH 2/4] Fix deserializeNode NPE --- ...am_PostCommit_Python_ValidatesRunner_Flink.json | 2 +- .../beam/sdk/options/PipelineOptionsFactory.java | 14 ++++++++++---- .../sdk/options/PipelineOptionsFactoryTest.java | 6 +++--- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json index c2c0a6b5f968..1fdefde8600e 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json @@ -1,5 +1,5 @@ { "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support", "https://github.com/apache/beam/pull/34830": "testing", - "trigger-2026-05-18": "portable_runner expand_sdf opt-in" + "trigger-2026-05-18": "Fix deserializeNode NPE" } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 10dd0a73b8a1..7e702dc7a7b7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -1801,15 +1801,21 @@ static Object deserializeNode(JsonNode node, Method method) throws IOException { return null; } + JavaType javaType = MAPPER.constructType(method.getGenericReturnType()); JsonDeserializer jsonDeserializer = getDeserializerForMethod(method); if (jsonDeserializer == null) { - JavaType javaType = MAPPER.constructType(method.getGenericReturnType()); return MAPPER.treeToValue(node, javaType); } - JsonParser parser = new TreeTraversingParser(node, MAPPER); - parser.nextToken(); - return jsonDeserializer.deserialize(parser, DESERIALIZATION_CONTEXT.copy()); + try { + JsonParser parser = new TreeTraversingParser(node, MAPPER); + parser.nextToken(); + return jsonDeserializer.deserialize(parser, DESERIALIZATION_CONTEXT.copy()); + } catch (NullPointerException e) { + // Jackson 2.18+ can yield a non-null contextual JsonDeserializer that still NPEs for some + // pipeline option shapes during display-data serialization (Flink ValidatesRunner Kafka). + return MAPPER.treeToValue(node, javaType); + } } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 40396ee23707..252140b527a7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -1132,9 +1132,9 @@ public void testPolymorphicType() { } /** - * Regression test for display-data population after JSON round-trip. With Jackson 2.18+, {@link - * PipelineOptionsFactory#computeDeserializerForMethod(Method)} can return null for polymorphic - * option types; {@link DisplayData#from} must not NPE when re-deserializing jsonOptions. + * Regression test for display-data population after JSON round-trip. The contextual + * JsonDeserializer path must not NPE without recovery; {@link DisplayData#from} exercises {@link + * PipelineOptionsFactory#deserializeNode} for jsonOptions. */ @Test public void testDisplayDataFromDeserializedPolymorphicOption() throws Exception { From 25b81912a413c6fb6aad275fccec5baa7fa4d715 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Mon, 18 May 2026 13:58:45 +0300 Subject: [PATCH 3/4] move construcutType --- .../org/apache/beam/sdk/options/PipelineOptionsFactory.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 7e702dc7a7b7..2585fad6c4f6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -1801,9 +1801,9 @@ static Object deserializeNode(JsonNode node, Method method) throws IOException { return null; } - JavaType javaType = MAPPER.constructType(method.getGenericReturnType()); JsonDeserializer jsonDeserializer = getDeserializerForMethod(method); if (jsonDeserializer == null) { + JavaType javaType = MAPPER.constructType(method.getGenericReturnType()); return MAPPER.treeToValue(node, javaType); } @@ -1814,6 +1814,7 @@ static Object deserializeNode(JsonNode node, Method method) throws IOException { } catch (NullPointerException e) { // Jackson 2.18+ can yield a non-null contextual JsonDeserializer that still NPEs for some // pipeline option shapes during display-data serialization (Flink ValidatesRunner Kafka). + JavaType javaType = MAPPER.constructType(method.getGenericReturnType()); return MAPPER.treeToValue(node, javaType); } } From 76ec9cf9a7ca9c76d9d5177b89864f6f374c440d Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Mon, 18 May 2026 21:29:41 +0300 Subject: [PATCH 4/4] Use treeToValue for default PipelineOptions deserialization --- .../sdk/options/PipelineOptionsFactory.java | 25 ++++++++++++------- .../options/PipelineOptionsFactoryTest.java | 5 ++-- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 2585fad6c4f6..3175ee56935e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -1796,27 +1796,34 @@ private static JsonDeserializer getDeserializerForMethod(Method method) .orNull(); } + /** + * Returns true when the getter declares custom Jackson serde, per {@link PipelineOptions} + * validation ({@code @JsonSerialize} and {@code @JsonDeserialize} must appear together). + */ + private static boolean usesPropertyLevelJacksonSerde(Method method) { + return method.isAnnotationPresent(JsonSerialize.class) + && method.isAnnotationPresent(JsonDeserialize.class); + } + static Object deserializeNode(JsonNode node, Method method) throws IOException { if (node.isNull()) { return null; } - JsonDeserializer jsonDeserializer = getDeserializerForMethod(method); - if (jsonDeserializer == null) { + if (!usesPropertyLevelJacksonSerde(method)) { JavaType javaType = MAPPER.constructType(method.getGenericReturnType()); return MAPPER.treeToValue(node, javaType); } - try { - JsonParser parser = new TreeTraversingParser(node, MAPPER); - parser.nextToken(); - return jsonDeserializer.deserialize(parser, DESERIALIZATION_CONTEXT.copy()); - } catch (NullPointerException e) { - // Jackson 2.18+ can yield a non-null contextual JsonDeserializer that still NPEs for some - // pipeline option shapes during display-data serialization (Flink ValidatesRunner Kafka). + JsonDeserializer jsonDeserializer = getDeserializerForMethod(method); + if (jsonDeserializer == null) { JavaType javaType = MAPPER.constructType(method.getGenericReturnType()); return MAPPER.treeToValue(node, javaType); } + + JsonParser parser = new TreeTraversingParser(node, MAPPER); + parser.nextToken(); + return jsonDeserializer.deserialize(parser, DESERIALIZATION_CONTEXT.copy()); } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 252140b527a7..adb3f0e4dc0f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -1132,9 +1132,8 @@ public void testPolymorphicType() { } /** - * Regression test for display-data population after JSON round-trip. The contextual - * JsonDeserializer path must not NPE without recovery; {@link DisplayData#from} exercises {@link - * PipelineOptionsFactory#deserializeNode} for jsonOptions. + * Regression test for display-data population after JSON round-trip. {@link DisplayData#from} + * exercises {@link PipelineOptionsFactory#deserializeNode} for jsonOptions (treeToValue path). */ @Test public void testDisplayDataFromDeserializedPolymorphicOption() throws Exception {