From 25660affe84cb105ffb0c1315f16782be9a9b853 Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 20 Jul 2016 15:47:05 -0700 Subject: [PATCH] [BEAM-386] Remove StreamingCreate in DataflowRunner --- .../beam/runners/dataflow/DataflowRunner.java | 91 ------------------- 1 file changed, 91 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 05ddf45a1e32..8f9e76e0a90a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -46,7 +46,6 @@ import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.CoderException; @@ -88,7 +87,6 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -148,7 +146,6 @@ import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; -import org.joda.time.Duration; import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -329,7 +326,6 @@ public static DataflowRunner fromOptions(PipelineOptions options) { if (options.isStreaming()) { builder.put(Combine.GloballyAsSingletonView.class, StreamingCombineGloballyAsSingletonView.class); - builder.put(Create.Values.class, StreamingCreate.class); builder.put(View.AsMap.class, StreamingViewAsMap.class); builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class); builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class); @@ -2376,93 +2372,6 @@ public final PCollection apply(PInput input) { } } - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the - * Dataflow runner in streaming mode. - */ - private static class StreamingCreate extends PTransform> { - private final Create.Values transform; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingCreate(DataflowRunner runner, Create.Values transform) { - this.transform = transform; - } - - /** - * {@link DoFn} that outputs a single KV.of(null, null) kick off the {@link GroupByKey} - * in the streaming create implementation. - */ - private static class OutputNullKv extends DoFn> { - @Override - public void processElement(DoFn>.ProcessContext c) throws Exception { - c.output(KV.of((Void) null, (Void) null)); - } - } - - /** - * A {@link DoFn} which outputs the specified elements by first encoding them to bytes using - * the specified {@link Coder} so that they are serialized as part of the {@link DoFn} but - * need not implement {@code Serializable}. - */ - private static class OutputElements extends DoFn { - private final Coder coder; - private final List encodedElements; - - public OutputElements(Iterable elems, Coder coder) { - this.coder = coder; - this.encodedElements = new ArrayList<>(); - for (T t : elems) { - try { - encodedElements.add(CoderUtils.encodeToByteArray(coder, t)); - } catch (CoderException e) { - throw new IllegalArgumentException("Unable to encode value " + t - + " with coder " + coder, e); - } - } - } - - @Override - public void processElement(ProcessContext c) throws IOException { - for (byte[] encodedElement : encodedElements) { - c.output(CoderUtils.decodeFromByteArray(coder, encodedElement)); - } - } - } - - @Override - public PCollection apply(PInput input) { - try { - Coder coder = transform.getDefaultOutputCoder(input); - return Pipeline.applyTransform( - "StartingSignal", input, PubsubIO.Read.subscription("_starting_signal/")) - .apply(ParDo.of(new OutputNullKv())) - .apply("GlobalSingleton", Window.>into(new GlobalWindows()) - .triggering(AfterPane.elementCountAtLeast(1)) - .withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()) - .apply(GroupByKey.create()) - // Go back to the default windowing strategy, so that our setting allowed lateness - // doesn't count as the user having set it. - .setWindowingStrategyInternal(WindowingStrategy.globalDefault()) - .apply(Window.>>into(new GlobalWindows())) - .apply(ParDo.of(new OutputElements<>(transform.getElements(), coder))) - .setCoder(coder).setIsBoundedInternal(IsBounded.BOUNDED); - } catch (CannotProvideCoderException e) { - throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. " - + "Please set a coder by invoking Create.withCoder() explicitly.", e); - } - } - - @Override - protected String getKindString() { - return "StreamingCreate"; - } - } - /** * A specialized {@link DoFn} for writing the contents of a {@link PCollection} * to a streaming {@link PCollectionView} backend implementation.