Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13614, BEAM-13213] Add OnWindowExpiration support to the Java SDK harness and proto translation #16458

Merged
merged 1 commit into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,12 @@ message ParDoPayload {
// be placed in the pipeline requirements.
bool requires_stable_input = 11;

// If populated, the name of the timer family spec which should be notified
// on each window expiry.
// If this is set, the corresponding standard requirement should also
// be placed in the pipeline requirements.
string on_window_expiration_timer_family_spec = 12;

reserved 6;
}

Expand Down Expand Up @@ -1601,7 +1607,7 @@ message StandardRunnerProtocols {
// to be added in a forwards-compatible way).
message StandardRequirements {
enum Enum {
// This requirement indicates the state_spec and time_spec fields of ParDo
// This requirement indicates the state_specs and timer_family_specs fields of ParDo
// transform payloads must be inspected.
REQUIRES_STATEFUL_PROCESSING = 0 [(beam_urn) = "beam:requirement:pardo:stateful:v1"];

Expand All @@ -1620,6 +1626,10 @@ message StandardRequirements {
// This requirement indicates the restriction_coder_id field of ParDo
// transform payloads must be inspected.
REQUIRES_SPLITTABLE_DOFN = 4 [(beam_urn) = "beam:requirement:pardo:splittable_dofn:v1"];

// This requirement indicates that the on_window_expiration_timer_family_spec field
// of ParDo transform payloads must be inspected.
REQUIRES_ON_WINDOW_EXPIRATION = 5 [(beam_urn) = "beam:requirement:pardo:on_window_expiration:v1"];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -38,6 +39,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
Expand Down Expand Up @@ -116,6 +118,9 @@ public class ParDoTranslation {
*/
public static final String REQUIRES_SPLITTABLE_DOFN_URN =
"beam:requirement:pardo:splittable_dofn:v1";
/** This requirement indicates that the ParDo requires a callback on each window expiration. */
public static final String REQUIRES_ON_WINDOW_EXPIRATION_URN =
"beam:requirement:pardo:on_window_expiration:v1";

static {
checkState(
Expand All @@ -132,6 +137,9 @@ public class ParDoTranslation {
checkState(
REQUIRES_SPLITTABLE_DOFN_URN.equals(
getUrn(StandardRequirements.Enum.REQUIRES_SPLITTABLE_DOFN)));
checkState(
REQUIRES_ON_WINDOW_EXPIRATION_URN.equals(
getUrn(StandardRequirements.Enum.REQUIRES_ON_WINDOW_EXPIRATION)));
}

/** The URN for an unknown Java {@link DoFn}. */
Expand Down Expand Up @@ -281,8 +289,7 @@ public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents compon
}

@Override
public Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(
SdkComponents newComponents) {
public ParDoLikeTimerFamilySpecs translateTimerFamilySpecs(SdkComponents newComponents) {
Map<String, RunnerApi.TimerFamilySpec> timerFamilySpecs = new HashMap<>();

for (Map.Entry<String, TimerDeclaration> timer :
Expand All @@ -306,14 +313,34 @@ public Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(
windowCoder);
timerFamilySpecs.put(timerFamily.getKey(), spec);
}
return timerFamilySpecs;

String onWindowExpirationTimerFamilySpec = null;
if (signature.onWindowExpiration() != null) {
RunnerApi.TimerFamilySpec spec =
RunnerApi.TimerFamilySpec.newBuilder()
.setTimeDomain(translateTimeDomain(TimeDomain.EVENT_TIME))
.setTimerFamilyCoderId(
registerCoderOrThrow(components, Timer.Coder.of(keyCoder, windowCoder)))
.build();
for (int i = 0; i < Integer.MAX_VALUE; ++i) {
onWindowExpirationTimerFamilySpec = "onWindowExpiration" + i;
if (!timerFamilySpecs.containsKey(onWindowExpirationTimerFamilySpec)) {
break;
}
}
timerFamilySpecs.put(onWindowExpirationTimerFamilySpec, spec);
}

return ParDoLikeTimerFamilySpecs.create(
timerFamilySpecs, onWindowExpirationTimerFamilySpec);
}

@Override
public boolean isStateful() {
return !signature.stateDeclarations().isEmpty()
|| !signature.timerDeclarations().isEmpty()
|| !signature.timerFamilyDeclarations().isEmpty();
|| !signature.timerFamilyDeclarations().isEmpty()
|| signature.onWindowExpiration() != null;
}

@Override
Expand Down Expand Up @@ -645,7 +672,7 @@ static StateSpec<?> fromProto(RunnerApi.StateSpec stateSpec, RehydratedComponent
}
}

private static String registerCoderOrThrow(SdkComponents components, Coder coder) {
public static String registerCoderOrThrow(SdkComponents components, Coder coder) {
try {
return components.registerCoder(coder);
} catch (IOException exc) {
Expand All @@ -665,7 +692,7 @@ public static RunnerApi.TimerFamilySpec translateTimerFamilySpec(
.build();
}

private static RunnerApi.TimeDomain.Enum translateTimeDomain(TimeDomain timeDomain) {
public static RunnerApi.TimeDomain.Enum translateTimeDomain(TimeDomain timeDomain) {
switch (timeDomain) {
case EVENT_TIME:
return RunnerApi.TimeDomain.Enum.EVENT_TIME;
Expand Down Expand Up @@ -769,6 +796,22 @@ public static FunctionSpec translateWindowMappingFn(
.build();
}

@AutoValue
public abstract static class ParDoLikeTimerFamilySpecs {

public static ParDoLikeTimerFamilySpecs create(
Map<String, RunnerApi.TimerFamilySpec> timerFamilySpecs,
@Nullable String onWindowExpirationTimerFamilySpec) {
return new AutoValue_ParDoTranslation_ParDoLikeTimerFamilySpecs(
timerFamilySpecs, onWindowExpirationTimerFamilySpec);
}

abstract Map<String, RunnerApi.TimerFamilySpec> timerFamilySpecs();

@Nullable
abstract String onWindowExpirationTimerFamilySpec();
}

/** These methods drive to-proto translation from Java and from rehydrated ParDos. */
public interface ParDoLike {
FunctionSpec translateDoFn(SdkComponents newComponents);
Expand All @@ -778,7 +821,7 @@ public interface ParDoLike {
Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
throws IOException;

Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(SdkComponents newComponents);
ParDoLikeTimerFamilySpecs translateTimerFamilySpecs(SdkComponents newComponents);

boolean isStateful();

Expand Down Expand Up @@ -812,15 +855,24 @@ public static ParDoPayload payloadForParDoLike(ParDoLike parDo, SdkComponents co
components.addRequirement(REQUIRES_TIME_SORTED_INPUT_URN);
}

return ParDoPayload.newBuilder()
.setDoFn(parDo.translateDoFn(components))
.putAllStateSpecs(parDo.translateStateSpecs(components))
.putAllTimerFamilySpecs(parDo.translateTimerFamilySpecs(components))
.putAllSideInputs(parDo.translateSideInputs(components))
.setRequiresStableInput(parDo.isRequiresStableInput())
.setRequiresTimeSortedInput(parDo.isRequiresTimeSortedInput())
.setRestrictionCoderId(parDo.translateRestrictionCoderId(components))
.setRequestsFinalization(parDo.requestsFinalization())
.build();
ParDoLikeTimerFamilySpecs timerFamilySpecs = parDo.translateTimerFamilySpecs(components);
ParDoPayload.Builder builder =
ParDoPayload.newBuilder()
.setDoFn(parDo.translateDoFn(components))
.putAllStateSpecs(parDo.translateStateSpecs(components))
.putAllTimerFamilySpecs(timerFamilySpecs.timerFamilySpecs())
.putAllSideInputs(parDo.translateSideInputs(components))
.setRequiresStableInput(parDo.isRequiresStableInput())
.setRequiresTimeSortedInput(parDo.isRequiresTimeSortedInput())
.setRestrictionCoderId(parDo.translateRestrictionCoderId(components))
.setRequestsFinalization(parDo.requestsFinalization());

if (timerFamilySpecs.onWindowExpirationTimerFamilySpec() != null) {
components.addRequirement(REQUIRES_ON_WINDOW_EXPIRATION_URN);
builder.setOnWindowExpirationTimerFamilySpec(
timerFamilySpecs.onWindowExpirationTimerFamilySpec());
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLike;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLikeTimerFamilySpecs;
import org.apache.beam.runners.core.construction.ReadTranslation.BoundedReadPayloadTranslator;
import org.apache.beam.runners.core.construction.ReadTranslation.UnboundedReadPayloadTranslator;
import org.apache.beam.sdk.Pipeline;
Expand Down Expand Up @@ -435,17 +435,16 @@ public Map<String, StateSpec> translateStateSpecs(SdkComponents components) {
}

@Override
public Map<String, TimerFamilySpec> translateTimerFamilySpecs(
public ParDoLikeTimerFamilySpecs translateTimerFamilySpecs(
SdkComponents newComponents) {
// SDFs don't have timers.
return ImmutableMap.of();
return ParDoLikeTimerFamilySpecs.create(ImmutableMap.of(), null);
}

@Override
public boolean isStateful() {
return !signature.stateDeclarations().isEmpty()
|| !signature.timerDeclarations().isEmpty()
|| !signature.timerFamilyDeclarations().isEmpty();
// SDFs don't have state or timers.
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ public void testToProto() throws Exception {
assertEquals(
parDo.getFn() instanceof StateTimerDropElementsFn,
components.requirements().contains(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN));
assertEquals(
parDo.getFn() instanceof StateTimerDropElementsFn,
components.requirements().contains(ParDoTranslation.REQUIRES_ON_WINDOW_EXPIRATION_URN));
assertEquals(
parDo.getFn() instanceof StateTimerDropElementsFn ? "onWindowExpiration0" : "",
payload.getOnWindowExpirationTimerFamilySpec());
}

@Test
Expand Down Expand Up @@ -339,6 +345,9 @@ public void onEventTime(OnTimerContext context) {}
@OnTimer(PROCESSING_TIMER_ID)
public void onProcessingTime(OnTimerContext context) {}

@OnWindowExpiration
public void onWindowExpiration() {}

@Override
public boolean equals(@Nullable Object other) {
return other instanceof StateTimerDropElementsFn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package org.apache.beam.runners.dataflow;

import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
import static org.apache.beam.runners.core.construction.ParDoTranslation.registerCoderOrThrow;
import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimeDomain;
import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimerFamilySpec;
import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getStateSpecOrThrow;
import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerFamilySpecOrThrow;
import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerSpecOrThrow;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

Expand All @@ -37,14 +38,17 @@
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLikeTimerFamilySpecs;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -54,6 +58,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -245,37 +250,59 @@ public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents compon
}

@Override
public Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(
public ParDoLikeTimerFamilySpecs translateTimerFamilySpecs(
SdkComponents newComponents) {
Map<String, RunnerApi.TimerFamilySpec> timerFamilySpecs = new HashMap<>();
for (Map.Entry<String, DoFnSignature.TimerFamilyDeclaration> timerFamily :
signature.timerFamilyDeclarations().entrySet()) {

for (Map.Entry<String, TimerDeclaration> timer :
signature.timerDeclarations().entrySet()) {
RunnerApi.TimerFamilySpec spec =
translateTimerFamilySpec(
getTimerFamilySpecOrThrow(timerFamily.getValue(), doFn),
getTimerSpecOrThrow(timer.getValue(), doFn),
newComponents,
keyCoder,
windowCoder);
timerFamilySpecs.put(timerFamily.getKey(), spec);
timerFamilySpecs.put(timer.getKey(), spec);
}
for (Map.Entry<String, DoFnSignature.TimerDeclaration> timer :
signature.timerDeclarations().entrySet()) {

for (Map.Entry<String, DoFnSignature.TimerFamilyDeclaration> timerFamily :
signature.timerFamilyDeclarations().entrySet()) {
RunnerApi.TimerFamilySpec spec =
translateTimerFamilySpec(
getTimerSpecOrThrow(timer.getValue(), doFn),
DoFnSignatures.getTimerFamilySpecOrThrow(timerFamily.getValue(), doFn),
newComponents,
keyCoder,
windowCoder);
timerFamilySpecs.put(timer.getKey(), spec);
timerFamilySpecs.put(timerFamily.getKey(), spec);
}
return timerFamilySpecs;

String onWindowExpirationTimerFamilySpec = null;
if (signature.onWindowExpiration() != null) {
RunnerApi.TimerFamilySpec spec =
RunnerApi.TimerFamilySpec.newBuilder()
.setTimeDomain(translateTimeDomain(TimeDomain.EVENT_TIME))
.setTimerFamilyCoderId(
registerCoderOrThrow(components, Timer.Coder.of(keyCoder, windowCoder)))
.build();
for (int i = 0; i < Integer.MAX_VALUE; ++i) {
onWindowExpirationTimerFamilySpec = "onWindowExpiration" + i;
if (!timerFamilySpecs.containsKey(onWindowExpirationTimerFamilySpec)) {
break;
}
}
timerFamilySpecs.put(onWindowExpirationTimerFamilySpec, spec);
}

return ParDoLikeTimerFamilySpecs.create(
timerFamilySpecs, onWindowExpirationTimerFamilySpec);
}

@Override
public boolean isStateful() {
return !signature.stateDeclarations().isEmpty()
|| !signature.timerDeclarations().isEmpty()
|| !signature.timerFamilyDeclarations().isEmpty();
|| !signature.timerFamilyDeclarations().isEmpty()
|| signature.onWindowExpiration() != null;
}

@Override
Expand Down
Loading