From 9bd7dcf0a1d836fb09e485bf4b9af65b57a1fb08 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 7 Apr 2017 11:46:24 -0700 Subject: [PATCH 1/4] Add Coder utilities for Proto conversions --- runners/core-construction-java/pom.xml | 11 ++ .../runners/core/construction/Coders.java | 90 ++++++++++++++++ .../core/construction/SdkComponents.java | 5 +- .../runners/core/construction/CodersTest.java | 100 ++++++++++++++++++ .../core/construction/SdkComponentsTest.java | 25 ++++- 5 files changed, 228 insertions(+), 3 deletions(-) create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java create mode 100644 runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml index ee64f912872f..66a98b07cdbc 100644 --- a/runners/core-construction-java/pom.xml +++ b/runners/core-construction-java/pom.xml @@ -58,16 +58,27 @@ org.apache.beam beam-sdks-common-runner-api + org.apache.beam beam-sdks-java-core + + com.google.protobuf + protobuf-java + + com.fasterxml.jackson.core jackson-annotations + + com.fasterxml.jackson.core + jackson-databind + + com.google.code.findbugs jsr305 diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java new file mode 100644 index 000000000000..fa9f675e99a8 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.util.CloudObject; +import org.apache.beam.sdk.util.Serializer; + +/** Utilities for working with {@link Coder Coders}. 
*/ +public class Coders { + // This URN says that the coder is just a UDF blob the indicated SDK understands + // TODO: standardize such things + public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static RunnerApi.Coder toProto( + Coder coder, @SuppressWarnings("unused") SdkComponents components) throws IOException { + return toCustomCoder(coder); + } + + private static RunnerApi.Coder toCustomCoder(Coder coder) throws IOException { + RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder(); + return coderBuilder + .setSpec( + SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_CODER_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + OBJECT_MAPPER.writeValueAsBytes(coder.asCloudObject()))) + .build())))) + .build(); + } + + public static Coder fromProto(RunnerApi.Coder protoCoder, Components components) + throws IOException { + String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn(); + if (coderSpecUrn.equals(CUSTOM_CODER_URN)) { + return fromCustomCoder(protoCoder, components); + } + throw new IllegalArgumentException( + String.format("Unknown %s URN %s", Coder.class.getSimpleName(), coderSpecUrn)); + } + + private static Coder fromCustomCoder( + RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components components) + throws IOException { + CloudObject coderCloudObject = + OBJECT_MAPPER.readValue( + protoCoder + .getSpec() + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .toByteArray(), + CloudObject.class); + return Serializer.deserialize(coderCloudObject, Coder.class); + } +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 
c4b8cf1c9337..5cb0a00e2389 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -21,6 +21,7 @@ import com.google.common.base.Equivalence; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; +import java.io.IOException; import java.util.Set; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; @@ -119,7 +120,7 @@ String registerWindowingStrategy(WindowingStrategy windowingStrategy) { * #equals(Object)} and {@link #hashCode()} but incompatible binary formats are not considered the * same coder. */ - String registerCoder(Coder coder) { + String registerCoder(Coder coder) throws IOException { String existing = coderIds.get(Equivalence.identity().wrap(coder)); if (existing != null) { return existing; @@ -127,6 +128,8 @@ String registerCoder(Coder coder) { String baseName = NameUtils.approximateSimpleName(coder); String name = uniqify(baseName, coderIds.values()); coderIds.put(Equivalence.identity().wrap(coder), name); + RunnerApi.Coder coderProto = Coders.toProto(coder, this); + componentsBuilder.putCoders(name, coderProto); return name; } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java new file mode 100644 index 000000000000..cac06c48739b --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests for {@link Coders}. 
+ */ +@RunWith(Parameterized.class) +public class CodersTest { + @Parameters(name = "{index}: {0}") + public static Iterable> data() { + return ImmutableList.>of( + StringUtf8Coder.of(), + IterableCoder.of(VarLongCoder.of()), + KvCoder.of(StringUtf8Coder.of(), ListCoder.of(VarLongCoder.of())), + SerializableCoder.of(Record.class), + new RecordCoder(), + KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class))); + } + + @Parameter(0) + public Coder coder; + + @Test + public void toAndFromProto() throws Exception { + SdkComponents componentsBuilder = SdkComponents.create(); + RunnerApi.Coder coderProto = Coders.toProto(coder, componentsBuilder); + + // TODO: Get some real components + Components encodedComponents = Components.getDefaultInstance(); + Coder decodedCoder = Coders.fromProto(coderProto, encodedComponents); + assertThat(decodedCoder, Matchers.>equalTo(coder)); + } + + static class Record implements Serializable { + } + + private static class RecordCoder extends CustomCoder { + @Override + public void encode(Record value, OutputStream outStream, Context context) + throws CoderException, IOException {} + + @Override + public Record decode(InputStream inStream, Context context) throws CoderException, IOException { + return new Record(); + } + + @Override + public boolean equals(Object other) { + return other != null && getClass().equals(other.getClass()); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + } +} diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java index c96e57c0dad6..28b49113c437 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java @@ -24,6 +24,7 @@ 
import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; +import java.io.IOException; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -39,6 +40,7 @@ import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -56,13 +58,32 @@ public class SdkComponentsTest { private SdkComponents components = SdkComponents.create(); @Test - public void registerCoder() { + public void registerCoder() throws IOException { Coder coder = KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of()))); String id = components.registerCoder(coder); assertThat(components.registerCoder(coder), equalTo(id)); assertThat(id, not(isEmptyOrNullString())); - assertThat(components.registerCoder(VarLongCoder.of()), not(equalTo(id))); + VarLongCoder otherCoder = VarLongCoder.of(); + assertThat(components.registerCoder(otherCoder), not(equalTo(id))); + + components.toComponents().getCodersOrThrow(id); + components.toComponents().getCodersOrThrow(components.registerCoder(otherCoder)); + } + + @Test + public void registerCoderEqualsNotSame() throws IOException { + Coder coder = + KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of()))); + Coder otherCoder = + KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of()))); + assertThat(coder, Matchers.>equalTo(otherCoder)); + String id = components.registerCoder(coder); + String otherId = components.registerCoder(otherCoder); + assertThat(otherId, not(equalTo(id))); + + components.toComponents().getCodersOrThrow(id); + components.toComponents().getCodersOrThrow(otherId); } @Test From 48ec9f20c403cbea06c08bd15ab94e8949beed2e Mon Sep 17 00:00:00 2001 
From: Thomas Groh Date: Mon, 10 Apr 2017 16:29:58 -0700 Subject: [PATCH 2/4] squash! Add Coder utilities for Proto conversions Add Known Coders --- .../runners/core/construction/Coders.java | 78 ++++++++++++++++++- 1 file changed, 75 insertions(+), 3 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java index fa9f675e99a8..f2f2bf25e65c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java @@ -19,31 +19,74 @@ package org.apache.beam.runners.core.construction; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.Serializer; +import org.apache.beam.sdk.util.WindowedValue; +import 
org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; /** Utilities for working with {@link Coder Coders}. */ public class Coders { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + // This URN says that the coder is just a UDF blob the indicated SDK understands // TODO: standardize such things public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + // The URNs for coders are shared across languages + private static final BiMap, String> KNOWN_CODER_URNS = + ImmutableBiMap., String>builder() + .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1") + .put(KvCoder.class, "urn:beam:coders:kv:0.1") + .put(VarLongCoder.class, "urn:beam:coders:varint:0.1") + .put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1") + .put(IterableCoder.class, "urn:beam:coders:stream:0.1") + .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1") + .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1") + .build(); public static RunnerApi.Coder toProto( Coder coder, @SuppressWarnings("unused") SdkComponents components) throws IOException { + if (KNOWN_CODER_URNS.containsKey(coder.getClass())) { + return toKnownCoder(coder, components); + } return toCustomCoder(coder); } + private static RunnerApi.Coder toKnownCoder(Coder coder, SdkComponents components) + throws IOException { + List componentIds = new ArrayList<>(); + for (Coder componentCoder : coder.getCoderArguments()) { + componentIds.add(components.registerCoder(componentCoder)); + } + return RunnerApi.Coder.newBuilder() + .addAllComponentCoderIds(componentIds) + .setSpec( + SdkFunctionSpec.newBuilder() + .setSpec(FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(coder.getClass())))) + .build(); + } + private static RunnerApi.Coder toCustomCoder(Coder coder) throws IOException { RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder(); return 
coderBuilder @@ -68,8 +111,37 @@ public static Coder fromProto(RunnerApi.Coder protoCoder, Components componen if (coderSpecUrn.equals(CUSTOM_CODER_URN)) { return fromCustomCoder(protoCoder, components); } - throw new IllegalArgumentException( - String.format("Unknown %s URN %s", Coder.class.getSimpleName(), coderSpecUrn)); + return fromKnownCoder(protoCoder, components); + } + + private static Coder fromKnownCoder(RunnerApi.Coder coder, Components components) + throws IOException { + String coderUrn = coder.getSpec().getSpec().getUrn(); + List> coderComponents = new LinkedList<>(); + for (String componentId : coder.getComponentCoderIdsList()) { + Coder innerCoder = fromProto(components.getCodersOrThrow(componentId), components); + coderComponents.add(innerCoder); + } + switch (coderUrn) { + case "urn:beam:coders:bytes:0.1": + return ByteArrayCoder.of(); + case "urn:beam:coders:kv:0.1": + return KvCoder.of(coderComponents); + case "urn:beam:coders:varint:0.1": + return VarLongCoder.of(); + case "urn:beam:coders:interval_window:0.1": + return IntervalWindowCoder.of(); + case "urn:beam:coders:stream:0.1": + return IterableCoder.of(coderComponents); + case "urn:beam:coders:global_window:0.1": + return GlobalWindow.Coder.INSTANCE; + case "urn:beam:coders:windowed_value:0.1": + return WindowedValue.FullWindowedValueCoder.of(coderComponents); + default: + throw new IllegalStateException( + String.format( + "Unknown coder URN %s. Known URNs: %s", coderUrn, KNOWN_CODER_URNS.values())); + } } private static Coder fromCustomCoder( From 756f28f8c47a7eb7a6ac23e4888151e3cd323046 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 10 Apr 2017 19:25:37 -0700 Subject: [PATCH 3/4] fixup! 
Add Coder utilities for Proto conversions --- .../org/apache/beam/runners/core/construction/CodersTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java index cac06c48739b..1a657b27e65a 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java @@ -68,8 +68,7 @@ public void toAndFromProto() throws Exception { SdkComponents componentsBuilder = SdkComponents.create(); RunnerApi.Coder coderProto = Coders.toProto(coder, componentsBuilder); - // TODO: Get some real components - Components encodedComponents = Components.getDefaultInstance(); + Components encodedComponents = componentsBuilder.toComponents(); Coder decodedCoder = Coders.fromProto(coderProto, encodedComponents); assertThat(decodedCoder, Matchers.>equalTo(coder)); } From 07c27a6aea86d155d812e9b64eb5014b08b7d7df Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 11 Apr 2017 09:58:12 -0700 Subject: [PATCH 4/4] fixup! 
Add Coder utilities for Proto conversions --- .../org/apache/beam/runners/core/construction/Coders.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java index f2f2bf25e65c..d890de76e306 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java @@ -45,15 +45,15 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -/** Utilities for working with {@link Coder Coders}. */ +/** Converts to and from Beam Runner API representations of {@link Coder Coders}. */ public class Coders { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - // This URN says that the coder is just a UDF blob the indicated SDK understands + // This URN says that the coder is just a UDF blob this SDK understands // TODO: standardize such things public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1"; - // The URNs for coders are shared across languages + // The URNs for coders which are shared across languages private static final BiMap, String> KNOWN_CODER_URNS = ImmutableBiMap., String>builder() .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1")