From e0fd0a222510b733b98d6c33c694431139bbc40d Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 7 Apr 2017 13:41:29 -0700 Subject: [PATCH] Use SdkComponents in WindowingStrategy.toProto --- .../runners/core/construction/Coders.java | 6 +- .../core/construction/SdkComponents.java | 5 +- .../construction/WindowingStrategies.java | 94 +++++++------------ .../core/construction/SdkComponentsTest.java | 6 +- .../construction/WindowingStrategiesTest.java | 18 ++++ 5 files changed, 66 insertions(+), 63 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java index d890de76e3068..7b96240397847 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java @@ -76,8 +76,10 @@ public static RunnerApi.Coder toProto( private static RunnerApi.Coder toKnownCoder(Coder coder, SdkComponents components) throws IOException { List componentIds = new ArrayList<>(); - for (Coder componentCoder : coder.getCoderArguments()) { - componentIds.add(components.registerCoder(componentCoder)); + if (coder.getCoderArguments() != null) { + for (Coder componentCoder : coder.getCoderArguments()) { + componentIds.add(components.registerCoder(componentCoder)); + } } return RunnerApi.Coder.newBuilder() .addAllComponentCoderIds(componentIds) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 5cb0a00e23891..03f3a03b43c1d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -96,7 +96,7 @@ String registerPCollection(PCollection pCollection) { * unique ID for the {@link WindowingStrategy}. Multiple registrations of the same {@link * WindowingStrategy} will return the same unique ID. */ - String registerWindowingStrategy(WindowingStrategy windowingStrategy) { + String registerWindowingStrategy(WindowingStrategy windowingStrategy) throws IOException { String existing = windowingStrategyIds.get(windowingStrategy); if (existing != null) { return existing; @@ -108,6 +108,9 @@ String registerWindowingStrategy(WindowingStrategy windowingStrategy) { NameUtils.approximateSimpleName(windowingStrategy.getWindowFn())); String name = uniqify(baseName, windowingStrategyIds.values()); windowingStrategyIds.put(windowingStrategy, name); + RunnerApi.WindowingStrategy windowingStrategyProto = + WindowingStrategies.toProto(windowingStrategy, this); + componentsBuilder.putWindowingStrategies(name, windowingStrategyProto); return name; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java index 353be05206861..6d721b0b8bf83 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java @@ -19,15 +19,12 @@ import static com.google.common.base.Preconditions.checkArgument; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.io.Serializable; -import java.util.UUID; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; @@ -127,62 +124,30 @@ public static RunnerApi.OutputTime toProto(OutputTimeFn outputTimeFn) { } } - // This URN says that the coder is just a UDF blob the indicated SDK understands - // TODO: standardize such things - public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1"; - // This URN says that the WindowFn is just a UDF blob the indicated SDK understands // TODO: standardize such things public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - /** - * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where - * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link FunctionSpec} - * for the input {@link WindowFn}. + * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link + * RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec} for the + * input {@link WindowFn}. */ - public static RunnerApi.MessageWithComponents toProto(WindowFn windowFn) + public static SdkFunctionSpec toProto( + WindowFn windowFn, @SuppressWarnings("unused") SdkComponents components) throws IOException { - Coder windowCoder = windowFn.windowCoder(); - - // TODO: re-use components - String windowCoderId = UUID.randomUUID().toString(); - - SdkFunctionSpec windowFnSpec = - SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder() - .setUrn(CUSTOM_WINDOWFN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(windowFn))) - .build()))) - .build(); - - RunnerApi.Coder windowCoderProto = - RunnerApi.Coder.newBuilder() - .setSpec( - SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder() - .setUrn(CUSTOM_CODER_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes( - windowCoder.asCloudObject()))) - .build())))) - .build(); - - return RunnerApi.MessageWithComponents.newBuilder() - .setSdkFunctionSpec(windowFnSpec) - .setComponents(Components.newBuilder().putCoders(windowCoderId, windowCoderProto)) + return SdkFunctionSpec.newBuilder() + // TODO: Set environment ID + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_WINDOWFN_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(windowFn))) + .build()))) .build(); } @@ -194,9 +159,22 @@ public static RunnerApi.MessageWithComponents toProto(WindowFn windowFn) */ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy windowingStrategy) throws IOException { + SdkComponents components = SdkComponents.create(); + RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components); - RunnerApi.MessageWithComponents windowFnWithComponents = - toProto(windowingStrategy.getWindowFn()); + return RunnerApi.MessageWithComponents.newBuilder() + .setWindowingStrategy(windowingStrategyProto) + .setComponents(components.toComponents()) + .build(); + } + + /** + * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering + * any components in the provided {@link SdkComponents}. + */ + public static RunnerApi.WindowingStrategy toProto( + WindowingStrategy windowingStrategy, SdkComponents components) throws IOException { + SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components); RunnerApi.WindowingStrategy.Builder windowingStrategyProto = RunnerApi.WindowingStrategy.newBuilder() @@ -205,11 +183,11 @@ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy wi .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior())) .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) .setTrigger(Triggers.toProto(windowingStrategy.getTrigger())) - .setWindowFn(windowFnWithComponents.getSdkFunctionSpec()); + .setWindowFn(windowFnSpec) + .setWindowCoderId( + components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); - return RunnerApi.MessageWithComponents.newBuilder() - .setWindowingStrategy(windowingStrategyProto) - .setComponents(windowFnWithComponents.getComponents()).build(); + return windowingStrategyProto.build(); } /** diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java index 28b49113c437e..ef4b16bf3f8cc 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java @@ -132,15 +132,17 @@ public void registerPCollectionExistingNameCollision() { } @Test - public void registerWindowingStrategy() { + public void registerWindowingStrategy() throws IOException { WindowingStrategy strategy = WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES); String name = components.registerWindowingStrategy(strategy); assertThat(name, not(isEmptyOrNullString())); + + components.toComponents().getWindowingStrategiesOrThrow(name); } @Test - public void registerWindowingStrategyIdEqualStrategies() { + public void registerWindowingStrategyIdEqualStrategies() throws IOException { WindowingStrategy strategy = WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES); String name = components.registerWindowingStrategy(strategy); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java index b603d65ea4dc1..62bba8eb40ae7 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java @@ -22,6 +22,7 @@ import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; @@ -30,6 +31,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.hamcrest.Matchers; import org.joda.time.Duration; import org.junit.Test; import org.junit.runner.RunWith; @@ -89,4 +91,20 @@ public void testToProtoAndBack() throws Exception { toProtoAndBackWindowingStrategy, equalTo((WindowingStrategy) windowingStrategy.fixDefaults())); } + + @Test + public void testToProtoAndBackWithComponents() throws Exception { + WindowingStrategy windowingStrategy = toProtoAndBackSpec.getWindowingStrategy(); + SdkComponents components = SdkComponents.create(); + RunnerApi.WindowingStrategy proto = + WindowingStrategies.toProto(windowingStrategy, components); + RunnerApi.Components protoComponents = components.toComponents(); + + assertThat( + WindowingStrategies.fromProto(proto, protoComponents).fixDefaults(), + Matchers.>equalTo(windowingStrategy.fixDefaults())); + + protoComponents.getCodersOrThrow( + components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); + } }