diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index fc5d4af3bd22..97532c4244e7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -38,7 +38,6 @@ import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir; import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; -import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; @@ -163,11 +162,6 @@ public SparkPipelineResult run(final Pipeline pipeline) { JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(), contextFactory); - // update cache candidates - translator = new StreamingTransformTranslator.Translator( - new TransformTranslator.Translator()); - updateCacheCandidates(pipeline, translator, contextFactory.getEvaluationContext()); - // Checkpoint aggregator/metrics values jssc.addStreamingListener( new JavaStreamingListenerWrapper( @@ -269,7 +263,7 @@ private void detectTranslationMode(Pipeline pipeline) { /** * Evaluator that update/populate the cache candidates. */ - private void updateCacheCandidates( + public static void updateCacheCandidates( Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index c298886f0ab2..98521e931522 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -83,6 +83,8 @@ public JavaStreamingContext create() { SparkRunner.initAccumulators(options, jsc); ctxt = new EvaluationContext(jsc, pipeline, jssc); + // update cache candidates + SparkRunner.updateCacheCandidates(pipeline, translator, ctxt); pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); ctxt.computeOutputs(); @@ -91,10 +93,6 @@ public JavaStreamingContext create() { return jssc; } - public EvaluationContext getEvaluationContext() { - return this.ctxt; - } - private void checkpoint(JavaStreamingContext jssc) { Path rootCheckpointPath = checkpointDir.getRootCheckpointDir(); Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir();