Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -115,7 +114,20 @@ static RunnerApi.PTransform toProto(
// TODO: Display Data

PTransform<?, ?> transform = appliedPTransform.getTransform();
if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
// A RawPTransform directly vends its payload. Because it will generally be
// a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
if (transform instanceof RawPTransform) {
RawPTransform<?, ?> rawPTransform = (RawPTransform<?, ?>) transform;

if (rawPTransform.getUrn() != null) {
FunctionSpec.Builder payload = FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn());
@Nullable Any parameter = rawPTransform.getPayload();
if (parameter != null) {
payload.setParameter(parameter);
}
transformBuilder.setSpec(payload);
}
} else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
FunctionSpec payload =
KNOWN_PAYLOAD_TRANSLATORS
.get(transform.getClass())
Expand Down Expand Up @@ -144,6 +156,25 @@ private static String toProto(TupleTag<?> tag) {
return tag.getId();
}

/**
* Returns the URN for the transform if it is known, otherwise {@code null}.
*/
@Nullable
public static String urnForTransformOrNull(PTransform<?, ?> transform) {

// A RawPTransform directly vends its URN. Because it will generally be
// a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
if (transform instanceof RawPTransform) {
return ((RawPTransform) transform).getUrn();
}

TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
if (translator == null) {
return null;
}
return translator.getUrn(transform);
}

/**
* Returns the URN for the transform if it is known, otherwise throws.
*/
Expand Down Expand Up @@ -176,38 +207,44 @@ FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents com
* fully expanded in the pipeline proto.
*/
public abstract static class RawPTransform<
InputT extends PInput, OutputT extends POutput, PayloadT extends Message>
InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {

@Nullable
public abstract String getUrn();

@Nullable
PayloadT getPayload() {
public Any getPayload() {
return null;
}
}

/**
* A translator that uses the explicit URN and payload from a {@link RawPTransform}.
*/
public static class RawPTransformTranslator<PayloadT extends Message>
implements TransformPayloadTranslator<RawPTransform<?, ?, PayloadT>> {
public static class RawPTransformTranslator
implements TransformPayloadTranslator<RawPTransform<?, ?>> {
@Override
public String getUrn(RawPTransform<?, ?, PayloadT> transform) {
public String getUrn(RawPTransform<?, ?> transform) {
return transform.getUrn();
}

@Override
public FunctionSpec translate(
AppliedPTransform<?, ?, RawPTransform<?, ?, PayloadT>> transform,
AppliedPTransform<?, ?, RawPTransform<?, ?>> transform,
SdkComponents components) {
PayloadT payload = transform.getTransform().getPayload();

// Anonymous composites have no spec
if (transform.getTransform().getUrn() == null) {
return null;
}

FunctionSpec.Builder transformSpec =
FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform()));

Any payload = transform.getTransform().getPayload();
if (payload != null) {
transformSpec.setParameter(Any.pack(payload));
transformSpec.setParameter(payload);
}

return transformSpec.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents

ParDoPayload.Builder builder = ParDoPayload.newBuilder();
builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
builder.setSplittable(signature.processElement().isSplittable());
for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
}
Expand Down Expand Up @@ -496,6 +497,25 @@ private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) {
.build();
}

private static <T> ParDoPayload getParDoPayload(AppliedPTransform<?, ?, ?> transform)
throws IOException {
return PTransformTranslation.toProto(
transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
.getSpec()
.getParameter()
.unpack(ParDoPayload.class);
}

public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
ParDoPayload payload = getParDoPayload(transform);
return payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0;
}

public static boolean isSplittable(AppliedPTransform<?, ?, ?> transform) throws IOException {
ParDoPayload payload = getParDoPayload(transform);
return payload.getSplittable();
}

private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn)
throws InvalidProtocolBufferException {
FunctionSpec spec = viewFn.getSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.List;
import java.util.UUID;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -67,6 +68,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
public static final String SPLITTABLE_PROCESS_URN =
"urn:beam:runners_core:transforms:splittable_process:v1";

public static final String SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN =
"urn:beam:runners_core:transforms:splittable_process_keyed_elements:v1";

public static final String SPLITTABLE_GBKIKWI_URN =
"urn:beam:runners_core:transforms:splittable_gbkikwi:v1";

/**
* Creates the transform for the given original multi-output {@link ParDo}.
*
Expand Down Expand Up @@ -133,11 +140,11 @@ public void process(ProcessContext c, BoundedWindow window) {

/**
* Runner-specific primitive {@link PTransform} that invokes the {@link DoFn.ProcessElement}
* method for a splittable {@link DoFn} on each {@link ElementAndRestriction} of the input
* {@link PCollection} of {@link KV KVs} keyed with arbitrary but globally unique keys.
* method for a splittable {@link DoFn} on each {@link ElementAndRestriction} of the input {@link
* PCollection} of {@link KV KVs} keyed with arbitrary but globally unique keys.
*/
public static class ProcessKeyedElements<InputT, OutputT, RestrictionT>
extends PTransform<
extends RawPTransform<
PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> {
private final DoFn<InputT, OutputT> fn;
private final Coder<InputT> elementCoder;
Expand Down Expand Up @@ -227,6 +234,11 @@ public static <OutputT> PCollectionTuple createPrimitiveOutputFor(

return outputs;
}

@Override
public String getUrn() {
return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -111,7 +112,11 @@ public static class TestParDoPayloadTranslation {
ParDo.of(new DropElementsFn())
.withOutputTags(
new TupleTag<Void>(),
TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));
TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})),
ParDo.of(new SplittableDropElementsFn())
.withOutputTags(
new TupleTag<Void>(),
TupleTagList.empty()));
}

