From 09acf872fba9f3a77ad57c35545b410aeee9477f Mon Sep 17 00:00:00 2001 From: Borisa Zivkovic Date: Mon, 8 May 2017 13:53:17 +0100 Subject: [PATCH] adding common method to extract singleton main input --- .../java/org/apache/beam/runners/apex/ApexRunner.java | 9 +++++---- .../construction/SingleInputOutputOverrideFactory.java | 9 +++++++++ .../SingleInputOutputOverrideFactoryTest.java | 3 ++- .../DirectGBKIntoKeyedWorkItemsOverrideFactory.java | 3 +-- .../runners/direct/DirectGroupByKeyOverrideFactory.java | 5 +++-- .../runners/flink/FlinkStreamingPipelineTranslator.java | 4 +++- .../apache/beam/runners/dataflow/BatchViewOverrides.java | 5 +++-- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 4 +++- .../runners/dataflow/PrimitiveParDoSingleFactory.java | 5 ++--- .../beam/runners/dataflow/ReshuffleOverrideFactory.java | 3 +-- .../beam/runners/dataflow/StreamingViewOverrides.java | 4 ++-- 11 files changed, 34 insertions(+), 20 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 366308ec6642..4ee91c1e9b78 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -23,6 +23,7 @@ import com.datatorrent.api.StreamingApplication; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -33,13 +34,13 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; + import org.apache.apex.api.EmbeddedAppLauncher; import org.apache.apex.api.Launcher; import org.apache.apex.api.Launcher.AppHandle; import org.apache.apex.api.Launcher.LaunchMode; import org.apache.beam.runners.apex.translation.ApexPipelineTranslator; import org.apache.beam.runners.core.construction.PTransformMatchers; -import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PrimitiveCreate; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.Pipeline; @@ -272,7 +273,7 @@ static class Factory GloballyAsSingletonView> transform) { return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), + getSingletonMainInput(transform), new StreamingCombineGloballyAsSingletonView<>(transform.getTransform())); } } @@ -337,7 +338,7 @@ static class Factory public PTransformReplacement, PCollectionView> getReplacementTransform( AppliedPTransform, PCollectionView, AsSingleton> transform) { return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), + getSingletonMainInput(transform), new StreamingViewAsSingleton<>(transform.getTransform())); } } @@ -372,7 +373,7 @@ static class Factory AppliedPTransform, PCollectionView>, AsIterable> transform) { return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), + getSingletonMainInput(transform), new StreamingViewAsIterable()); } } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java index 7a59c1c1c80b..79e7b6bd1c70 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java @@ -19,8 +19,11 @@ package org.apache.beam.runners.core.construction; import java.util.Map; + +import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; @@ -39,4 +42,10 @@ public final Map mapOutputs( Map, PValue> outputs, OutputT newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } + + protected final PCollection getSingletonMainInput( + AppliedPTransform, ?, ?> transform) { + return PTransformReplacements.getSingletonMainInput(transform); + } + } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java index 80bb0a7a6f2c..d53e177ff36f 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.Map; + import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; import org.apache.beam.sdk.testing.TestPipeline; @@ -62,7 +63,7 @@ public class SingleInputOutputOverrideFactoryTest implements Serializable { MapElements> transform) { return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), + getSingletonMainInput(transform), transform.getTransform()); } }; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java index 64eecc852afd..dcc7354c1f93 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -19,7 +19,6 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems; -import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.values.KV; @@ -42,7 +41,7 @@ class DirectGBKIntoKeyedWorkItemsOverrideFactory GBKIntoKeyedWorkItems> transform) { return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), + getSingletonMainInput(transform), new DirectGroupByKey.DirectGroupByKeyOnly()); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java index c2eb5e72b842..55f72999e3df 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -36,7 +35,9 @@ final class DirectGroupByKeyOverrideFactory PCollection>, PCollection>>, GroupByKey> transform) { return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), + getSingletonMainInput(transform), new DirectGroupByKey<>(transform.getTransform())); } + + } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 35d1bcd19f68..f98bcb2c6561 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -18,8 +18,10 @@ package org.apache.beam.runners.flink; import com.google.common.collect.ImmutableList; + import java.util.List; import java.util.Map; + import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.PTransformReplacements; @@ -241,7 +243,7 @@ private ReflectiveOneToOneOverrideFactory( public PTransformReplacement, PCollection> getReplacementTransform( AppliedPTransform, PCollection, TransformT> transform) { return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), + getSingletonMainInput(transform), InstanceBuilder.ofType(replacement) .withArg(FlinkRunner.class, runner) .withArg( diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index ecd0365ed88c..06cf897715cf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -29,6 +29,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -39,7 +40,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.construction.PTransformReplacements; + import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.dataflow.internal.IsmFormat; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; @@ -1397,7 +1398,7 @@ static class BatchCombineGloballyAsSingletonViewFactory transform) { GloballyAsSingletonView combine = transform.getTransform(); return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), + getSingletonMainInput(transform), new BatchCombineGloballyAsSingletonView<>( runner, combine.getCombineFn(), combine.getFanout(), combine.getInsertDefault())); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5278a4a576d0..ddf987a4fb65 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -40,6 +40,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; + import java.io.File; import java.io.IOException; import java.io.PrintWriter; @@ -57,6 +58,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; + import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory; import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory; import org.apache.beam.runners.core.construction.PTransformMatchers; @@ -416,7 +418,7 @@ public PTransformReplacement, PCollection> getRepla .withArg( (Class) transform.getTransform().getClass(), transform.getTransform()) .build(); - return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), rep); + return PTransformReplacement.of(getSingletonMainInput(transform), rep); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java index 8611d3cad945..504549c1e17d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java @@ -19,10 +19,9 @@ package org.apache.beam.runners.dataflow; import java.util.List; + import org.apache.beam.runners.core.construction.ForwardingPTransform; -import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.DisplayData; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.DoFn; @@ -48,7 +47,7 @@ public class PrimitiveParDoSingleFactory SingleOutput> transform) { return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), + getSingletonMainInput(transform), new ParDoSingle<>(transform.getTransform())); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java index cd9378c60822..4ab9e063a1b2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.dataflow; -import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -50,7 +49,7 @@ class ReshuffleOverrideFactory AppliedPTransform>, PCollection>, Reshuffle> transform) { return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), + getSingletonMainInput(transform), new ReshuffleWithOnlyTrigger()); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java index 6c385d74085b..9f44cbe2d915 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.List; -import org.apache.beam.runners.core.construction.PTransformReplacements; + import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.dataflow.DataflowRunner.StreamingPCollectionViewWriterFn; import org.apache.beam.sdk.coders.Coder; @@ -52,7 +52,7 @@ static class StreamingCreatePCollectionViewFactory StreamingCreatePCollectionView streamingView = new StreamingCreatePCollectionView<>(transform.getTransform().getView()); return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), streamingView); + getSingletonMainInput(transform), streamingView); } private static class StreamingCreatePCollectionView