From 92707b9a07b7bc367b375fb25293554f1de25d87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Tue, 7 Feb 2017 14:41:57 +0100 Subject: [PATCH 1/2] [BEAM-1405] Refactor to remove repeated code from test --- .../spark/ProvidedSparkContextTest.java | 70 ++++++++----------- 1 file changed, 29 insertions(+), 41 deletions(-) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index 298284453c39..00c894d6d99f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.spark; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import com.google.common.collect.ImmutableSet; @@ -48,15 +49,6 @@ public class ProvidedSparkContextTest { private static final String PROVIDED_CONTEXT_EXCEPTION = "The provided Spark context was not created or was stopped"; - private SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) { - final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); - options.setRunner(SparkRunner.class); - options.setUsesProvidedSparkContext(true); - options.setProvidedSparkContext(jsc); - options.setEnableSparkMetricSinks(false); - return options; - } - /** * Provide a context and call pipeline run. * @throws Exception @@ -64,20 +56,7 @@ private SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) { @Test public void testWithProvidedContext() throws Exception { JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); - - SparkContextOptions options = getSparkContextOptions(jsc); - - Pipeline p = Pipeline.create(options); - PCollection inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder - .of())); - PCollection output = inputWords.apply(new WordCount.CountWords()) - .apply(MapElements.via(new WordCount.FormatAsTextFn())); - - PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - - // Run test from pipeline - p.run().waitUntilFinish(); - + testWithValidProvidedContext(jsc); jsc.stop(); } @@ -87,8 +66,22 @@ public void testWithProvidedContext() throws Exception { */ @Test public void testWithNullContext() throws Exception { - JavaSparkContext jsc = null; + testWithInvalidContext(null); + } + + /** + * A SparkRunner with a stopped provided Spark context cannot run pipelines. + * @throws Exception + */ + @Test + public void testWithStoppedProvidedContext() throws Exception { + JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); + // Stop the provided Spark context directly + jsc.stop(); + testWithInvalidContext(jsc); + } + private void testWithValidProvidedContext(JavaSparkContext jsc) throws Exception { SparkContextOptions options = getSparkContextOptions(jsc); Pipeline p = Pipeline.create(options); @@ -99,24 +92,11 @@ public void testWithNullContext() throws Exception { PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - try { - p.run().waitUntilFinish(); - fail("Should throw an exception when The provided Spark context is null"); - } catch (RuntimeException e){ - assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION)); - } + // Run test from pipeline + p.run().waitUntilFinish(); } - /** - * A SparkRunner with a stopped provided Spark context cannot run pipelines. - * @throws Exception - */ - @Test - public void testWithStoppedProvidedContext() throws Exception { - JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); - // Stop the provided Spark context directly - jsc.stop(); - + private void testWithInvalidContext(JavaSparkContext jsc) { SparkContextOptions options = getSparkContextOptions(jsc); Pipeline p = Pipeline.create(options); @@ -129,10 +109,18 @@ public void testWithStoppedProvidedContext() throws Exception { try { p.run().waitUntilFinish(); - fail("Should throw an exception when The provided Spark context is stopped"); + fail("Should throw an exception when The provided Spark context is null or stopped"); } catch (RuntimeException e){ assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION)); } } + private static SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) { + final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); + options.setRunner(SparkRunner.class); + options.setUsesProvidedSparkContext(true); + options.setProvidedSparkContext(jsc); + options.setEnableSparkMetricSinks(false); + return options; + } } From 5edcdff9ac6aee552cb95fe46f32a9f57dd813e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Tue, 7 Feb 2017 14:43:11 +0100 Subject: [PATCH 2/2] [BEAM-1405] Skip stopping context when spark context is provided --- .../spark/translation/SparkContextFactory.java | 11 ++++++++--- .../beam/runners/spark/ProvidedSparkContextTest.java | 2 ++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index 67839a808243..326838a26a35 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java @@ -40,17 +40,21 @@ public final class SparkContextFactory { */ static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext"; + // Spark allows only one context for JVM so this can be static. private static JavaSparkContext sparkContext; private static String sparkMaster; + private static boolean usesProvidedSparkContext; private SparkContextFactory() { } public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) { SparkContextOptions contextOptions = options.as(SparkContextOptions.class); + usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext(); // reuse should be ignored if the context is provided. if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) - && !contextOptions.getUsesProvidedSparkContext()) { + && !usesProvidedSparkContext) { + // if the context is null or stopped for some reason, re-create it. if (sparkContext == null || sparkContext.sc().isStopped()) { sparkContext = createSparkContext(contextOptions); @@ -67,13 +71,14 @@ public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions } public static synchronized void stopSparkContext(JavaSparkContext context) { - if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) { + if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) + && !usesProvidedSparkContext) { context.stop(); } } private static JavaSparkContext createSparkContext(SparkContextOptions contextOptions) { - if (contextOptions.getUsesProvidedSparkContext()) { + if (usesProvidedSparkContext) { LOG.info("Using a provided Spark Context"); JavaSparkContext jsc = contextOptions.getProvidedSparkContext(); if (jsc == null || jsc.sc().isStopped()){ diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index 00c894d6d99f..a4190a9dcaed 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -57,6 +57,8 @@ public class ProvidedSparkContextTest { public void testWithProvidedContext() throws Exception { JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); testWithValidProvidedContext(jsc); + // A provided context must not be stopped after execution + assertFalse(jsc.sc().isStopped()); jsc.stop(); }