From 3fd85c2c5e8c2329d05971f24f6e425b95ce2c79 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 19 May 2017 14:06:12 -0700 Subject: [PATCH 1/4] Remove Unused DoFnInfo methods --- .../beam/runners/dataflow/util/DoFnInfo.java | 25 ------------------- .../control/ProcessBundleHandlerTest.java | 2 +- 2 files changed, 1 insertion(+), 26 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index bd2742f147da..4a26795ec138 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -53,25 +53,6 @@ public static DoFnInfo forFn( doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); } - /** TODO: remove this when Dataflow worker uses the DoFn overload. */ - @Deprecated - @SuppressWarnings("unchecked") - public static DoFnInfo forFn( - Serializable doFn, - WindowingStrategy windowingStrategy, - Iterable> sideInputViews, - Coder inputCoder, - long mainOutput, - Map> outputMap) { - return forFn( - (DoFn) doFn, - windowingStrategy, - sideInputViews, - inputCoder, - mainOutput, - outputMap); - } - public DoFnInfo withFn(DoFn newFn) { return DoFnInfo.forFn(newFn, windowingStrategy, @@ -96,12 +77,6 @@ private DoFnInfo( this.outputMap = outputMap; } - /** TODO: remove this when Dataflow worker uses {@link #getDoFn}. */ - @Deprecated - public Serializable getFn() { - return doFn; - } - /** Returns the embedded function. */ public DoFn getDoFn() { return doFn; diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index 748ffea1f13c..f40572843bee 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -340,7 +340,7 @@ public void testCreatingAndProcessingDoFn() throws Exception { new TestDoFn(), WindowingStrategy.globalDefault(), ImmutableList.of(), - STRING_CODER, + StringUtf8Coder.of(), mainOutputId, ImmutableMap.of( mainOutputId, TestDoFn.mainOutput, From 6698929a60e9f98fe90dbef9355acd39716788c5 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 19 May 2017 14:24:07 -0700 Subject: [PATCH 2/4] Extract the Main Input PCollection in ParDos --- .../runners/core/construction/ParDos.java | 21 +++++++++++++++++++ .../runners/core/construction/ParDosTest.java | 4 ++++ 2 files changed, 25 insertions(+) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java index b2b29df65dc0..eed3429b44ab 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java @@ -146,6 +146,27 @@ public static TupleTag getMainOutputTag(ParDoPayload payload) return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag(); } + public static RunnerApi.PCollection getMainInput( + RunnerApi.PTransform ptransform, Components components) throws IOException { + checkArgument( + ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN), + "Unexpected payload type %s", + ptransform.getSpec().getUrn()); + ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class); + String mainInputId = null; + for (Map.Entry input : ptransform.getInputsMap().entrySet()) { + if (!payload.getSideInputsMap().containsKey(input.getKey())) { + checkArgument( + mainInputId == null, + "Multiple non-side input inputs (Found %s and %s)", + mainInputId, + input.getValue()); + mainInputId = input.getValue(); + } + } + return components.getPcollectionsOrThrow(mainInputId); + } + // TODO: Implement private static StateSpec toProto(StateDeclaration state) { throw new UnsupportedOperationException("Not yet supported"); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java index 74edec15af81..b6f0b7d9d68e 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java @@ -149,6 +149,10 @@ public void toAndFromTransformProto() throws Exception { view.getWindowingStrategyInternal().fixDefaults())); assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal())); } + String mainInputId = components.registerPCollection(mainInput); + assertThat( + ParDos.getMainInput(protoTransform, protoComponents), + equalTo(protoComponents.getPcollectionsOrThrow(mainInputId))); } private static class DropElementsFn extends DoFn, Void> { From 1930e3c1b45049d5daadd4d1233093c9f539c830 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 19 May 2017 14:58:39 -0700 Subject: [PATCH 3/4] fixup! Extract the Main Input PCollection in ParDos --- .../beam/runners/core/construction/ParDos.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java index eed3429b44ab..28fd472f817f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java @@ -22,6 +22,8 @@ import com.google.auto.value.AutoValue; import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; @@ -153,17 +155,10 @@ public static RunnerApi.PCollection getMainInput( "Unexpected payload type %s", ptransform.getSpec().getUrn()); ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class); - String mainInputId = null; - for (Map.Entry input : ptransform.getInputsMap().entrySet()) { - if (!payload.getSideInputsMap().containsKey(input.getKey())) { - checkArgument( - mainInputId == null, - "Multiple non-side input inputs (Found %s and %s)", - mainInputId, - input.getValue()); - mainInputId = input.getValue(); - } - } + String mainInputId = + Iterables.getOnlyElement( + Sets.difference( + ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet())); return components.getPcollectionsOrThrow(mainInputId); } From cf12477895d76593de4d2f8616892b7b4f48467a Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 22 May 2017 09:08:25 -0700 Subject: [PATCH 4/4] fixup! Extract the Main Input PCollection in ParDos --- .../java/org/apache/beam/runners/core/construction/ParDos.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java index 28fd472f817f..93740719577e 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java @@ -159,7 +159,7 @@ public static RunnerApi.PCollection getMainInput( Iterables.getOnlyElement( Sets.difference( ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet())); - return components.getPcollectionsOrThrow(mainInputId); + return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId)); } // TODO: Implement