From fa3a5abbc94db629feae8d7d73a31e7dda06bf76 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 7 Jun 2017 14:35:09 -0700 Subject: [PATCH] Use dehydration-insensitive APIs in ParDoEvaluatorFactory --- .../runners/direct/ParDoEvaluatorFactory.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 516f798aba97..408a7df9ebbb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; @@ -63,15 +64,8 @@ public DoFnLifecycleManager load(DoFn key) throws Exception { public TransformEvaluator forApplication( AppliedPTransform application, CommittedBundle inputBundle) throws Exception { - @SuppressWarnings("unchecked") - AppliedPTransform, PCollectionTuple, ParDo.MultiOutput> - parDoApplication = - (AppliedPTransform< - PCollection, PCollectionTuple, ParDo.MultiOutput>) - application; - - ParDo.MultiOutput transform = parDoApplication.getTransform(); - final DoFn doFn = transform.getFn(); + final DoFn doFn = + (DoFn) ParDoTranslation.getDoFn(application); @SuppressWarnings({"unchecked", "rawtypes"}) TransformEvaluator evaluator = @@ -81,9 +75,9 @@ public TransformEvaluator forApplication( (PCollection) inputBundle.getPCollection(), inputBundle.getKey(), doFn, - transform.getSideInputs(), - transform.getMainOutputTag(), - transform.getAdditionalOutputTags().getAll()); + ParDoTranslation.getSideInputs(application), + (TupleTag) ParDoTranslation.getMainOutputTag(application), + ParDoTranslation.getAdditionalOutputTags(application).getAll()); return evaluator; }