From 774d3b62a741ae892041a7ff40d7f78b3f6b2f3f Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Wed, 14 Dec 2016 15:19:39 +0200 Subject: [PATCH 1/2] [BEAM-1144] Spark runner fails to deserialize MicrobatchSource in cluster mode --- runners/spark/pom.xml | 11 +-- .../coders/BeamSparkRunnerRegistrator.java | 2 +- .../ObjectInputStreamWithClassLoader.java | 46 ++++++++++++ .../spark/coders/StatelessJavaSerializer.java | 72 +++++++++++++++++++ .../BeamSparkRunnerRegistratorTest.java | 6 +- 5 files changed, 124 insertions(+), 13 deletions(-) create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/coders/ObjectInputStreamWithClassLoader.java create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index e8fffa2b91e3a..9ae69bd763270 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -134,14 +134,11 @@ ${hadoop.version} provided - com.esotericsoftware.kryo kryo - 2.21.1 + 2.21 + provided com.google.code.findbugs @@ -396,10 +393,6 @@ com.google.thirdparty org.apache.beam.spark.relocated.com.google.thirdparty - - com.esotericsoftware.kryo - org.apache.beam.spark.relocated.com.esotericsoftware.kryo - true spark-app diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java index 41b0a0198f6c5..93217b7ae8876 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java @@ -40,7 +40,7 @@ public class BeamSparkRunnerRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo) { for (Class clazz : ClassesForJavaSerialization.getClasses()) { - kryo.register(clazz, new JavaSerializer()); + kryo.register(clazz, 
new StatelessJavaSerializer()); } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/ObjectInputStreamWithClassLoader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/ObjectInputStreamWithClassLoader.java new file mode 100644 index 0000000000000..f6af08216778c --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/ObjectInputStreamWithClassLoader.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.coders; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; + + +/** + * ObjectInputStream with specific ClassLoader. 
+ */ +class ObjectInputStreamWithClassLoader extends ObjectInputStream { + private final ClassLoader classLoader; + + ObjectInputStreamWithClassLoader(InputStream in, ClassLoader classLoader) throws IOException { + super(in); + this.classLoader = classLoader; + } + + @Override + protected Class resolveClass(ObjectStreamClass desc) { + try { + return Class.forName(desc.getName(), false, classLoader); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not find class: " + desc.getName(), e); + } + } +} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java new file mode 100644 index 0000000000000..ed3d553808327 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.coders; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + + +/** + * Stateless Java Serializer. + * + *

+ * Solves state re-use issue in Kryo version 2.21 used in Spark 1.x + * See: + * https://issues.apache.org/jira/browse/SPARK-7708 + * https://github.com/EsotericSoftware/kryo/issues/312 + *

+ * + *

+ * Also, solves class loading issue in cluster caused by {@link ObjectInputStream} + * by using {@link ObjectInputStreamWithClassLoader}. + * {@link ObjectInputStream} uses the last user-defined class loader in the stack which can be the + * wrong class loader. + * This is a known Java issue and a similar solution is often used. + * See: + * https://github.com/apache/spark/blob/v1.6.3/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala#L154 + * https://issues.apache.org/jira/browse/GROOVY-1627 + * https://github.com/spring-projects/spring-loaded/issues/107 + *

+ */ +class StatelessJavaSerializer extends Serializer { + @SuppressWarnings("unchecked") + public void write(Kryo kryo, Output output, Object object) { + try { + ObjectOutputStream objectStream = new ObjectOutputStream(output); + objectStream.writeObject(object); + objectStream.flush(); + } catch (Exception e) { + throw new KryoException("Error during Java serialization.", e); + } + } + + @SuppressWarnings("unchecked") + public Object read (Kryo kryo, Input input, Class type) { + try { + return new ObjectInputStreamWithClassLoader(input, kryo.getClassLoader()).readObject(); + } catch (Exception e) { + throw new KryoException("Error during Java deserialization.", e); + } + } +} diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java index e35301750cf61..301ba0b19f06d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.spark.coders; import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.serializers.JavaSerializer; + import com.google.common.collect.Iterables; import java.io.Serializable; import org.apache.beam.sdk.coders.Coder; @@ -49,9 +49,9 @@ public void testCodersAndSourcesRegistration() { for (Class clazz : classesForJavaSerialization) { Assert.assertThat("Registered serializer for class " + clazz.getName() - + " was not an instance of " + JavaSerializer.class.getName(), + + " was not an instance of " + StatelessJavaSerializer.class.getName(), kryo.getSerializer(clazz), - Matchers.instanceOf(JavaSerializer.class)); + Matchers.instanceOf(StatelessJavaSerializer.class)); } } } From a122af1dbd1dfc84b5850b5c38105db01591914b Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Wed, 
14 Dec 2016 16:20:32 +0200 Subject: [PATCH 2/2] PR review changes --- .../ObjectInputStreamWithClassLoader.java | 46 ------------------- .../spark/coders/StatelessJavaSerializer.java | 25 ++++++++++ .../BeamSparkRunnerRegistratorTest.java | 2 +- 3 files changed, 26 insertions(+), 47 deletions(-) delete mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/coders/ObjectInputStreamWithClassLoader.java diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/ObjectInputStreamWithClassLoader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/ObjectInputStreamWithClassLoader.java deleted file mode 100644 index f6af08216778c..0000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/ObjectInputStreamWithClassLoader.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.coders; - -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectStreamClass; - - -/** - * ObjectInputStream with specific ClassLoader. 
- */ -class ObjectInputStreamWithClassLoader extends ObjectInputStream { - private final ClassLoader classLoader; - - ObjectInputStreamWithClassLoader(InputStream in, ClassLoader classLoader) throws IOException { - super(in); - this.classLoader = classLoader; - } - - @Override - protected Class resolveClass(ObjectStreamClass desc) { - try { - return Class.forName(desc.getName(), false, classLoader); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Could not find class: " + desc.getName(), e); - } - } -} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java index ed3d553808327..b29cf0c1cfa84 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java @@ -23,8 +23,12 @@ import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; + +import java.io.IOException; +import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; /** @@ -69,4 +73,25 @@ public Object read (Kryo kryo, Input input, Class type) { throw new KryoException("Error during Java deserialization.", e); } } + + /** + * ObjectInputStream with specific ClassLoader. 
+ */ + private static class ObjectInputStreamWithClassLoader extends ObjectInputStream { + private final ClassLoader classLoader; + + ObjectInputStreamWithClassLoader(InputStream in, ClassLoader classLoader) throws IOException { + super(in); + this.classLoader = classLoader; + } + + @Override + protected Class resolveClass(ObjectStreamClass desc) { + try { + return Class.forName(desc.getName(), false, classLoader); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not find class: " + desc.getName(), e); + } + } + } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java index 301ba0b19f06d..0468cd06283cf 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java @@ -49,7 +49,7 @@ public void testCodersAndSourcesRegistration() { for (Class clazz : classesForJavaSerialization) { Assert.assertThat("Registered serializer for class " + clazz.getName() - + " was not an instance of " + StatelessJavaSerializer.class.getName(), + + " was not an instance of " + StatelessJavaSerializer.class.getName(), kryo.getSerializer(clazz), Matchers.instanceOf(StatelessJavaSerializer.class)); }