From d0fe004db98b4f1743939f357da4193bc3759f77 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Mon, 2 Jan 2017 14:42:47 +0200 Subject: [PATCH] [BEAM-1146] Decrease spark runner startup overhead --- runners/spark/pom.xml | 5 -- .../coders/BeamSparkRunnerRegistrator.java | 48 +--------------- .../BeamSparkRunnerRegistratorTest.java | 57 ------------------- .../streaming/KafkaStreamingTest.java | 57 ++++++++++++++++++- 4 files changed, 59 insertions(+), 108 deletions(-) delete mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index dad5718be894..d9d45ddc48a8 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -247,11 +247,6 @@ metrics-core ${dropwizard.metrics.version} - - org.reflections - reflections - 0.9.10 - diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java index 93217b7ae887..9d63ab063b47 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java @@ -19,17 +19,8 @@ package org.apache.beam.runners.spark.coders; import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.Arrays; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.Source; +import org.apache.beam.runners.spark.io.MicrobatchSource; import org.apache.spark.serializer.KryoRegistrator; -import org.reflections.Reflections; /** @@ -39,40 +30,7 @@ public class BeamSparkRunnerRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo) { - for (Class clazz : ClassesForJavaSerialization.getClasses()) { - kryo.register(clazz, new StatelessJavaSerializer()); - } - } - - /** - * Register coders and sources with {@link JavaSerializer} since they aren't guaranteed to be - * Kryo-serializable. - */ - private static class ClassesForJavaSerialization { - private static final Class[] CLASSES_FOR_JAVA_SERIALIZATION = new Class[]{ - Coder.class, Source.class - }; - - private static final Iterable> INSTANCE; - - /** - * Find all subclasses of ${@link CLASSES_FOR_JAVA_SERIALIZATION} - */ - static { - final Reflections reflections = new Reflections(); - INSTANCE = Iterables.concat(Lists.transform(Arrays.asList(CLASSES_FOR_JAVA_SERIALIZATION), - new Function>>() { - @SuppressWarnings({"unchecked", "ConstantConditions"}) - @Nullable - @Override - public Set> apply(@Nullable Class clazz) { - return reflections.getSubTypesOf(clazz); - } - })); - } - - static Iterable> getClasses() { - return INSTANCE; - } + // MicrobatchSource is serialized as data and may not be Kryo-serializable. + kryo.register(MicrobatchSource.class, new StatelessJavaSerializer()); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java deleted file mode 100644 index 0468cd06283c..000000000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.coders; - -import com.esotericsoftware.kryo.Kryo; - -import com.google.common.collect.Iterables; -import java.io.Serializable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.Source; -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Test; -import org.reflections.Reflections; - - -/** - * BeamSparkRunnerRegistrator Test. - */ -public class BeamSparkRunnerRegistratorTest { - @Test - public void testCodersAndSourcesRegistration() { - BeamSparkRunnerRegistrator registrator = new BeamSparkRunnerRegistrator(); - - Reflections reflections = new Reflections(); - Iterable> classesForJavaSerialization = - Iterables.concat(reflections.getSubTypesOf(Coder.class), - reflections.getSubTypesOf(Source.class)); - - Kryo kryo = new Kryo(); - - registrator.registerClasses(kryo); - - for (Class clazz : classesForJavaSerialization) { - Assert.assertThat("Registered serializer for class " + clazz.getName() - + " was not an instance of " + StatelessJavaSerializer.class.getName(), - kryo.getSerializer(clazz), - Matchers.instanceOf(StatelessJavaSerializer.class)); - } - } -} diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index 6be92d050b6f..0853e9f8878a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -17,8 +17,13 @@ */ package org.apache.beam.runners.spark.translation.streaming; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectStreamException; +import java.io.OutputStream; +import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.Map; @@ -30,6 +35,9 @@ import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.kafka.KafkaIO; @@ -52,6 +60,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; + /** * Test Kafka as input. */ @@ -163,7 +172,7 @@ public void testLatest() throws Exception { .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList()) .withTopics(Collections.singletonList(topic)) .withKeyCoder(StringUtf8Coder.of()) - .withValueCoder(StringUtf8Coder.of()) + .withValueCoder(NonKryoSerializableStringCoder.of()) .updateConsumerProperties(consumerProps); PCollection formatted = @@ -212,4 +221,50 @@ public void processElement(ProcessContext c) { } } + /** + * This coder is not Kryo serializable, used to make sure + * {@link org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator} registers needed + * classes to ensure Java serialization is used instead. + */ + private static class NonKryoSerializableStringCoder extends CustomCoder + implements Serializable { + private Coder stringCoder; + private Boolean isSerialized = false; + + private NonKryoSerializableStringCoder() { + } + + @JsonCreator + public static NonKryoSerializableStringCoder of() { + return new NonKryoSerializableStringCoder(); + } + + private Object readResolve() throws ObjectStreamException { + NonKryoSerializableStringCoder deserialized = new NonKryoSerializableStringCoder(); + deserialized.stringCoder = StringUtf8Coder.of(); + deserialized.isSerialized = true; + return deserialized; + } + + private Object writeReplace() throws ObjectStreamException { + return new NonKryoSerializableStringCoder(); + } + + @Override + public void encode(String value, OutputStream outStream, Context context) + throws CoderException, IOException { + if (!isSerialized) { + this.stringCoder = StringUtf8Coder.of(); + } + stringCoder.encode(value, outStream, context); + } + + @Override + public String decode(InputStream inStream, Context context) throws CoderException, IOException { + if (!isSerialized) { + this.stringCoder = StringUtf8Coder.of(); + } + return stringCoder.decode(inStream, context); + } + } }