From 7e58f2bacaca32b66dd546b2b1f5d200cfbb2b8d Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sun, 22 Jan 2017 14:30:44 +0200 Subject: [PATCH 1/5] [BEAM-648] Persist and restore Aggregator values in case of recovery from failure --- .../beam/runners/spark/SparkRunner.java | 10 +- .../aggregators/AccumulatorSingleton.java | 91 +++++++++++++++++-- .../spark/aggregators/SparkAggregators.java | 19 +++- .../ResumeFromCheckpointStreamingTest.java | 5 +- 4 files changed, 114 insertions(+), 11 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 92c07bb4b12b..9cade4f96bca 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource; @@ -54,6 +55,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.metrics.MetricsSystem; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,7 +132,8 @@ private SparkRunner(SparkPipelineOptions options) { } private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc) { - final Accumulator accum = SparkAggregators.getNamedAggregators(jsc); + final Accumulator accum = SparkAggregators.getOrCreateNamedAggregators(jsc, + opts.isStreaming(), opts.getCheckpointDir()); final NamedAggregators 
initialValue = accum.value(); if (opts.getEnableSparkMetricSinks()) { @@ -159,6 +162,11 @@ public SparkPipelineResult run(final Pipeline pipeline) { final JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(), contextFactory); + // Checkpoint aggregator values + jssc.addStreamingListener( + new JavaStreamingListenerWrapper( + new AccumulatorSingleton.AccumulatorCheckpointingSparkListener())); + startPipeline = executorService.submit(new Runnable() { @Override diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java index 883830e5ce8e..d6f1106b9698 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java @@ -19,35 +19,114 @@ package org.apache.beam.runners.spark.aggregators; import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.api.java.JavaStreamingListener; +import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * For resilience, {@link Accumulator}s are required to be wrapped in a Singleton. 
* @see accumulators */ -class AccumulatorSingleton { +public class AccumulatorSingleton { + private static final Logger LOG = LoggerFactory.getLogger(AccumulatorSingleton.class); + + private static final String ACCUMULATOR_CHECKPOINT_FILE = "beam_aggregators"; - private static volatile Accumulator instance = null; + private static volatile Accumulator instance; + private static volatile FileSystem fileSystem; + private static volatile Path checkpointPath; + private static volatile Path tempCheckpointPath; + private static volatile Path backupCheckpointPath; - static Accumulator getInstance(JavaSparkContext jsc) { + static Accumulator getInstance( + JavaSparkContext jsc, + boolean isStreaming, + String checkpointDir) { if (instance == null) { synchronized (AccumulatorSingleton.class) { if (instance == null) { - //TODO: currently when recovering from checkpoint, Spark does not recover the - // last known Accumulator value. The SparkRunner should be able to persist and recover - // the NamedAggregators in order to recover Aggregators as well. 
instance = jsc.sc().accumulator(new NamedAggregators(), new AggAccumParam()); + if (isStreaming) { + recoverValueFromCheckpoint(jsc, checkpointDir); + } } } } return instance; } + private static void recoverValueFromCheckpoint(JavaSparkContext jsc, String checkpointDir) { + FSDataInputStream is = null; + try { + checkpointPath = new Path(checkpointDir, ACCUMULATOR_CHECKPOINT_FILE); + tempCheckpointPath = checkpointPath.suffix(".tmp"); + backupCheckpointPath = checkpointPath.suffix(".bak"); + fileSystem = checkpointPath.getFileSystem(jsc.hadoopConfiguration()); + if (fileSystem.exists(checkpointPath)) { + is = fileSystem.open(checkpointPath); + } else if (fileSystem.exists(backupCheckpointPath)) { + is = fileSystem.open(backupCheckpointPath); + } + if (is != null) { + ObjectInputStream objectInputStream = new ObjectInputStream(is); + NamedAggregators recoveredValue = + (NamedAggregators) objectInputStream.readObject(); + objectInputStream.close(); + LOG.info("Recovered accumulators from checkpoint: " + recoveredValue); + instance.setValue(recoveredValue); + } else { + LOG.info("No accumulator checkpoint found."); + } + } catch (Exception e) { + throw new RuntimeException("Failure while reading accumulator checkpoint.", e); + } + } + + private static void checkpoint() throws IOException { + if (checkpointPath != null) { + if (fileSystem.exists(checkpointPath)) { + if (fileSystem.exists(backupCheckpointPath)) { + fileSystem.delete(backupCheckpointPath, false); + } + fileSystem.rename(checkpointPath, backupCheckpointPath); + } + FSDataOutputStream os = fileSystem.create(tempCheckpointPath, true); + ObjectOutputStream oos = new ObjectOutputStream(os); + oos.writeObject(instance.value()); + oos.close(); + fileSystem.rename(tempCheckpointPath, checkpointPath); + } + } + @VisibleForTesting static void clear() { synchronized (AccumulatorSingleton.class) { instance = null; } } + + /** + * Spark Listener which checkpoints {@link NamedAggregators} values for fault-tolerance. 
+ */ + public static class AccumulatorCheckpointingSparkListener extends JavaStreamingListener { + @Override + public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) { + try { + checkpoint(); + } catch (IOException e) { + LOG.error("Failed to checkpoint accumulator singleton.", e); + } + } + } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java index fa5c8d15c02a..fcba64764426 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java @@ -66,10 +66,25 @@ private static T valueOf(final Accumulator accum, * * @param jsc a Spark context to be used in order to retrieve the name * {@link NamedAggregators} instance - * @return a {@link NamedAggregators} instance */ public static Accumulator getNamedAggregators(JavaSparkContext jsc) { - return AccumulatorSingleton.getInstance(jsc); + return getOrCreateNamedAggregators(jsc, false, ""); + } + + /** + * Retrieves or creates the {@link NamedAggregators} instance using the provided Spark context. 
+ * + * @param jsc a Spark context to be used in order to retrieve the name + * {@link NamedAggregators} instance + * @param isStreaming is streaming pipeline + * @param checkpointDir checkpoint dir + * @return a {@link NamedAggregators} instance + */ + public static Accumulator getOrCreateNamedAggregators( + JavaSparkContext jsc, + boolean isStreaming, + String checkpointDir) { + return AccumulatorSingleton.getInstance(jsc, isStreaming, checkpointDir); } /** diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 7346bd9c925e..828067288e5e 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -81,6 +81,7 @@ public class ResumeFromCheckpointStreamingTest { ); private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"}; private static final long EXPECTED_AGG_FIRST = 4L; + private static final long EXPECTED_AGG_SECOND = 8L; @Rule public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @@ -141,8 +142,8 @@ public void testRun() throws Exception { res = runAgain(options); long processedMessages2 = res.getAggregatorValue("processedMessages", Long.class); assertThat(String.format("Expected %d processed messages count but " - + "found %d", EXPECTED_AGG_FIRST, processedMessages2), processedMessages2, - equalTo(EXPECTED_AGG_FIRST)); + + "found %d", EXPECTED_AGG_SECOND, processedMessages2), processedMessages2, + equalTo(EXPECTED_AGG_SECOND)); } private SparkPipelineResult runAgain(SparkPipelineOptions options) { From 4a47716dadb92b8845bdb1a795241ea301dc5e3e Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sun, 29 Jan 2017 09:56:35 +0200 Subject: 
[PATCH 2/5] Changes after review --- .../beam/runners/spark/SparkRunner.java | 14 +++-- .../aggregators/AccumulatorSingleton.java | 18 ++++-- .../spark/aggregators/SparkAggregators.java | 12 ++-- .../translation/streaming/CheckpointDir.java | 56 +++++++++++++++++++ .../SparkRunnerStreamingContextFactory.java | 40 +++++++++---- 5 files changed, 114 insertions(+), 26 deletions(-) create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 9cade4f96bca..e6ac1a94ee3d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.spark; +import com.google.common.base.Optional; import com.google.common.collect.Iterables; import java.util.Collection; import java.util.List; @@ -33,6 +34,7 @@ import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformEvaluator; import org.apache.beam.runners.spark.translation.TransformTranslator; +import org.apache.beam.runners.spark.translation.streaming.CheckpointDir; import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; @@ -132,8 +134,10 @@ private SparkRunner(SparkPipelineOptions options) { } private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc) { - final Accumulator accum = SparkAggregators.getOrCreateNamedAggregators(jsc, - opts.isStreaming(), opts.getCheckpointDir()); + Optional maybeCheckpointDir = + opts.isStreaming() ? 
Optional.of(opts.getCheckpointDir()) : Optional.absent(); + final Accumulator accum = + SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir); final NamedAggregators initialValue = accum.value(); if (opts.getEnableSparkMetricSinks()) { @@ -157,10 +161,12 @@ public SparkPipelineResult run(final Pipeline pipeline) { detectTranslationMode(pipeline); if (mOptions.isStreaming()) { + CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir()); final SparkRunnerStreamingContextFactory contextFactory = - new SparkRunnerStreamingContextFactory(pipeline, mOptions); + new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir); final JavaStreamingContext jssc = - JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(), contextFactory); + JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(), + contextFactory); // Checkpoint aggregator values jssc.addStreamingListener( diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java index d6f1106b9698..8407bdf47a99 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.spark.aggregators; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -41,7 +42,8 @@ public class AccumulatorSingleton { private static final Logger LOG = LoggerFactory.getLogger(AccumulatorSingleton.class); - private static final String ACCUMULATOR_CHECKPOINT_FILE = "beam_aggregators"; + private static final String BEAM_CHECKPOINT_DIR = "beam-checkpoint"; + private static final String 
ACCUMULATOR_CHECKPOINT_FILENAME = "beam_aggregators"; private static volatile Accumulator instance; private static volatile FileSystem fileSystem; @@ -49,16 +51,16 @@ public class AccumulatorSingleton { private static volatile Path tempCheckpointPath; private static volatile Path backupCheckpointPath; + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") static Accumulator getInstance( JavaSparkContext jsc, - boolean isStreaming, - String checkpointDir) { + Optional checkpointDir) { if (instance == null) { synchronized (AccumulatorSingleton.class) { if (instance == null) { instance = jsc.sc().accumulator(new NamedAggregators(), new AggAccumParam()); - if (isStreaming) { - recoverValueFromCheckpoint(jsc, checkpointDir); + if (checkpointDir.isPresent()) { + recoverValueFromCheckpoint(jsc, checkpointDir.get()); } } } @@ -69,10 +71,14 @@ static Accumulator getInstance( private static void recoverValueFromCheckpoint(JavaSparkContext jsc, String checkpointDir) { FSDataInputStream is = null; try { - checkpointPath = new Path(checkpointDir, ACCUMULATOR_CHECKPOINT_FILE); + Path beamCheckpointPath = new Path(checkpointDir, BEAM_CHECKPOINT_DIR); + checkpointPath = new Path(beamCheckpointPath, ACCUMULATOR_CHECKPOINT_FILENAME); tempCheckpointPath = checkpointPath.suffix(".tmp"); backupCheckpointPath = checkpointPath.suffix(".bak"); fileSystem = checkpointPath.getFileSystem(jsc.hadoopConfiguration()); + if (!fileSystem.exists(beamCheckpointPath)) { + fileSystem.mkdirs(beamCheckpointPath); + } if (fileSystem.exists(checkpointPath)) { is = fileSystem.open(checkpointPath); } else if (fileSystem.exists(backupCheckpointPath)) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java index fcba64764426..6a8aa85be7c5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.spark.aggregators; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import java.util.Collection; import java.util.Map; @@ -68,7 +69,7 @@ private static T valueOf(final Accumulator accum, * {@link NamedAggregators} instance */ public static Accumulator getNamedAggregators(JavaSparkContext jsc) { - return getOrCreateNamedAggregators(jsc, false, ""); + return getOrCreateNamedAggregators(jsc, Optional.absent()); } /** @@ -76,15 +77,14 @@ public static Accumulator getNamedAggregators(JavaSparkContext * * @param jsc a Spark context to be used in order to retrieve the name * {@link NamedAggregators} instance - * @param isStreaming is streaming pipeline - * @param checkpointDir checkpoint dir + * @param checkpointDir checkpoint dir (optional, for streaming pipelines) * @return a {@link NamedAggregators} instance */ + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") public static Accumulator getOrCreateNamedAggregators( JavaSparkContext jsc, - boolean isStreaming, - String checkpointDir) { - return AccumulatorSingleton.getInstance(jsc, isStreaming, checkpointDir); + Optional checkpointDir) { + return AccumulatorSingleton.getInstance(jsc, checkpointDir); } /** diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java new file mode 100644 index 000000000000..f272c3a71e9f --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.translation.streaming; + +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Spark checkpoint dir tree. + */ +public class CheckpointDir { + private static final Logger LOG = LoggerFactory.getLogger(CheckpointDir.class); + + private static final String SPARK_CHECKPOINT_DIR = "spark-checkpoint"; + private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)"; + + private final Path rootCheckpointDir; + private final Path sparkCheckpointDir; + + public CheckpointDir(String rootCheckpointDir) { + if (!rootCheckpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) { + LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case " + + "of failures this job may not recover properly or even at all.", rootCheckpointDir); + } + LOG.info("Checkpoint dir set to: {}", rootCheckpointDir); + + this.rootCheckpointDir = new Path(rootCheckpointDir); + this.sparkCheckpointDir = new Path(rootCheckpointDir, SPARK_CHECKPOINT_DIR); + } + + public Path getRootCheckpointDir() { + return rootCheckpointDir; + } + + public Path getSparkCheckpointDir() { + return sparkCheckpointDir; + } +} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index d069a1121eb3..ead62c50e004 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import java.io.IOException; import org.apache.beam.runners.spark.SparkContextOptions; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkRunner; @@ -28,6 +29,8 @@ import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.sdk.Pipeline; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaStreamingContext; @@ -45,14 +48,18 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory { private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class); - private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)"; private final Pipeline pipeline; private final SparkPipelineOptions options; + private final CheckpointDir checkpointDir; - public SparkRunnerStreamingContextFactory(Pipeline pipeline, SparkPipelineOptions options) { + public SparkRunnerStreamingContextFactory( + Pipeline pipeline, + SparkPipelineOptions options, + CheckpointDir checkpointDir) { this.pipeline = pipeline; this.options = options; + this.checkpointDir = checkpointDir; } private EvaluationContext ctxt; @@ -73,18 +80,12 @@ public JavaStreamingContext create() { JavaSparkContext jsc = 
SparkContextFactory.getSparkContext(options); JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration); + ctxt = new EvaluationContext(jsc, pipeline, jssc); pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); ctxt.computeOutputs(); - // set checkpoint dir. - String checkpointDir = options.getCheckpointDir(); - if (!checkpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) { - LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case " - + "of failures this job may not recover properly or even at all.", checkpointDir); - } - LOG.info("Checkpoint dir set to: {}", checkpointDir); - jssc.checkpoint(checkpointDir); + checkpoint(jssc); // register listeners. for (JavaStreamingListener listener: options.as(SparkContextOptions.class).getListeners()) { @@ -95,6 +96,25 @@ public JavaStreamingContext create() { return jssc; } + private void checkpoint(JavaStreamingContext jssc) { + Path rootCheckpointPath = checkpointDir.getRootCheckpointDir(); + Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir(); + + try { + FileSystem fileSystem = rootCheckpointPath.getFileSystem(jssc.sc().hadoopConfiguration()); + if (!fileSystem.exists(rootCheckpointPath)) { + fileSystem.mkdirs(rootCheckpointPath); + } + if (!fileSystem.exists(sparkCheckpointPath)) { + fileSystem.mkdirs(sparkCheckpointPath); + } + } catch (IOException e) { + throw new RuntimeException("Failure while writing to checkpoint dir", e); + } + + jssc.checkpoint(sparkCheckpointPath.toString()); + } + public EvaluationContext getCtxt() { return ctxt; } From a1283fb9110541de60788ccf32e2cf4beb63fd83 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sun, 29 Jan 2017 09:56:53 +0200 Subject: [PATCH 3/5] Removed unused method. 
--- .../streaming/SparkRunnerStreamingContextFactory.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index ead62c50e004..c6f6e3e69831 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -114,8 +114,4 @@ private void checkpoint(JavaStreamingContext jssc) { jssc.checkpoint(sparkCheckpointPath.toString()); } - - public EvaluationContext getCtxt() { - return ctxt; - } } From c4de47adcdc462088face6895272968f3600fd64 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Mon, 30 Jan 2017 10:30:05 +0200 Subject: [PATCH 4/5] Added javadoc and minor refactor --- .../org/apache/beam/runners/spark/SparkRunner.java | 5 +++-- .../spark/aggregators/AccumulatorSingleton.java | 10 ++++++---- .../runners/spark/aggregators/SparkAggregators.java | 5 +++-- .../spark/translation/streaming/CheckpointDir.java | 13 +++++++++++++ .../SparkRunnerStreamingContextFactory.java | 2 +- 5 files changed, 26 insertions(+), 9 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index e6ac1a94ee3d..578ed21019e4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -134,8 +134,9 @@ private SparkRunner(SparkPipelineOptions options) { } private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc) { - Optional maybeCheckpointDir = - opts.isStreaming() ? 
Optional.of(opts.getCheckpointDir()) : Optional.absent(); + Optional maybeCheckpointDir = + opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir())) + : Optional.absent(); final Accumulator accum = SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir); final NamedAggregators initialValue = accum.value(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java index 8407bdf47a99..ea1744147a16 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import org.apache.beam.runners.spark.translation.streaming.CheckpointDir; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -42,7 +43,6 @@ public class AccumulatorSingleton { private static final Logger LOG = LoggerFactory.getLogger(AccumulatorSingleton.class); - private static final String BEAM_CHECKPOINT_DIR = "beam-checkpoint"; private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "beam_aggregators"; private static volatile Accumulator instance; @@ -54,7 +54,7 @@ public class AccumulatorSingleton { @SuppressWarnings("OptionalUsedAsFieldOrParameterType") static Accumulator getInstance( JavaSparkContext jsc, - Optional checkpointDir) { + Optional checkpointDir) { if (instance == null) { synchronized (AccumulatorSingleton.class) { if (instance == null) { @@ -68,10 +68,12 @@ static Accumulator getInstance( return instance; } - private static void recoverValueFromCheckpoint(JavaSparkContext jsc, String checkpointDir) { + private static void recoverValueFromCheckpoint( + 
JavaSparkContext jsc, + CheckpointDir checkpointDir) { FSDataInputStream is = null; try { - Path beamCheckpointPath = new Path(checkpointDir, BEAM_CHECKPOINT_DIR); + Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir(); checkpointPath = new Path(beamCheckpointPath, ACCUMULATOR_CHECKPOINT_FILENAME); tempCheckpointPath = checkpointPath.suffix(".tmp"); backupCheckpointPath = checkpointPath.suffix(".bak"); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java index 6a8aa85be7c5..245c69e7025e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java @@ -25,6 +25,7 @@ import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; +import org.apache.beam.runners.spark.translation.streaming.CheckpointDir; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; @@ -69,7 +70,7 @@ private static T valueOf(final Accumulator accum, * {@link NamedAggregators} instance */ public static Accumulator getNamedAggregators(JavaSparkContext jsc) { - return getOrCreateNamedAggregators(jsc, Optional.absent()); + return getOrCreateNamedAggregators(jsc, Optional.absent()); } /** @@ -83,7 +84,7 @@ public static Accumulator getNamedAggregators(JavaSparkContext @SuppressWarnings("OptionalUsedAsFieldOrParameterType") public static Accumulator getOrCreateNamedAggregators( JavaSparkContext jsc, - Optional checkpointDir) { + Optional checkpointDir) { return AccumulatorSingleton.getInstance(jsc, checkpointDir); } diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java index f272c3a71e9f..5b192bda0cf7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.spark.translation.streaming; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,15 +26,22 @@ /** * Spark checkpoint dir tree. + * + * {@link SparkPipelineOptions} checkpointDir is used as a root directory under which one directory + * is created for Spark's checkpoint and another for Beam's Spark runner's fault tolerance needs. + * Spark's checkpoint relies on Hadoop's {@link org.apache.hadoop.fs.FileSystem} and is used for + * Beam as well rather than {@link org.apache.beam.sdk.io.FileSystem} to be consistent with Spark. 
*/ public class CheckpointDir { private static final Logger LOG = LoggerFactory.getLogger(CheckpointDir.class); private static final String SPARK_CHECKPOINT_DIR = "spark-checkpoint"; + private static final String BEAM_CHECKPOINT_DIR = "beam-checkpoint"; private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)"; private final Path rootCheckpointDir; private final Path sparkCheckpointDir; + private final Path beamCheckpointDir; public CheckpointDir(String rootCheckpointDir) { if (!rootCheckpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) { @@ -44,6 +52,7 @@ public CheckpointDir(String rootCheckpointDir) { this.rootCheckpointDir = new Path(rootCheckpointDir); this.sparkCheckpointDir = new Path(rootCheckpointDir, SPARK_CHECKPOINT_DIR); + this.beamCheckpointDir = new Path(rootCheckpointDir, BEAM_CHECKPOINT_DIR); } public Path getRootCheckpointDir() { @@ -53,4 +62,8 @@ public Path getRootCheckpointDir() { public Path getSparkCheckpointDir() { return sparkCheckpointDir; } + + public Path getBeamCheckpointDir() { + return beamCheckpointDir; + } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index c6f6e3e69831..66f0187b5643 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -109,7 +109,7 @@ private void checkpoint(JavaStreamingContext jssc) { fileSystem.mkdirs(sparkCheckpointPath); } } catch (IOException e) { - throw new RuntimeException("Failure while writing to checkpoint dir", e); + throw new RuntimeException("Failed to create checkpoint dir", e); } jssc.checkpoint(sparkCheckpointPath.toString()); From ceda1696cd73e256b5a26d543427adde33c7f670 Mon 
Sep 17 00:00:00 2001 From: Aviem Zur Date: Mon, 30 Jan 2017 15:06:03 +0200 Subject: [PATCH 5/5] Moved creation of beam checkpoint dir --- .../beam/runners/spark/aggregators/AccumulatorSingleton.java | 3 --- .../streaming/SparkRunnerStreamingContextFactory.java | 4 ++++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java index ea1744147a16..473750cb5718 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java @@ -78,9 +78,6 @@ private static void recoverValueFromCheckpoint( tempCheckpointPath = checkpointPath.suffix(".tmp"); backupCheckpointPath = checkpointPath.suffix(".bak"); fileSystem = checkpointPath.getFileSystem(jsc.hadoopConfiguration()); - if (!fileSystem.exists(beamCheckpointPath)) { - fileSystem.mkdirs(beamCheckpointPath); - } if (fileSystem.exists(checkpointPath)) { is = fileSystem.open(checkpointPath); } else if (fileSystem.exists(backupCheckpointPath)) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index 66f0187b5643..6d254e180d8c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -99,6 +99,7 @@ public JavaStreamingContext create() { private void checkpoint(JavaStreamingContext jssc) { Path rootCheckpointPath = checkpointDir.getRootCheckpointDir(); Path sparkCheckpointPath = 
checkpointDir.getSparkCheckpointDir(); + Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir(); try { FileSystem fileSystem = rootCheckpointPath.getFileSystem(jssc.sc().hadoopConfiguration()); @@ -108,6 +109,9 @@ private void checkpoint(JavaStreamingContext jssc) { if (!fileSystem.exists(sparkCheckpointPath)) { fileSystem.mkdirs(sparkCheckpointPath); } + if (!fileSystem.exists(beamCheckpointPath)) { + fileSystem.mkdirs(beamCheckpointPath); + } } catch (IOException e) { throw new RuntimeException("Failed to create checkpoint dir", e); }