Merged
57 changes: 24 additions & 33 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Expand Up @@ -164,6 +164,8 @@ message PTransform {
// there is none, or it is not relevant (such as use by the Fn API)
// then it may be omitted.
DisplayData display_data = 6;

string environment_id = 7;
}

message StandardPTransforms {
Expand Down Expand Up @@ -364,8 +366,8 @@ message PCollection {
// The payload for the primitive ParDo transform.
message ParDoPayload {

// (Required) The SdkFunctionSpec of the DoFn.
SdkFunctionSpec do_fn = 1;
// (Required) The FunctionSpec of the DoFn.
FunctionSpec do_fn = 1;

// (Required) Additional pieces of context the DoFn may require that
// are not otherwise represented in the payload.
Expand Down Expand Up @@ -439,7 +441,7 @@ message BagStateSpec {

message CombiningStateSpec {
string accumulator_coder_id = 1;
SdkFunctionSpec combine_fn = 2;
FunctionSpec combine_fn = 2;
}

message MapStateSpec {
Expand Down Expand Up @@ -467,8 +469,8 @@ message IsBounded {
// The payload for the primitive Read transform.
message ReadPayload {

// (Required) The SdkFunctionSpec of the source for this Read.
SdkFunctionSpec source = 1;
// (Required) The FunctionSpec of the source for this Read.
FunctionSpec source = 1;

// (Required) Whether the source is bounded or unbounded
IsBounded.Enum is_bounded = 2;
Expand All @@ -479,15 +481,15 @@ message ReadPayload {
// The payload for the WindowInto transform.
message WindowIntoPayload {

// (Required) The SdkFunctionSpec of the WindowFn.
SdkFunctionSpec window_fn = 1;
// (Required) The FunctionSpec of the WindowFn.
FunctionSpec window_fn = 1;
}

// The payload for the special-but-not-primitive Combine transform.
message CombinePayload {

// (Required) The SdkFunctionSpec of the CombineFn.
SdkFunctionSpec combine_fn = 1;
// (Required) The FunctionSpec of the CombineFn.
FunctionSpec combine_fn = 1;

// (Required) A reference to the Coder to use for accumulators of the CombineFn
string accumulator_coder_id = 2;
Expand Down Expand Up @@ -562,11 +564,11 @@ message EventsRequest {}
// The payload for the special-but-not-primitive WriteFiles transform.
message WriteFilesPayload {

// (Required) The SdkFunctionSpec of the FileBasedSink.
SdkFunctionSpec sink = 1;
// (Required) The FunctionSpec of the FileBasedSink.
FunctionSpec sink = 1;

// (Required) The format function.
SdkFunctionSpec format_function = 2;
FunctionSpec format_function = 2;

bool windowed_writes = 3;

Expand All @@ -588,7 +590,7 @@ message Coder {

// (Optional) If this coder is parametric, such as ListCoder(VarIntCoder),
// this is a list of the components. In order for encodings to be identical,
// the SdkFunctionSpec and all components must be identical, recursively.
// the FunctionSpec and all components must be identical, recursively.
repeated string component_coder_ids = 2;
}

Expand Down Expand Up @@ -741,10 +743,10 @@ message StandardCoders {
// TODO: consider inlining field on PCollection
message WindowingStrategy {

// (Required) The SdkFunctionSpec of the UDF that assigns windows,
// (Required) The FunctionSpec of the UDF that assigns windows,
// merges windows, and shifts timestamps before they are
// combined according to the OutputTime.
SdkFunctionSpec window_fn = 1;
FunctionSpec window_fn = 1;
Member

All other uses of SdkFunctionSpec are part of a transform, but finding the environment for the windowing strategy that the window_fn belongs to is difficult when looking at a PCollection, since one needs to find the environment of the upstream assign windows transform. It is possible but annoying. This is also different from coders, which don't have an environment: both the upstream and downstream transforms need to understand the encoding, and runners have the length-prefixing technique to convert unknown coders into known ones.

For now I'm in favor of not adding an environment id here and forcing that graph traversal to the assign windows transform to find the environment. In the future, to truly support custom window fns, it's likely we will have to come up with techniques like coder length prefixing so that they can be treated opaquely. (Note that the rest of the change looks good.)

@chamikaramj @robertwb what do you think?
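
The length-prefixing technique mentioned in the comment above can be illustrated with a minimal sketch. It is not Beam's implementation: the class and method names are hypothetical, and a fixed 4-byte length is assumed for brevity, whereas the real wire format is not shown in this diff and may differ. The point is only that a runner can frame, skip, and forward elements of an unknown coder without decoding them.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: framing opaque, already-encoded elements with a length prefix.
public class LengthPrefixSketch {

  // Prepends a 4-byte big-endian length to the opaque bytes.
  // Assumption: a fixed-width int is used here only to keep the example short.
  public static byte[] wrap(byte[] opaqueElement) {
    return ByteBuffer.allocate(4 + opaqueElement.length)
        .putInt(opaqueElement.length)
        .put(opaqueElement)
        .array();
  }

  // Reads one framed element from the buffer without interpreting its contents.
  public static byte[] readNext(ByteBuffer framed) {
    int length = framed.getInt();
    byte[] element = new byte[length];
    framed.get(element);
    return element;
  }

  public static void main(String[] args) {
    byte[] framed = wrap(new byte[] {10, 20, 30});
    byte[] element = readNext(ByteBuffer.wrap(framed));
    System.out.println("recovered " + element.length + " opaque bytes");
  }
}
```

As the thread goes on to note, this works for coders because they only constrain an encoding; a window fn requires running user code, so the same trick does not apply directly.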

Contributor Author

That makes sense and I'm actually running into this issue while updating SDKs. We probably have to add environment_id to WindowingStrategy since it's not directly attached to PTransform.

Contributor

The windowing strategy cannot be coerced into an environment-agnostic one the same way a coder can be, as it involves actually executing user code as opposed to specifying constraints on its behavior. (In a very real sense, it is a cross-language UDF.)

Having to traverse the graph is ugly, but I suppose possible for now.
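
The graph traversal discussed here might look roughly like the sketch below. It assumes a heavily simplified model: `Transform` is a hypothetical stand-in for the PTransform proto, `ASSIGN_WINDOWS_URN` is a placeholder rather than the real URN, and composite transforms and windowing-preserving subtleties are ignored. Starting from a PCollection, it walks producers upstream until it reaches an assign windows transform and returns that transform's environment id.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: locating the environment of the upstream assign windows transform.
public class WindowFnEnvironmentSketch {

  // Placeholder URN, not the real constant from PTransformTranslation.
  public static final String ASSIGN_WINDOWS_URN = "example:assign_windows";

  // Simplified stand-in for a PTransform: URN, environment id, input and output PCollection ids.
  public static final class Transform {
    public final String urn;
    public final String environmentId;
    public final List<String> inputs;
    public final List<String> outputs;

    public Transform(String urn, String environmentId, List<String> inputs, List<String> outputs) {
      this.urn = urn;
      this.environmentId = environmentId;
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  // Breadth-first walk from the given PCollection towards the sources.
  public static Optional<String> findWindowFnEnvironment(
      String pcollectionId, Map<String, Transform> transforms) {
    Deque<String> pending = new ArrayDeque<>();
    Set<String> visited = new HashSet<>();
    pending.add(pcollectionId);
    while (!pending.isEmpty()) {
      String current = pending.poll();
      if (!visited.add(current)) {
        continue; // already examined this PCollection
      }
      for (Transform producer : transforms.values()) {
        if (!producer.outputs.contains(current)) {
          continue; // not a producer of the current PCollection
        }
        if (ASSIGN_WINDOWS_URN.equals(producer.urn)) {
          return Optional.of(producer.environmentId);
        }
        pending.addAll(producer.inputs);
      }
    }
    return Optional.empty(); // no upstream assign windows transform found
  }

  public static void main(String[] args) {
    Map<String, Transform> graph =
        Map.of(
            "window",
            new Transform(ASSIGN_WINDOWS_URN, "python_env", List.of("pc0"), List.of("pc1")),
            "pardo",
            new Transform("example:pardo", "java_env", List.of("pc1"), List.of("pc2")));
    System.out.println(findWindowFnEnvironment("pc2", graph));
  }
}
```

Even in this reduced form the lookup scans the transform graph for every query, which is the cost that storing an environment id directly on the windowing strategy avoids.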

Member

After hearing about some of the difficulties that Cham is running into, I would go either way on whether we add an environment id here or remove it completely.

Contributor Author

Added an environment_id to WindowingStrategy for now (retaining the current behavior). This can be further simplified in the future if needed.
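
With an `environment_id` stored on the windowing strategy itself, resolving the environment becomes a direct map lookup. The sketch below illustrates that under simplifying assumptions: `WindowingStrategy` is a hypothetical plain-Java stand-in for the proto message, and environments are modelled as a map from id to a container image string rather than as `Environment` messages. The empty-id handling follows the same pattern as `getEnvironment` in this diff.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: resolving the environment straight from the windowing strategy.
public class WindowingStrategyEnvironmentSketch {

  // Simplified stand-in for the WindowingStrategy proto after this change.
  public static final class WindowingStrategy {
    public final String windowFnUrn;
    public final String environmentId;

    public WindowingStrategy(String windowFnUrn, String environmentId) {
      this.windowFnUrn = windowFnUrn;
      this.environmentId = environmentId;
    }
  }

  // Returns the environment for the strategy, or empty when no environment id is set
  // (for example, a well-known window fn that needs no SDK environment).
  public static Optional<String> environmentFor(
      WindowingStrategy strategy, Map<String, String> environments) {
    String envId = strategy.environmentId;
    if (envId == null || envId.isEmpty()) {
      return Optional.empty();
    }
    String environment = environments.get(envId);
    if (environment == null) {
      // Fail loudly on a dangling reference, in the spirit of the OrThrow getters in the diff.
      throw new IllegalArgumentException("Unknown environment id: " + envId);
    }
    return Optional.of(environment);
  }

  public static void main(String[] args) {
    Map<String, String> environments = Map.of("python_env", "example/python-sdk-image");
    WindowingStrategy custom = new WindowingStrategy("example:custom_window_fn", "python_env");
    System.out.println(environmentFor(custom, environments));
  }
}
```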


// (Required) Whether or not the window fn is merging.
//
Expand Down Expand Up @@ -787,6 +789,8 @@ message WindowingStrategy {
//
// This knowledge is required for some optimizations
bool assigns_to_one_window = 10;

string environment_id = 11;
}

// Whether or not a PCollection's WindowFn is non-merging, merging, or
Expand Down Expand Up @@ -1043,20 +1047,20 @@ message SideInput {
// URN)
FunctionSpec access_pattern = 1;

// (Required) The SdkFunctionSpec of the UDF that adapts a particular
// (Required) The FunctionSpec of the UDF that adapts a particular
// access_pattern to a user-facing view type.
//
// For example, View.asSingleton() may include a `view_fn` that adapts a
// specially-designed multimap to a single value per window.
SdkFunctionSpec view_fn = 2;
FunctionSpec view_fn = 2;

// (Required) The SdkFunctionSpec of the UDF that maps a main input window
// (Required) The FunctionSpec of the UDF that maps a main input window
// to a side input window.
//
// For example, when the main input is in fixed windows of one hour, this
// can specify that the side input should be accessed according to the day
// in which that hour falls.
SdkFunctionSpec window_mapping_fn = 3;
FunctionSpec window_mapping_fn = 3;
}

// An environment for executing UDFs. By default, an SDK container URL, but
Expand Down Expand Up @@ -1099,18 +1103,6 @@ message ExternalPayload {
map<string, string> params = 2; // Arbitrary extra parameters to pass
}

// A specification of a user defined function.
//
message SdkFunctionSpec {

// (Required) A full specification of this function.
FunctionSpec spec = 1;

// (Required) Reference to an execution environment capable of
// invoking this function.
string environment_id = 2;
}

extend google.protobuf.EnumValueOptions {
// An extension to be used for specifying the standard URN of various
// pipeline entities, e.g. transforms, functions, coders etc.
Expand Down Expand Up @@ -1258,15 +1250,14 @@ message MessageWithComponents {
oneof root {
Coder coder = 2;
CombinePayload combine_payload = 3;
SdkFunctionSpec sdk_function_spec = 4;
FunctionSpec function_spec = 4;
ParDoPayload par_do_payload = 6;
PTransform ptransform = 7;
PCollection pcollection = 8;
ReadPayload read_payload = 9;
SideInput side_input = 11;
WindowIntoPayload window_into_payload = 12;
WindowingStrategy windowing_strategy = 13;
FunctionSpec function_spec = 14;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -234,15 +233,10 @@ private static CombinePayload combinePayload(
.build();
}

public static SdkFunctionSpec toProto(
GlobalCombineFn<?, ?, ?> combineFn, SdkComponents components) {
return SdkFunctionSpec.newBuilder()
.setEnvironmentId(components.getOnlyEnvironmentId())
.setSpec(
FunctionSpec.newBuilder()
.setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
.setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn)))
.build())
public static FunctionSpec toProto(GlobalCombineFn<?, ?, ?> combineFn, SdkComponents components) {
return FunctionSpec.newBuilder()
.setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
.setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn)))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,56 +23,21 @@
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.DockerPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExternalPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.ProcessPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/** Utilities for interacting with portability {@link Environment environments}. */
public class Environments {
private static final ImmutableMap<String, EnvironmentIdExtractor> KNOWN_URN_SPEC_EXTRACTORS =
ImmutableMap.<String, EnvironmentIdExtractor>builder()
.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, Environments::combineExtractor)
.put(
PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
Environments::combineExtractor)
.put(
PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
Environments::combineExtractor)
.put(
PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
Environments::combineExtractor)
.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, Environments::parDoExtractor)
.put(PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN, Environments::parDoExtractor)
.put(
PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
Environments::parDoExtractor)
.put(
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
Environments::parDoExtractor)
.put(
PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
Environments::parDoExtractor)
.put(PTransformTranslation.READ_TRANSFORM_URN, Environments::readExtractor)
.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, Environments::windowExtractor)
.build();

private static final EnvironmentIdExtractor DEFAULT_SPEC_EXTRACTOR = transform -> null;

private static final ObjectMapper MAPPER =
new ObjectMapper()
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
Expand Down Expand Up @@ -181,73 +146,31 @@ public static Environment createProcessEnvironment(
}

public static Optional<Environment> getEnvironment(String ptransformId, Components components) {
try {
PTransform ptransform = components.getTransformsOrThrow(ptransformId);
String envId =
KNOWN_URN_SPEC_EXTRACTORS
.getOrDefault(ptransform.getSpec().getUrn(), DEFAULT_SPEC_EXTRACTOR)
.getEnvironmentId(ptransform);
if (Strings.isNullOrEmpty(envId)) {
// Some PTransform payloads may have an unspecified (empty) Environment ID, for example a
// WindowIntoPayload with a known WindowFn. Others will never have an Environment ID, such
// as a GroupByKeyPayload, and the Default extractor returns null in this case.
return Optional.empty();
} else {
return Optional.of(components.getEnvironmentsOrThrow(envId));
}
} catch (IOException e) {
throw new RuntimeException(e);
PTransform ptransform = components.getTransformsOrThrow(ptransformId);
String envId = ptransform.getEnvironmentId();
if (Strings.isNullOrEmpty(envId)) {
// Some PTransform payloads may have an unspecified (empty) Environment ID, for example a
// WindowIntoPayload with a known WindowFn. Others will never have an Environment ID, such
// as a GroupByKeyPayload. In both cases we return an empty Optional.
return Optional.empty();
} else {
return Optional.of(components.getEnvironmentsOrThrow(envId));
}
}

public static Optional<Environment> getEnvironment(
PTransform ptransform, RehydratedComponents components) {
try {
String envId =
KNOWN_URN_SPEC_EXTRACTORS
.getOrDefault(ptransform.getSpec().getUrn(), DEFAULT_SPEC_EXTRACTOR)
.getEnvironmentId(ptransform);
if (!Strings.isNullOrEmpty(envId)) {
// Some PTransform payloads may have an empty (default) Environment ID, for example a
// WindowIntoPayload with a known WindowFn. Others will never have an Environment ID, such
// as a GroupByKeyPayload, and the Default extractor returns null in this case.
return Optional.of(components.getEnvironment(envId));
} else {
return Optional.empty();
}
} catch (IOException e) {
throw new RuntimeException(e);
String envId = ptransform.getEnvironmentId();
if (Strings.isNullOrEmpty(envId)) {
return Optional.empty();
} else {
// Some PTransform payloads may have an empty (default) Environment ID, for example a
// WindowIntoPayload with a known WindowFn. Others will never have an Environment ID, such
// as a GroupByKeyPayload. Those cases return an empty Optional above.
return Optional.of(components.getEnvironment(envId));
}
}

private interface EnvironmentIdExtractor {
@Nullable
String getEnvironmentId(PTransform transform) throws IOException;
}

private static String parDoExtractor(PTransform pTransform)
throws InvalidProtocolBufferException {
return ParDoPayload.parseFrom(pTransform.getSpec().getPayload()).getDoFn().getEnvironmentId();
}

private static String combineExtractor(PTransform pTransform)
throws InvalidProtocolBufferException {
return CombinePayload.parseFrom(pTransform.getSpec().getPayload())
.getCombineFn()
.getEnvironmentId();
}

private static String readExtractor(PTransform transform) throws InvalidProtocolBufferException {
return ReadPayload.parseFrom(transform.getSpec().getPayload()).getSource().getEnvironmentId();
}

private static String windowExtractor(PTransform transform)
throws InvalidProtocolBufferException {
return WindowIntoPayload.parseFrom(transform.getSpec().getPayload())
.getWindowFn()
.getEnvironmentId();
}

private static class ProcessPayloadReferenceJSON {
@Nullable private String os;
@Nullable private String arch;
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public RunnerApi.PTransform translate(
transformBuilder
.setUniqueName(proto.getUniqueName())
.setSpec(proto.getSpec())
.setEnvironmentId(proto.getEnvironmentId())
.addAllSubtransforms(proto.getSubtransformsList());
for (Map.Entry<String, String> inputEntry : proto.getInputsMap().entrySet()) {
transformBuilder.putInputs(
Expand All @@ -144,6 +145,7 @@ public RunnerApi.PTransform translate(
.setUniqueName(expandedTransform.getUniqueName())
.setSpec(expandedTransform.getSpec())
.addAllSubtransforms(expandedTransform.getSubtransformsList())
.setEnvironmentId(expandedTransform.getEnvironmentId())
.putAllInputs(expandedTransform.getInputsMap());
for (Map.Entry<String, String> outputEntry : expandedTransform.getOutputsMap().entrySet()) {
rootTransformBuilder.putOutputs(
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ public static PCollectionView<?> viewFromProto(
}

/**
* Converts a {@link org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec} into a {@link
* Converts a {@link org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec} into a {@link
* ViewFn} using the URN.
*/
public static ViewFn<?, ?> viewFnFromProto(RunnerApi.SdkFunctionSpec viewFn)
public static ViewFn<?, ?> viewFnFromProto(RunnerApi.FunctionSpec viewFn)
throws InvalidProtocolBufferException {
RunnerApi.FunctionSpec spec = viewFn.getSpec();
RunnerApi.FunctionSpec spec = viewFn;
checkArgument(
spec.getUrn().equals(ParDoTranslation.CUSTOM_JAVA_VIEW_FN_URN),
"Can't deserialize unknown %s type %s",
Expand All @@ -89,12 +89,12 @@ public static PCollectionView<?> viewFromProto(
}

/**
* Converts a {@link org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec} into a {@link
* Converts a {@link org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec} into a {@link
* WindowMappingFn} using the URN.
*/
public static WindowMappingFn<?> windowMappingFnFromProto(
RunnerApi.SdkFunctionSpec windowMappingFn) throws InvalidProtocolBufferException {
RunnerApi.FunctionSpec spec = windowMappingFn.getSpec();
public static WindowMappingFn<?> windowMappingFnFromProto(RunnerApi.FunctionSpec windowMappingFn)
throws InvalidProtocolBufferException {
RunnerApi.FunctionSpec spec = windowMappingFn;
checkArgument(
spec.getUrn().equals(ParDoTranslation.CUSTOM_JAVA_WINDOW_MAPPING_FN_URN),
"Can't deserialize unknown %s type %s",