From 81c3f3cab7b897ed7aec384318e79ec638008f6c Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Wed, 3 May 2017 17:12:20 -0700 Subject: [PATCH 1/3] [BEAM-2165] Update Flink to support serializing/deserializing custom user types configured via Jackson modules --- runners/flink/pom.xml | 5 + .../utils/SerializedPipelineOptions.java | 16 ++- .../utils/SerializedPipelineOptionsTest.java | 114 ++++++++++++++++++ 3 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 runners/flink/src/test/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptionsTest.java diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index eb2b005ddfd3..41224545cf4f 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -255,6 +255,11 @@ jackson-annotations + + com.fasterxml.jackson.core + jackson-core + + com.fasterxml.jackson.core jackson-databind diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java index 2256bb1df6c8..f717fd76c5e6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -27,6 +28,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.common.ReflectHelpers; /** * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. @@ -42,7 +44,7 @@ public SerializedPipelineOptions(PipelineOptions options) { checkNotNull(options, "PipelineOptions must not be null."); try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - new ObjectMapper().writeValue(baos, options); + createMapper().writeValue(baos, options); this.serializedOptions = baos.toByteArray(); } catch (Exception e) { throw new RuntimeException("Couldn't serialize PipelineOptions.", e); @@ -53,7 +55,7 @@ public SerializedPipelineOptions(PipelineOptions options) { public PipelineOptions getPipelineOptions() { if (pipelineOptions == null) { try { - pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); + pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class); IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); FileSystems.setDefaultConfigInWorkers(pipelineOptions); @@ -64,4 +66,14 @@ public PipelineOptions getPipelineOptions() { return pipelineOptions; } + + /** + * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing + * for user specified configuration injection into the ObjectMapper. This supports user custom + * types on {@link PipelineOptions}. + */ + private static ObjectMapper createMapper() { + return new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptionsTest.java new file mode 100644 index 000000000000..1ca856078aa0 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptionsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.utils; + +import static org.junit.Assert.assertEquals; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; + +/** + * Tests for {@link SerializedPipelineOptions}. + */ +public class SerializedPipelineOptionsTest { + /** PipelineOptions used to test auto registration of Jackson modules. */ + public interface JacksonIncompatibleOptions extends PipelineOptions { + JacksonIncompatible getJacksonIncompatible(); + void setJacksonIncompatible(JacksonIncompatible value); + } + + /** A Jackson {@link Module} to test auto-registration of modules. */ + @AutoService(Module.class) + public static class RegisteredTestModule extends SimpleModule { + public RegisteredTestModule() { + super("RegisteredTestModule"); + setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class); + } + } + + /** A class which Jackson does not know how to serialize/deserialize. */ + public static class JacksonIncompatible { + private final String value; + public JacksonIncompatible(String value) { + this.value = value; + } + } + + /** A Jackson mixin used to add annotations to other classes. */ + @JsonDeserialize(using = JacksonIncompatibleDeserializer.class) + @JsonSerialize(using = JacksonIncompatibleSerializer.class) + public static final class JacksonIncompatibleMixin {} + + /** A Jackson deserializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleDeserializer extends + JsonDeserializer { + + @Override + public JacksonIncompatible deserialize(JsonParser jsonParser, + DeserializationContext deserializationContext) throws IOException, JsonProcessingException { + return new JacksonIncompatible(jsonParser.readValueAs(String.class)); + } + } + + /** A Jackson serializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleSerializer extends JsonSerializer { + + @Override + public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException, JsonProcessingException { + jsonGenerator.writeString(jacksonIncompatible.value); + } + } + + @Test + public void testSerializingPipelineOptionsWithCustomUserType() throws Exception { + PipelineOptions options = PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"") + .as(JacksonIncompatibleOptions.class); + SerializedPipelineOptions context = new SerializedPipelineOptions(options); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) { + outputStream.writeObject(context); + } + try (ObjectInputStream inputStream = + new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) { + SerializedPipelineOptions copy = (SerializedPipelineOptions) inputStream.readObject(); + assertEquals("testValue", + copy.getPipelineOptions().as(JacksonIncompatibleOptions.class) + .getJacksonIncompatible().value); + } + } +} From 7b30ebeac4b666d0ec1ec1b8fb70f344ba17d0e4 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Thu, 4 May 2017 07:08:45 -0700 Subject: [PATCH 2/3] fixup! Address PR comments. --- .../runners/flink/PipelineOptionsTest.java | 87 +++++++++++++ .../utils/SerializedPipelineOptionsTest.java | 114 ------------------ 2 files changed, 87 insertions(+), 114 deletions(-) delete mode 100644 runners/flink/src/test/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptionsTest.java diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 23740a135944..7c773b44c286 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -23,6 +23,23 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.Collections; import java.util.HashMap; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; @@ -200,4 +217,74 @@ public void processElement(ProcessContext c) throws Exception { c.getPipelineOptions().as(MyOptions.class).getTestOption()); } } + + /** PipelineOptions used to test auto registration of Jackson modules. */ + public interface JacksonIncompatibleOptions extends PipelineOptions { + JacksonIncompatible getJacksonIncompatible(); + void setJacksonIncompatible(JacksonIncompatible value); + } + + /** A Jackson {@link Module} to test auto-registration of modules. */ + @AutoService(Module.class) + public static class RegisteredTestModule extends SimpleModule { + public RegisteredTestModule() { + super("RegisteredTestModule"); + setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class); + } + } + + /** A class which Jackson does not know how to serialize/deserialize. */ + public static class JacksonIncompatible { + private final String value; + public JacksonIncompatible(String value) { + this.value = value; + } + } + + /** A Jackson mixin used to add annotations to other classes. */ + @JsonDeserialize(using = JacksonIncompatibleDeserializer.class) + @JsonSerialize(using = JacksonIncompatibleSerializer.class) + public static final class JacksonIncompatibleMixin {} + + /** A Jackson deserializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleDeserializer extends + JsonDeserializer { + + @Override + public JacksonIncompatible deserialize(JsonParser jsonParser, + DeserializationContext deserializationContext) throws IOException, JsonProcessingException { + return new JacksonIncompatible(jsonParser.readValueAs(String.class)); + } + } + + /** A Jackson serializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleSerializer extends JsonSerializer { + + @Override + public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException, JsonProcessingException { + jsonGenerator.writeString(jacksonIncompatible.value); + } + } + + @Test + public void testSerializingPipelineOptionsWithCustomUserType() throws Exception { + String expectedValue = "testValue"; + PipelineOptions options = PipelineOptionsFactory + .fromArgs("--jacksonIncompatible=\""+ expectedValue + "\"") + .as(JacksonIncompatibleOptions.class); + SerializedPipelineOptions context = new SerializedPipelineOptions(options); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) { + outputStream.writeObject(context); + } + try (ObjectInputStream inputStream = + new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) { + SerializedPipelineOptions copy = (SerializedPipelineOptions) inputStream.readObject(); + assertEquals(expectedValue, + copy.getPipelineOptions().as(JacksonIncompatibleOptions.class) + .getJacksonIncompatible().value); + } + } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptionsTest.java deleted file mode 100644 index 1ca856078aa0..000000000000 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptionsTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.utils; - -import static org.junit.Assert.assertEquals; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.auto.service.AutoService; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.Test; - -/** - * Tests for {@link SerializedPipelineOptions}. - */ -public class SerializedPipelineOptionsTest { - /** PipelineOptions used to test auto registration of Jackson modules. */ - public interface JacksonIncompatibleOptions extends PipelineOptions { - JacksonIncompatible getJacksonIncompatible(); - void setJacksonIncompatible(JacksonIncompatible value); - } - - /** A Jackson {@link Module} to test auto-registration of modules. */ - @AutoService(Module.class) - public static class RegisteredTestModule extends SimpleModule { - public RegisteredTestModule() { - super("RegisteredTestModule"); - setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class); - } - } - - /** A class which Jackson does not know how to serialize/deserialize. */ - public static class JacksonIncompatible { - private final String value; - public JacksonIncompatible(String value) { - this.value = value; - } - } - - /** A Jackson mixin used to add annotations to other classes. */ - @JsonDeserialize(using = JacksonIncompatibleDeserializer.class) - @JsonSerialize(using = JacksonIncompatibleSerializer.class) - public static final class JacksonIncompatibleMixin {} - - /** A Jackson deserializer for {@link JacksonIncompatible}. */ - public static class JacksonIncompatibleDeserializer extends - JsonDeserializer { - - @Override - public JacksonIncompatible deserialize(JsonParser jsonParser, - DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - return new JacksonIncompatible(jsonParser.readValueAs(String.class)); - } - } - - /** A Jackson serializer for {@link JacksonIncompatible}. */ - public static class JacksonIncompatibleSerializer extends JsonSerializer { - - @Override - public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException, JsonProcessingException { - jsonGenerator.writeString(jacksonIncompatible.value); - } - } - - @Test - public void testSerializingPipelineOptionsWithCustomUserType() throws Exception { - PipelineOptions options = PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"") - .as(JacksonIncompatibleOptions.class); - SerializedPipelineOptions context = new SerializedPipelineOptions(options); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) { - outputStream.writeObject(context); - } - try (ObjectInputStream inputStream = - new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) { - SerializedPipelineOptions copy = (SerializedPipelineOptions) inputStream.readObject(); - assertEquals("testValue", - copy.getPipelineOptions().as(JacksonIncompatibleOptions.class) - .getJacksonIncompatible().value); - } - } -} From 6cd38320e090465fa791e61215c1a9bd961fd9e6 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Thu, 4 May 2017 07:12:57 -0700 Subject: [PATCH 3/3] fixup! Checkstyle --- .../java/org/apache/beam/runners/flink/PipelineOptionsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 7c773b44c286..7519dbf9d5c1 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -271,7 +271,7 @@ public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jso public void testSerializingPipelineOptionsWithCustomUserType() throws Exception { String expectedValue = "testValue"; PipelineOptions options = PipelineOptionsFactory - .fromArgs("--jacksonIncompatible=\""+ expectedValue + "\"") + .fromArgs("--jacksonIncompatible=\"" + expectedValue + "\"") .as(JacksonIncompatibleOptions.class); SerializedPipelineOptions context = new SerializedPipelineOptions(options);