Skip to content

Commit

Permalink
Reintroduces DoFn.ProcessContinuation (Dataflow worker compatibility …
Browse files Browse the repository at this point in the history
…part)
  • Loading branch information
jkff committed Jun 16, 2017
1 parent dd9abc3 commit 71599e1
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ public interface OutputReceiver<T> {
@Experimental(Kind.SPLITTABLE_DO_FN)
public @interface UnboundedPerElement {}

/** Temporary, do not use. See https://issues.apache.org/jira/browse/BEAM-1904 */
public class ProcessContinuation {}

/**
* Finalize the {@link DoFn} construction to prepare for processing.
* This method should be called by runners before any processing methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
import net.bytebuddy.implementation.bytecode.constant.NullConstant;
import net.bytebuddy.implementation.bytecode.constant.TextConstant;
import net.bytebuddy.implementation.bytecode.member.FieldAccess;
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
Expand Down Expand Up @@ -667,6 +668,11 @@ protected StackManipulation beforeDelegation(MethodDescription instrumentedMetho
}
return new StackManipulation.Compound(pushParameters);
}

@Override
protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
return new StackManipulation.Compound(NullConstant.INSTANCE, MethodReturn.REFERENCE);
}
}

private static class UserCodeMethodInvocation implements StackManipulation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ public interface DoFnInvoker<InputT, OutputT> {
* Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
*
* @param extra Factory for producing extra parameter objects (such as window), if necessary.
* @return {@code null} - see <a href="https://issues.apache.org/jira/browse/BEAM-1904">JIRA</a>
* tracking the complete removal of {@link DoFn.ProcessContinuation}.
*/
void invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);

/** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */
void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments);
Expand Down

0 comments on commit 71599e1

Please sign in to comment.