Skip to content

Commit

Permalink
This closes #2490
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalperi committed Apr 12, 2017
2 parents 7e603d5 + e0fd0a2 commit 571631a
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ public static RunnerApi.Coder toProto(
private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
throws IOException {
List<String> componentIds = new ArrayList<>();
for (Coder<?> componentCoder : coder.getCoderArguments()) {
componentIds.add(components.registerCoder(componentCoder));
if (coder.getCoderArguments() != null) {
for (Coder<?> componentCoder : coder.getCoderArguments()) {
componentIds.add(components.registerCoder(componentCoder));
}
}
return RunnerApi.Coder.newBuilder()
.addAllComponentCoderIds(componentIds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ String registerPCollection(PCollection<?> pCollection) {
* unique ID for the {@link WindowingStrategy}. Multiple registrations of the same {@link
* WindowingStrategy} will return the same unique ID.
*/
String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) throws IOException {
String existing = windowingStrategyIds.get(windowingStrategy);
if (existing != null) {
return existing;
Expand All @@ -108,6 +108,9 @@ String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
NameUtils.approximateSimpleName(windowingStrategy.getWindowFn()));
String name = uniqify(baseName, windowingStrategyIds.values());
windowingStrategyIds.put(windowingStrategy, name);
RunnerApi.WindowingStrategy windowingStrategyProto =
WindowingStrategies.toProto(windowingStrategy, this);
componentsBuilder.putWindowingStrategies(name, windowingStrategyProto);
return name;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.io.Serializable;
import java.util.UUID;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
Expand Down Expand Up @@ -127,62 +124,30 @@ public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
}
}

// This URN says that the coder is just a UDF blob the indicated SDK understands
// TODO: standardize such things
public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";

// This URN says that the WindowFn is just a UDF blob the indicated SDK understands
// TODO: standardize such things
public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where
* {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link FunctionSpec}
* for the input {@link WindowFn}.
* Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
* RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec} for the
* input {@link WindowFn}.
*/
public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn)
public static SdkFunctionSpec toProto(
WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components)
throws IOException {
Coder<?> windowCoder = windowFn.windowCoder();

// TODO: re-use components
String windowCoderId = UUID.randomUUID().toString();

SdkFunctionSpec windowFnSpec =
SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(CUSTOM_WINDOWFN_URN)
.setParameter(
Any.pack(
BytesValue.newBuilder()
.setValue(
ByteString.copyFrom(
SerializableUtils.serializeToByteArray(windowFn)))
.build())))
.build();

RunnerApi.Coder windowCoderProto =
RunnerApi.Coder.newBuilder()
.setSpec(
SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(CUSTOM_CODER_URN)
.setParameter(
Any.pack(
BytesValue.newBuilder()
.setValue(
ByteString.copyFrom(
OBJECT_MAPPER.writeValueAsBytes(
windowCoder.asCloudObject())))
.build()))))
.build();

return RunnerApi.MessageWithComponents.newBuilder()
.setSdkFunctionSpec(windowFnSpec)
.setComponents(Components.newBuilder().putCoders(windowCoderId, windowCoderProto))
return SdkFunctionSpec.newBuilder()
// TODO: Set environment ID
.setSpec(
FunctionSpec.newBuilder()
.setUrn(CUSTOM_WINDOWFN_URN)
.setParameter(
Any.pack(
BytesValue.newBuilder()
.setValue(
ByteString.copyFrom(
SerializableUtils.serializeToByteArray(windowFn)))
.build())))
.build();
}

Expand All @@ -194,9 +159,22 @@ public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn)
*/
public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
throws IOException {
SdkComponents components = SdkComponents.create();
RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components);

RunnerApi.MessageWithComponents windowFnWithComponents =
toProto(windowingStrategy.getWindowFn());
return RunnerApi.MessageWithComponents.newBuilder()
.setWindowingStrategy(windowingStrategyProto)
.setComponents(components.toComponents())
.build();
}

/**
* Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering
* any components in the provided {@link SdkComponents}.
*/
public static RunnerApi.WindowingStrategy toProto(
WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException {
SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components);

RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
RunnerApi.WindowingStrategy.newBuilder()
Expand All @@ -205,11 +183,11 @@ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> wi
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
.setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
.setWindowFn(windowFnWithComponents.getSdkFunctionSpec());
.setWindowFn(windowFnSpec)
.setWindowCoderId(
components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));

return RunnerApi.MessageWithComponents.newBuilder()
.setWindowingStrategy(windowingStrategyProto)
.setComponents(windowFnWithComponents.getComponents()).build();
return windowingStrategyProto.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,17 @@ public void registerPCollectionExistingNameCollision() {
}

@Test
public void registerWindowingStrategy() {
public void registerWindowingStrategy() throws IOException {
WindowingStrategy<?, ?> strategy =
WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
String name = components.registerWindowingStrategy(strategy);
assertThat(name, not(isEmptyOrNullString()));

components.toComponents().getWindowingStrategiesOrThrow(name);
}

@Test
public void registerWindowingStrategyIdEqualStrategies() {
public void registerWindowingStrategyIdEqualStrategies() throws IOException {
WindowingStrategy<?, ?> strategy =
WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
String name = components.registerWindowingStrategy(strategy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
Expand All @@ -30,6 +31,7 @@
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -89,4 +91,20 @@ public void testToProtoAndBack() throws Exception {
toProtoAndBackWindowingStrategy,
equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
}

@Test
public void testToProtoAndBackWithComponents() throws Exception {
WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
SdkComponents components = SdkComponents.create();
RunnerApi.WindowingStrategy proto =
WindowingStrategies.toProto(windowingStrategy, components);
RunnerApi.Components protoComponents = components.toComponents();

assertThat(
WindowingStrategies.fromProto(proto, protoComponents).fixDefaults(),
Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults()));

protoComponents.getCodersOrThrow(
components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
}
}

0 comments on commit 571631a

Please sign in to comment.