From d6411c9ca29aedee87dffa2f598c772e135c507c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Fri, 24 Mar 2017 15:27:04 +0100 Subject: [PATCH 1/2] Fix caching in the Spark streaming, doing the cache update in the streaming context --- .../java/org/apache/beam/runners/spark/SparkRunner.java | 8 +------- .../streaming/SparkRunnerStreamingContextFactory.java | 7 +++---- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index fc5d4af3bd22..97532c4244e7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -38,7 +38,6 @@ import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir; import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; -import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; @@ -163,11 +162,6 @@ public SparkPipelineResult run(final Pipeline pipeline) { JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(), contextFactory); - // update cache candidates - translator = new StreamingTransformTranslator.Translator( - new TransformTranslator.Translator()); - updateCacheCandidates(pipeline, translator, contextFactory.getEvaluationContext()); - // Checkpoint aggregator/metrics values jssc.addStreamingListener( new JavaStreamingListenerWrapper( @@ -269,7 +263,7 @@ private void detectTranslationMode(Pipeline pipeline) { /** * Evaluator that update/populate the cache candidates. */ - private void updateCacheCandidates( + public static void updateCacheCandidates( Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index c298886f0ab2..ba924b0a9761 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -86,15 +86,14 @@ public JavaStreamingContext create() { pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); ctxt.computeOutputs(); + // update cache candidates + SparkRunner.updateCacheCandidates(pipeline, translator, ctxt); + checkpoint(jssc); return jssc; } - public EvaluationContext getEvaluationContext() { - return this.ctxt; - } - private void checkpoint(JavaStreamingContext jssc) { Path rootCheckpointPath = checkpointDir.getRootCheckpointDir(); Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir(); From d25d73b092ee58f9f11a9370e6229c17b957104c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Fri, 24 Mar 2017 15:39:13 +0100 Subject: [PATCH 2/2] Fix order of call --- .../streaming/SparkRunnerStreamingContextFactory.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index ba924b0a9761..98521e931522 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -83,11 +83,10 @@ public JavaStreamingContext create() { SparkRunner.initAccumulators(options, jsc); ctxt = new EvaluationContext(jsc, pipeline, jssc); - pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); - ctxt.computeOutputs(); - // update cache candidates SparkRunner.updateCacheCandidates(pipeline, translator, ctxt); + pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); + ctxt.computeOutputs(); checkpoint(jssc);