Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
Expand All @@ -58,15 +57,19 @@ class ParDoTranslator<InputT, OutputT>
@Override
public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
DoFn<InputT, OutputT> doFn = transform.getFn();
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());

if (signature.processElement().isSplittable()) {
if (DoFnSignatures.isSplittable(doFn)) {
throw new UnsupportedOperationException(
String.format(
"%s does not support splittable DoFn: %s", ApexRunner.class.getSimpleName(), doFn));
}

if (signature.timerDeclarations().size() > 0) {
if (DoFnSignatures.requiresTimeSortedInput(doFn)) {
throw new UnsupportedOperationException(
String.format(
"%s doesn't currently support @RequiresTimeSortedInput",
ApexRunner.class.getSimpleName()));
}
if (DoFnSignatures.usesTimers(doFn)) {
throw new UnsupportedOperationException(
String.format(
"Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.gearpump.GearpumpRunner;
import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
Expand Down Expand Up @@ -80,6 +82,13 @@ public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationC
Map<String, PCollectionView<?>> sideInputMapping =
ParDoTranslation.getSideInputMapping(context.getCurrentTransform());

if (DoFnSignatures.requiresTimeSortedInput(transform.getFn())) {
throw new UnsupportedOperationException(
String.format(
"%s doesn't currently support @RequiresTimeSortedInput annotation",
GearpumpRunner.class.getSimpleName()));
}

JavaStream<TranslatorUtils.RawUnionValue> outputStream =
TranslatorUtils.toList(unionStream)
.flatMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ ParDo.SingleOutput<KV<K, InputT>, OutputT> getOriginalParDo() {
public PCollection<OutputT> expand(PCollection<KV<K, InputT>> input) {
DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
verifyFnIsStateful(fn);
DataflowRunner.verifyStateSupported(fn);
DataflowRunner.verifyDoFnSupportedBatch(fn);
DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());

if (isFnApi) {
Expand Down Expand Up @@ -212,7 +212,7 @@ static class StatefulMultiOutputParDo<K, InputT, OutputT>
public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
verifyFnIsStateful(fn);
DataflowRunner.verifyStateSupported(fn);
DataflowRunner.verifyDoFnSupportedBatch(fn);
DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());

if (isFnApi) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,10 +1224,10 @@ private static void translateFn(
Map<TupleTag<?>, Coder<?>> outputCoders,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());

if (signature.usesState() || signature.usesTimers()) {
DataflowRunner.verifyStateSupported(fn);
boolean isStateful = DoFnSignatures.isStateful(fn);
if (isStateful) {
DataflowRunner.verifyDoFnSupported(fn, context.getPipelineOptions().isStreaming());
DataflowRunner.verifyStateSupportForWindowingStrategy(windowingStrategy);
}

Expand Down Expand Up @@ -1255,8 +1255,7 @@ private static void translateFn(

// Setting USES_KEYED_STATE will cause an ungrouped shuffle, which works
// in streaming but does not work in batch
if (context.getPipelineOptions().isStreaming()
&& (signature.usesState() || signature.usesTimers())) {
if (context.getPipelineOptions().isStreaming() && isStateful) {
stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand All @@ -151,7 +150,6 @@
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
Expand Down Expand Up @@ -1937,26 +1935,34 @@ static String getContainerImageForJob(DataflowPipelineOptions options) {
}
}

static void verifyStateSupported(DoFn<?, ?> fn) {
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());

for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) {
static void verifyDoFnSupportedBatch(DoFn<?, ?> fn) {
verifyDoFnSupported(fn, false);
}

// https://issues.apache.org/jira/browse/BEAM-1474
if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(MapState.class))) {
throw new UnsupportedOperationException(
String.format(
"%s does not currently support %s",
DataflowRunner.class.getSimpleName(), MapState.class.getSimpleName()));
}
static void verifyDoFnSupportedStreaming(DoFn<?, ?> fn) {
verifyDoFnSupported(fn, true);
}

static void verifyDoFnSupported(DoFn<?, ?> fn, boolean streaming) {
if (DoFnSignatures.usesSetState(fn)) {
// https://issues.apache.org/jira/browse/BEAM-1479
if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(SetState.class))) {
throw new UnsupportedOperationException(
String.format(
"%s does not currently support %s",
DataflowRunner.class.getSimpleName(), SetState.class.getSimpleName()));
}
throw new UnsupportedOperationException(
String.format(
"%s does not currently support %s",
DataflowRunner.class.getSimpleName(), SetState.class.getSimpleName()));
}
if (DoFnSignatures.usesMapState(fn)) {
// https://issues.apache.org/jira/browse/BEAM-1474
throw new UnsupportedOperationException(
String.format(
"%s does not currently support %s",
DataflowRunner.class.getSimpleName(), MapState.class.getSimpleName()));
}
if (streaming && DoFnSignatures.requiresTimeSortedInput(fn)) {
throw new UnsupportedOperationException(
String.format(
"%s does not currently support @RequiresTimeSortedInput in streaming mode.",
DataflowRunner.class.getSimpleName()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,17 @@ static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> appliedTransform) {
static DoFn<?, ?> getDoFn(AppliedPTransform<?, ?, ?> appliedTransform) {
try {
DoFn<?, ?> doFn = ParDoTranslation.getDoFn(appliedTransform);
if (DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable()) {
if (DoFnSignatures.isSplittable(doFn)) {
throw new IllegalStateException(
"Not expected to directly translate splittable DoFn, should have been overridden: "
+ doFn); // todo
}
if (DoFnSignatures.requiresTimeSortedInput(doFn)) {
throw new UnsupportedOperationException(
String.format(
"%s doesn't current support @RequiresTimeSortedInput annotation.",
JetRunner.class.getSimpleName()));
}
return doFn;
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> create(

final SamzaExecutionContext executionContext =
(SamzaExecutionContext) context.getApplicationContainerContext();
if (signature.usesState()) {
if (DoFnSignatures.isStateful(doFn)) {
keyedInternals = new KeyedInternals(stateInternalsFactory, timerInternalsFactory);
stateInternals = keyedInternals.stateInternals();
timerInternals = keyedInternals.timerInternals();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,16 @@ private static <InT, OutT> void doTranslate(
.collect(
Collectors.toMap(e -> e.getKey(), e -> ((PCollection<?>) e.getValue()).getCoder()));

final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
final Coder<?> keyCoder =
signature.usesState() ? ((KvCoder<?, ?>) input.getCoder()).getKeyCoder() : null;
boolean isStateful = DoFnSignatures.isStateful(transform.getFn());
final Coder<?> keyCoder = isStateful ? ((KvCoder<?, ?>) input.getCoder()).getKeyCoder() : null;

if (signature.processElement().isSplittable()) {
if (DoFnSignatures.isSplittable(transform.getFn())) {
throw new UnsupportedOperationException("Splittable DoFn is not currently supported");
}
if (DoFnSignatures.requiresTimeSortedInput(transform.getFn())) {
throw new UnsupportedOperationException(
"@RequiresTimeSortedInput annotation is not currently supported");
}

final MessageStream<OpMessage<InT>> inputStream = ctx.getMessageStream(input);
final List<MessageStream<OpMessage<InT>>> sideInputStreams =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down Expand Up @@ -72,17 +71,17 @@ public void translateTransform(
// TODO: add support of Splittable DoFn
DoFn<InputT, OutputT> doFn = getDoFn(context);
checkState(
!DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(),
!DoFnSignatures.isSplittable(doFn),
"Not expected to directly translate splittable DoFn, should have been overridden: %s",
doFn);

// TODO: add support of states and timers
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
boolean stateful =
signature.stateDeclarations().size() > 0
|| signature.timerDeclarations().size() > 0
|| signature.timerFamilyDeclarations().size() > 0;
checkState(!stateful, "States and timers are not supported for the moment.");
checkState(
!DoFnSignatures.isStateful(doFn), "States and timers are not supported for the moment.");

checkState(
!DoFnSignatures.requiresTimeSortedInput(doFn),
"@RequiresTimeSortedInput is not " + "supported for the moment");

DoFnSchemaInformation doFnSchemaInformation =
ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,17 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
Expand Down Expand Up @@ -2026,4 +2031,49 @@ public static TimerSpec getTimerFamilySpecOrThrow(
timerFamilyDeclaration.field().getName()));
}
}

public static boolean isSplittable(DoFn<?, ?> doFn) {
return signatureForDoFn(doFn).processElement().isSplittable();
}

public static boolean isStateful(DoFn<?, ?> doFn) {
return usesState(doFn) || usesTimers(doFn);
}

public static boolean usesMapState(DoFn<?, ?> doFn) {
return usesGivenStateClass(doFn, MapState.class);
}

public static boolean usesSetState(DoFn<?, ?> doFn) {
return usesGivenStateClass(doFn, SetState.class);
}

public static boolean usesValueState(DoFn<?, ?> doFn) {
return usesGivenStateClass(doFn, ValueState.class) || requiresTimeSortedInput(doFn);
}

public static boolean usesBagState(DoFn<?, ?> doFn) {
return usesGivenStateClass(doFn, BagState.class) || requiresTimeSortedInput(doFn);
}

public static boolean usesWatermarkHold(DoFn<?, ?> doFn) {
return usesGivenStateClass(doFn, WatermarkHoldState.class) || requiresTimeSortedInput(doFn);
}

public static boolean usesTimers(DoFn<?, ?> doFn) {
return signatureForDoFn(doFn).usesTimers() || requiresTimeSortedInput(doFn);
}

public static boolean usesState(DoFn<?, ?> doFn) {
return signatureForDoFn(doFn).usesState() || requiresTimeSortedInput(doFn);
}

public static boolean requiresTimeSortedInput(DoFn<?, ?> doFn) {
return signatureForDoFn(doFn).processElement().requiresTimeSortedInput();
}

private static boolean usesGivenStateClass(DoFn<?, ?> doFn, Class<? extends State> stateClass) {
return signatureForDoFn(doFn).stateDeclarations().values().stream()
.anyMatch(d -> d.stateType().isSubtypeOf(TypeDescriptor.of(stateClass)));
}
}
Loading