Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,10 @@ public interface MultiOutputReceiver {
@Target(ElementType.PARAMETER)
public @interface Element {}

/** Parameter annotation for the input element timestamp for a {@link ProcessElement} method. */
/**
* Parameter annotation for the input element timestamp for {@link ProcessElement}, {@link
* GetInitialRestriction}, {@link SplitRestriction}, and {@link NewTracker} methods.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
Expand Down Expand Up @@ -723,7 +726,23 @@ public interface MultiOutputReceiver {
* Annotation for the method that maps an element to an initial restriction for a <a
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
*
* <p>Signature: {@code RestrictionT getInitialRestriction(InputT element);}
* <p>Signature: {@code RestrictionT getInitialRestriction(InputT element, <optional arguments>);}
*
* <p>The optional arguments are allowed to be:
*
* <ul>
* <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
* passed the timestamp of the current element being processed; the argument must be of type
* {@link Instant}.
* <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
* window of the current element. When applied by {@link ParDo} the subtype of {@link
* BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
* window is not accessed a runner may perform additional optimizations.
* <li>If one of its arguments is of type {@link PaneInfo}, then it will be passed information
* about the current triggering pane.
* <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
* options for the current pipeline.
* </ul>
*/
// TODO: Make the InputT parameter optional.
@Documented
Expand Down Expand Up @@ -788,7 +807,23 @@ public interface MultiOutputReceiver {
* be processed in parallel.
*
* <p>Signature: {@code void splitRestriction(InputT element, RestrictionT restriction,
* OutputReceiver<RestrictionT> receiver);}
* OutputReceiver<RestrictionT> receiver, <optional arguments>);}
*
* <p>The optional arguments are allowed to be:
*
* <ul>
* <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
* passed the timestamp of the current element being processed; the argument must be of type
* {@link Instant}.
* <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
* window of the current element. When applied by {@link ParDo} the subtype of {@link
* BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
* window is not accessed a runner may perform additional optimizations.
* <li>If one of its arguments is of type {@link PaneInfo}, then it will be passed information
* about the current triggering pane.
* <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
* options for the current pipeline.
* </ul>
*
* <p>Optional: if this method is omitted, the restriction will not be split (equivalent to
* defining the method and outputting the {@code restriction} unchanged).
Expand All @@ -804,8 +839,25 @@ public interface MultiOutputReceiver {
* Annotation for the method that creates a new {@link RestrictionTracker} for the restriction of
* a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
*
* <p>Signature: {@code MyRestrictionTracker newTracker(RestrictionT restriction);} where {@code
* MyRestrictionTracker} must be a subtype of {@code RestrictionTracker<RestrictionT>}.
* <p>Signature: {@code MyRestrictionTracker newTracker(RestrictionT restriction, <optional
* arguments>);} where {@code MyRestrictionTracker} must be a subtype of {@code
* RestrictionTracker<RestrictionT>}.
*
* <p>The optional arguments are allowed to be:
*
* <ul>
* <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
* passed the timestamp of the current element being processed; the argument must be of type
* {@link Instant}.
* <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
* window of the current element. When applied by {@link ParDo} the subtype of {@link
* BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
* window is not accessed a runner may perform additional optimizations.
* <li>If one of its arguments is of type {@link PaneInfo}, then it will be passed information
* about the current triggering pane.
* <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
* options for the current pipeline.
* </ul>
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,37 +945,65 @@ static LifecycleMethod create(Method targetMethod) {

/** Describes a {@link DoFn.GetInitialRestriction} method. */
@AutoValue
public abstract static class GetInitialRestrictionMethod implements DoFnMethod {
public abstract static class GetInitialRestrictionMethod implements MethodWithExtraParameters {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();

/** Type of the returned restriction. */
public abstract TypeDescriptor<?> restrictionT();

static GetInitialRestrictionMethod create(Method targetMethod, TypeDescriptor<?> restrictionT) {
return new AutoValue_DoFnSignature_GetInitialRestrictionMethod(targetMethod, restrictionT);
/** The window type used by this method, if any. */
@Nullable
@Override
public abstract TypeDescriptor<? extends BoundedWindow> windowT();

/** Types of optional parameters of the annotated method, in the order they appear. */
@Override
public abstract List<Parameter> extraParameters();

static GetInitialRestrictionMethod create(
Method targetMethod,
TypeDescriptor<?> restrictionT,
TypeDescriptor<? extends BoundedWindow> windowT,
List<Parameter> extraParameters) {
return new AutoValue_DoFnSignature_GetInitialRestrictionMethod(
targetMethod, restrictionT, windowT, extraParameters);
}
}

/** Describes a {@link DoFn.SplitRestriction} method. */
@AutoValue
public abstract static class SplitRestrictionMethod implements DoFnMethod {
public abstract static class SplitRestrictionMethod implements MethodWithExtraParameters {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();

/** Type of the restriction taken and returned. */
public abstract TypeDescriptor<?> restrictionT();

static SplitRestrictionMethod create(Method targetMethod, TypeDescriptor<?> restrictionT) {
return new AutoValue_DoFnSignature_SplitRestrictionMethod(targetMethod, restrictionT);
/** The window type used by this method, if any. */
@Nullable
@Override
public abstract TypeDescriptor<? extends BoundedWindow> windowT();

/** Types of optional parameters of the annotated method, in the order they appear. */
@Override
public abstract List<Parameter> extraParameters();

static SplitRestrictionMethod create(
Method targetMethod,
TypeDescriptor<?> restrictionT,
TypeDescriptor<? extends BoundedWindow> windowT,
List<Parameter> extraParameters) {
return new AutoValue_DoFnSignature_SplitRestrictionMethod(
targetMethod, restrictionT, windowT, extraParameters);
}
}

/** Describes a {@link DoFn.NewTracker} method. */
@AutoValue
public abstract static class NewTrackerMethod implements DoFnMethod {
public abstract static class NewTrackerMethod implements MethodWithExtraParameters {
/** The annotated method itself. */
@Override
public abstract Method targetMethod();
Expand All @@ -986,9 +1014,23 @@ public abstract static class NewTrackerMethod implements DoFnMethod {
/** Type of the returned {@link RestrictionTracker}. */
public abstract TypeDescriptor<?> trackerT();

/** The window type used by this method, if any. */
@Nullable
@Override
public abstract TypeDescriptor<? extends BoundedWindow> windowT();

/** Types of optional parameters of the annotated method, in the order they appear. */
@Override
public abstract List<Parameter> extraParameters();

static NewTrackerMethod create(
Method targetMethod, TypeDescriptor<?> restrictionT, TypeDescriptor<?> trackerT) {
return new AutoValue_DoFnSignature_NewTrackerMethod(targetMethod, restrictionT, trackerT);
Method targetMethod,
TypeDescriptor<?> restrictionT,
TypeDescriptor<?> trackerT,
TypeDescriptor<? extends BoundedWindow> windowT,
List<Parameter> extraParameters) {
return new AutoValue_DoFnSignature_NewTrackerMethod(
targetMethod, restrictionT, trackerT, windowT, extraParameters);
}
}

Expand Down
Loading