diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index ee64f912872f..66a98b07cdbc 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -58,16 +58,27 @@
org.apache.beam
beam-sdks-common-runner-api
+
org.apache.beam
beam-sdks-java-core
+
+ com.google.protobuf
+ protobuf-java
+
+
com.fasterxml.jackson.core
jackson-annotations
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
com.google.code.findbugs
jsr305
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
new file mode 100644
index 000000000000..d890de76e306
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+
+/** Converts to and from Beam Runner API representations of {@link Coder Coders}. */
+public class Coders {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ // This URN says that the coder is just a UDF blob this SDK understands
+ // TODO: standardize such things
+ public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
+
+ // The URNs for coders which are shared across languages
+ private static final BiMap, String> KNOWN_CODER_URNS =
+ ImmutableBiMap., String>builder()
+ .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1")
+ .put(KvCoder.class, "urn:beam:coders:kv:0.1")
+ .put(VarIntCoder.class, "urn:beam:coders:varint:0.1")
+ .put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1")
+ .put(IterableCoder.class, "urn:beam:coders:stream:0.1")
+ .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1")
+ .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1")
+ .build();
+
+ public static RunnerApi.Coder toProto(
+ Coder> coder, @SuppressWarnings("unused") SdkComponents components) throws IOException {
+ if (KNOWN_CODER_URNS.containsKey(coder.getClass())) {
+ return toKnownCoder(coder, components);
+ }
+ return toCustomCoder(coder);
+ }
+
+ private static RunnerApi.Coder toKnownCoder(Coder> coder, SdkComponents components)
+ throws IOException {
+ List componentIds = new ArrayList<>();
+ for (Coder> componentCoder : coder.getCoderArguments()) {
+ componentIds.add(components.registerCoder(componentCoder));
+ }
+ return RunnerApi.Coder.newBuilder()
+ .addAllComponentCoderIds(componentIds)
+ .setSpec(
+ SdkFunctionSpec.newBuilder()
+ .setSpec(FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(coder.getClass()))))
+ .build();
+ }
+
+ private static RunnerApi.Coder toCustomCoder(Coder> coder) throws IOException {
+ RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder();
+ return coderBuilder
+ .setSpec(
+ SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_CODER_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ OBJECT_MAPPER.writeValueAsBytes(coder.asCloudObject())))
+ .build()))))
+ .build();
+ }
+
+ public static Coder> fromProto(RunnerApi.Coder protoCoder, Components components)
+ throws IOException {
+ String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn();
+ if (coderSpecUrn.equals(CUSTOM_CODER_URN)) {
+ return fromCustomCoder(protoCoder, components);
+ }
+ return fromKnownCoder(protoCoder, components);
+ }
+
+ private static Coder> fromKnownCoder(RunnerApi.Coder coder, Components components)
+ throws IOException {
+ String coderUrn = coder.getSpec().getSpec().getUrn();
+ List> coderComponents = new LinkedList<>();
+ for (String componentId : coder.getComponentCoderIdsList()) {
+ Coder> innerCoder = fromProto(components.getCodersOrThrow(componentId), components);
+ coderComponents.add(innerCoder);
+ }
+ switch (coderUrn) {
+ case "urn:beam:coders:bytes:0.1":
+ return ByteArrayCoder.of();
+ case "urn:beam:coders:kv:0.1":
+ return KvCoder.of(coderComponents);
+ case "urn:beam:coders:varint:0.1":
+ return VarLongCoder.of();
+ case "urn:beam:coders:interval_window:0.1":
+ return IntervalWindowCoder.of();
+ case "urn:beam:coders:stream:0.1":
+ return IterableCoder.of(coderComponents);
+ case "urn:beam:coders:global_window:0.1":
+ return GlobalWindow.Coder.INSTANCE;
+ case "urn:beam:coders:windowed_value:0.1":
+ return WindowedValue.FullWindowedValueCoder.of(coderComponents);
+ default:
+ throw new IllegalStateException(
+ String.format(
+ "Unknown coder URN %s. Known URNs: %s", coderUrn, KNOWN_CODER_URNS.values()));
+ }
+ }
+
+ private static Coder> fromCustomCoder(
+ RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components components)
+ throws IOException {
+ CloudObject coderCloudObject =
+ OBJECT_MAPPER.readValue(
+ protoCoder
+ .getSpec()
+ .getSpec()
+ .getParameter()
+ .unpack(BytesValue.class)
+ .getValue()
+ .toByteArray(),
+ CloudObject.class);
+ return Serializer.deserialize(coderCloudObject, Coder.class);
+ }
+}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index c4b8cf1c9337..5cb0a00e2389 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -21,6 +21,7 @@
import com.google.common.base.Equivalence;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
+import java.io.IOException;
import java.util.Set;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
@@ -119,7 +120,7 @@ String registerWindowingStrategy(WindowingStrategy, ?> windowingStrategy) {
* #equals(Object)} and {@link #hashCode()} but incompatible binary formats are not considered the
* same coder.
*/
- String registerCoder(Coder> coder) {
+ String registerCoder(Coder> coder) throws IOException {
String existing = coderIds.get(Equivalence.identity().wrap(coder));
if (existing != null) {
return existing;
@@ -127,6 +128,8 @@ String registerCoder(Coder> coder) {
String baseName = NameUtils.approximateSimpleName(coder);
String name = uniqify(baseName, coderIds.values());
coderIds.put(Equivalence.identity().wrap(coder), name);
+ RunnerApi.Coder coderProto = Coders.toProto(coder, this);
+ componentsBuilder.putCoders(name, coderProto);
return name;
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
new file mode 100644
index 000000000000..1a657b27e65a
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests for {@link Coders}.
+ */
+@RunWith(Parameterized.class)
+public class CodersTest {
+ @Parameters(name = "{index}: {0}")
+ public static Iterable> data() {
+ return ImmutableList.>of(
+ StringUtf8Coder.of(),
+ IterableCoder.of(VarLongCoder.of()),
+ KvCoder.of(StringUtf8Coder.of(), ListCoder.of(VarLongCoder.of())),
+ SerializableCoder.of(Record.class),
+ new RecordCoder(),
+ KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)));
+ }
+
+ @Parameter(0)
+ public Coder> coder;
+
+ @Test
+ public void toAndFromProto() throws Exception {
+ SdkComponents componentsBuilder = SdkComponents.create();
+ RunnerApi.Coder coderProto = Coders.toProto(coder, componentsBuilder);
+
+ Components encodedComponents = componentsBuilder.toComponents();
+ Coder> decodedCoder = Coders.fromProto(coderProto, encodedComponents);
+ assertThat(decodedCoder, Matchers.>equalTo(coder));
+ }
+
+ static class Record implements Serializable {
+ }
+
+ private static class RecordCoder extends CustomCoder {
+ @Override
+ public void encode(Record value, OutputStream outStream, Context context)
+ throws CoderException, IOException {}
+
+ @Override
+ public Record decode(InputStream inStream, Context context) throws CoderException, IOException {
+ return new Record();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other != null && getClass().equals(other.getClass());
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+ }
+}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index c96e57c0dad6..28b49113c437 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -24,6 +24,7 @@
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
+import java.io.IOException;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -39,6 +40,7 @@
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -56,13 +58,32 @@ public class SdkComponentsTest {
private SdkComponents components = SdkComponents.create();
@Test
- public void registerCoder() {
+ public void registerCoder() throws IOException {
Coder> coder =
KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
String id = components.registerCoder(coder);
assertThat(components.registerCoder(coder), equalTo(id));
assertThat(id, not(isEmptyOrNullString()));
- assertThat(components.registerCoder(VarLongCoder.of()), not(equalTo(id)));
+ VarLongCoder otherCoder = VarLongCoder.of();
+ assertThat(components.registerCoder(otherCoder), not(equalTo(id)));
+
+ components.toComponents().getCodersOrThrow(id);
+ components.toComponents().getCodersOrThrow(components.registerCoder(otherCoder));
+ }
+
+ @Test
+ public void registerCoderEqualsNotSame() throws IOException {
+ Coder> coder =
+ KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
+ Coder> otherCoder =
+ KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
+ assertThat(coder, Matchers.>equalTo(otherCoder));
+ String id = components.registerCoder(coder);
+ String otherId = components.registerCoder(otherCoder);
+ assertThat(otherId, not(equalTo(id)));
+
+ components.toComponents().getCodersOrThrow(id);
+ components.toComponents().getCodersOrThrow(otherId);
}
@Test