@Parameter(0)
Expand Down Expand Up @@ -235,6 +240,34 @@ public int hashCode() {
}
}

private static class SplittableDropElementsFn extends DoFn<KV<Long, String>, Void> {
@ProcessElement
public void proc(ProcessContext context, RestrictionTracker<Integer> restriction) {
context.output(null);
}

@GetInitialRestriction
public Integer restriction(KV<Long, String> elem) {
return 42;
}

@NewTracker
public RestrictionTracker<Integer> newTracker(Integer restriction) {
throw new UnsupportedOperationException("Should never be called; only to test translation");
}


@Override
public boolean equals(Object other) {
return other instanceof SplittableDropElementsFn;
}

@Override
public int hashCode() {
return SplittableDropElementsFn.class.hashCode();
}
}

@SuppressWarnings("unused")
private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> {
private static final String BAG_STATE_ID = "bagState";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.Map;
import org.apache.beam.runners.core.construction.ElementAndRestriction;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
Expand Down Expand Up @@ -67,12 +69,18 @@ public class SplittableParDoViaKeyedWorkItems {
* emits output immediately.
*/
public static class GBKIntoKeyedWorkItems<KeyT, InputT>
extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
extends RawPTransform<
PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
@Override
public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
}

@Override
public String getUrn() {
return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
}
}

/** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */
Expand Down
5 changes: 0 additions & 5 deletions runners/direct-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.google.protobuf.Message;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.construction.ForwardingPTransform;
Expand Down Expand Up @@ -74,7 +73,7 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {

static final class DirectGroupByKeyOnly<K, V>
extends PTransformTranslation.RawPTransform<
PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, Message> {
PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> {
@Override
public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
Expand All @@ -101,7 +100,7 @@ public String getUrn() {

static final class DirectGroupAlsoByWindow<K, V>
extends PTransformTranslation.RawPTransform<
PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, Message> {
PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {

private final WindowingStrategy<?, ?> inputWindowingStrategy;
private final WindowingStrategy<?, ?> outputWindowingStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
class KeyedPValueTrackingVisitor extends PipelineVisitor.Defaults {

private static final Set<Class<? extends PTransform>> PRODUCES_KEYED_OUTPUTS =
ImmutableSet.of(
ImmutableSet.<Class<? extends PTransform>>of(
SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class,
DirectGroupByKeyOnly.class,
DirectGroupAlsoByWindow.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static com.google.common.base.Preconditions.checkState;

import com.google.protobuf.Message;
import java.util.Map;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
Expand Down Expand Up @@ -172,7 +171,7 @@ public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {

static class StatefulParDo<K, InputT, OutputT>
extends PTransformTranslation.RawPTransform<
PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, Message> {
PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> {
private final transient MultiOutput<KV<K, InputT>, OutputT> underlyingParDo;
private final transient PCollection<KV<K, InputT>> originalInput;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.protobuf.Message;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -185,7 +184,7 @@ public Map<PValue, ReplacementOutput> mapOutputs(
static final String DIRECT_TEST_STREAM_URN = "urn:beam:directrunner:transforms:test_stream:v1";

static class DirectTestStream<T>
extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>, Message> {
extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>> {
private final transient DirectRunner runner;
private final TestStream<T> original;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,17 @@ public static class DirectTransformsRegistrar implements TransformPayloadTransla
.<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
.put(
DirectGroupByKey.DirectGroupByKeyOnly.class,
new PTransformTranslation.RawPTransformTranslator<>())
new PTransformTranslation.RawPTransformTranslator())
.put(
DirectGroupByKey.DirectGroupAlsoByWindow.class,
new PTransformTranslation.RawPTransformTranslator())
.put(
ParDoMultiOverrideFactory.StatefulParDo.class,
new PTransformTranslation.RawPTransformTranslator<>())
new PTransformTranslation.RawPTransformTranslator())
.put(
ViewOverrideFactory.WriteView.class,
new PTransformTranslation.RawPTransformTranslator<>())
.put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator<>())
new PTransformTranslation.RawPTransformTranslator())
.put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator())
.put(
SplittableParDoViaKeyedWorkItems.ProcessElements.class,
new SplittableParDoProcessElementsTranslator())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.beam.runners.direct;

import com.google.protobuf.Message;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.construction.ForwardingPTransform;
Expand Down Expand Up @@ -95,7 +94,7 @@ protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
* to {@link ViewT}.
*/
static final class WriteView<ElemT, ViewT>
extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>, Message> {
extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
private final CreatePCollectionView<ElemT, ViewT> og;

WriteView(CreatePCollectionView<ElemT, ViewT> og) {
Expand Down
Loading