From 8c61010f771cb0c113e19b6e050131cd78092ba3 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Tue, 9 Jan 2018 17:28:46 -0800 Subject: [PATCH] [BEAM-2863] Add the ability to length prefix unknown coders using the portable representation allowing a Runner to not need to know about all coder representations. This is towards supporting the side inputs over the portability framework. --- runners/java-fn-execution/build.gradle | 1 + runners/java-fn-execution/pom.xml | 6 + .../graph/LengthPrefixUnknownCoders.java | 183 ++++++++++++++++++ .../fnexecution/graph/package-info.java | 22 +++ .../graph/LengthPrefixUnknownCodersTest.java | 147 ++++++++++++++ 5 files changed, 359 insertions(+) create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/package-info.java create mode 100644 runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCodersTest.java diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle index e948c7ce6251..9bfa32c63f1a 100644 --- a/runners/java-fn-execution/build.gradle +++ b/runners/java-fn-execution/build.gradle @@ -40,6 +40,7 @@ dependencies { shadow library.java.grpc_netty shadow library.java.slf4j_api testCompile project(":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-fn-execution").sourceSets.test.output + testCompile project(path: ":beam-runners-parent:beam-runners-core-construction-java", configuration: "shadow") testCompile library.java.junit testCompile library.java.hamcrest_core testCompile library.java.hamcrest_library diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml index 1877e2a56abf..77f1d317e6e6 100644 --- a/runners/java-fn-execution/pom.xml +++ b/runners/java-fn-execution/pom.xml @@ -97,6 +97,12 @@ test + + org.apache.beam + beam-runners-core-construction-java + test + + org.apache.beam beam-sdks-java-fn-execution diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java new file mode 100644 index 000000000000..ac7e745b939f --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java @@ -0,0 +1,183 @@ +/* + * Copyright (C) 2017 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.beam.runners.fnexecution.graph; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import java.util.Set; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.Coder; +import org.apache.beam.model.pipeline.v1.RunnerApi.Components; +import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; + +/** + * Utilities for replacing or wrapping unknown coders with {@link LengthPrefixCoder}. + * + *

TODO: Support a dynamic list of well known coders using either registration or manual listing. + */ +public class LengthPrefixUnknownCoders { + private static final String BYTES_CODER_TYPE = "urn:beam:coders:bytes:0.1"; + private static final String LENGTH_PREFIX_CODER_TYPE = "urn:beam:coders:length_prefix:0.1"; + private static final Set WELL_KNOWN_CODER_URNS = + ImmutableSet.of( + BYTES_CODER_TYPE, + "urn:beam:coders:kv:0.1", + "urn:beam:coders:varint:0.1", + "urn:beam:coders:interval_window:0.1", + "urn:beam:coders:stream:0.1", + LENGTH_PREFIX_CODER_TYPE, + "urn:beam:coders:global_window:0.1", + "urn:beam:coders:windowed_value:0.1"); + + /** + * Recursively traverse the coder tree and wrap the first unknown coder in every branch with a + * {@link LengthPrefixCoder} unless an ancestor coder is itself a {@link LengthPrefixCoder}. If + * {@code replaceWithByteArrayCoder} is set, then replace that unknown coder with a + * {@link ByteArrayCoder}. Note that no ids that are generated will collide with the ids supplied + * within the {@link Components#getCodersMap() coder map} key space. + * + * @param coderId The root coder contained within {@code coders} to start the recursive descent + * from. + * @param components Contains the root coder and all component coders. + * @param replaceWithByteArrayCoder whether to replace an unknown coder with a + * {@link ByteArrayCoder}. + * @return A {@link MessageWithComponents} with the + * {@link MessageWithComponents#getCoder() root coder} and its component coders. Note that no ids + * that are generated will collide with the ids supplied within the + * {@link Components#getCodersMap() coder map} key space. + */ + public static RunnerApi.MessageWithComponents forCoder( + String coderId, + RunnerApi.Components components, + boolean replaceWithByteArrayCoder) { + + RunnerApi.Coder currentCoder = components.getCodersOrThrow(coderId); + + // We handle three cases: + // 1) the requested coder is already a length prefix coder. In this case we just honor the + // request to replace the coder with a byte array coder. + // 2) the requested coder is a known coder but not a length prefix coder. In this case we + // rebuild the coder by recursively length prefixing any unknown component coders. + // 3) the requested coder is an unknown coder. In this case we either wrap the requested coder + // with a length prefix coder or replace it with a length prefix byte array coder. + if (LENGTH_PREFIX_CODER_TYPE.equals(currentCoder.getSpec().getSpec().getUrn())) { + if (replaceWithByteArrayCoder) { + return createLengthPrefixByteArrayCoder(coderId, components); + } + + MessageWithComponents.Builder rvalBuilder = MessageWithComponents.newBuilder(); + rvalBuilder.setCoder(currentCoder); + rvalBuilder.setComponents(components); + return rvalBuilder.build(); + } else if (WELL_KNOWN_CODER_URNS.contains(currentCoder.getSpec().getSpec().getUrn())) { + return lengthPrefixUnknownComponentCoders(coderId, components, replaceWithByteArrayCoder); + } else { + return lengthPrefixUnknownCoder(coderId, components, replaceWithByteArrayCoder); + } + } + + private static MessageWithComponents lengthPrefixUnknownComponentCoders( + String coderId, + RunnerApi.Components components, + boolean replaceWithByteArrayCoder) { + + MessageWithComponents.Builder rvalBuilder = MessageWithComponents.newBuilder(); + RunnerApi.Coder currentCoder = components.getCodersOrThrow(coderId); + RunnerApi.Coder.Builder updatedCoder = currentCoder.toBuilder(); + // Rebuild the component coder ids to handle if any of the component coders changed. + updatedCoder.clearComponentCoderIds(); + for (final String componentCoderId : currentCoder.getComponentCoderIdsList()) { + MessageWithComponents componentCoder = + forCoder(componentCoderId, components, replaceWithByteArrayCoder); + String newComponentCoderId = componentCoderId; + if (!components.getCodersOrThrow(componentCoderId).equals(componentCoder.getCoder())) { + // Generate a new id if the component coder changed. + newComponentCoderId = generateUniqueId( + coderId + "-length_prefix", + Sets.union(components.getCodersMap().keySet(), + rvalBuilder.getComponents().getCodersMap().keySet())); + } + updatedCoder.addComponentCoderIds(newComponentCoderId); + rvalBuilder.getComponentsBuilder().putCoders(newComponentCoderId, componentCoder.getCoder()); + // Insert all component coders of the component coder. + rvalBuilder.getComponentsBuilder().putAllCoders( + componentCoder.getComponents().getCodersMap()); + } + rvalBuilder.setCoder(updatedCoder); + + return rvalBuilder.build(); + } + + // If we are handling an unknown URN then we need to wrap it with a length prefix coder. + // If requested we also replace the unknown coder with a byte array coder. + private static MessageWithComponents lengthPrefixUnknownCoder( + String coderId, + RunnerApi.Components components, + boolean replaceWithByteArrayCoder) { + MessageWithComponents.Builder rvalBuilder = MessageWithComponents.newBuilder(); + RunnerApi.Coder currentCoder = components.getCodersOrThrow(coderId); + + String lengthPrefixComponentCoderId = coderId; + if (replaceWithByteArrayCoder) { + return createLengthPrefixByteArrayCoder(coderId, components); + } else { + rvalBuilder.getComponentsBuilder().putCoders(coderId, currentCoder); + } + + rvalBuilder.getCoderBuilder() + .addComponentCoderIds(lengthPrefixComponentCoderId) + .getSpecBuilder() + .getSpecBuilder() + .setUrn(LENGTH_PREFIX_CODER_TYPE); + return rvalBuilder.build(); + } + + private static MessageWithComponents createLengthPrefixByteArrayCoder( + String coderId, + RunnerApi.Components components) { + MessageWithComponents.Builder rvalBuilder = MessageWithComponents.newBuilder(); + + String byteArrayCoderId = generateUniqueId( + coderId + "-byte_array", + Sets.union(components.getCodersMap().keySet(), + rvalBuilder.getComponents().getCodersMap().keySet())); + Coder.Builder byteArrayCoder = Coder.newBuilder(); + byteArrayCoder.getSpecBuilder().getSpecBuilder().setUrn(BYTES_CODER_TYPE); + rvalBuilder.getComponentsBuilder().putCoders(byteArrayCoderId, + byteArrayCoder.build()); + rvalBuilder.getCoderBuilder() + .addComponentCoderIds(byteArrayCoderId) + .getSpecBuilder() + .getSpecBuilder() + .setUrn(LENGTH_PREFIX_CODER_TYPE); + + return rvalBuilder.build(); + } + + /** + * Generates a unique id given a prefix and the set of existing ids. + */ + static String generateUniqueId(String prefix, Set existingIds) { + int i = 0; + while (existingIds.contains(prefix + i)) { + i += 1; + } + return prefix + i; + } +} diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/package-info.java new file mode 100644 index 000000000000..c7ffe538b638 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utilities for a Beam runner to interact with the pipeline proto representation. + */ +package org.apache.beam.runners.fnexecution.graph; diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCodersTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCodersTest.java new file mode 100644 index 000000000000..4cfa5c0c545c --- /dev/null +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCodersTest.java @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2017 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.runners.fnexecution.graph; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; +import org.apache.beam.model.pipeline.v1.RunnerApi.Components; +import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for {@link LengthPrefixUnknownCoders}. */ +@RunWith(Parameterized.class) +public class LengthPrefixUnknownCodersTest { + + private static class UnknownCoder extends CustomCoder { + private static final Coder INSTANCE = new UnknownCoder(); + @Override + public void encode(String value, OutputStream outStream) throws CoderException, IOException { + } + + @Override + public String decode(InputStream inStream) throws CoderException, IOException { + return ""; + } + + @Override + public int hashCode() { + return 1278890232; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof UnknownCoder; + } + } + + @Parameters + public static Collection data() { + return ImmutableList.of( + /** Test wrapping unknown coders with {@code LengthPrefixCoder}. */ + new Object[] { + WindowedValue.getFullCoder( + KvCoder.of(UnknownCoder.INSTANCE, UnknownCoder.INSTANCE), + GlobalWindow.Coder.INSTANCE), + WindowedValue.getFullCoder( + KvCoder.of(LengthPrefixCoder.of(UnknownCoder.INSTANCE), + LengthPrefixCoder.of(UnknownCoder.INSTANCE)), + GlobalWindow.Coder.INSTANCE), + false + }, + /** + * Test bypassing unknown coders that are already wrapped with + * {@code LengthPrefixCoder}. + */ + new Object[] { + WindowedValue.getFullCoder( + KvCoder.of(UnknownCoder.INSTANCE, + LengthPrefixCoder.of(UnknownCoder.INSTANCE)), + GlobalWindow.Coder.INSTANCE), + WindowedValue.getFullCoder( + KvCoder.of(LengthPrefixCoder.of(UnknownCoder.INSTANCE), + LengthPrefixCoder.of(UnknownCoder.INSTANCE)), + GlobalWindow.Coder.INSTANCE), + false + }, + /** Test replacing unknown coders with {@code LengthPrefixCoder}. */ + new Object[] { + WindowedValue.getFullCoder( + KvCoder.of(LengthPrefixCoder.of(UnknownCoder.INSTANCE), + UnknownCoder.INSTANCE), + GlobalWindow.Coder.INSTANCE), + WindowedValue.getFullCoder( + KvCoder.of(LengthPrefixCoder.of(ByteArrayCoder.of()), + LengthPrefixCoder.of(ByteArrayCoder.of())), + GlobalWindow.Coder.INSTANCE), + true + }, + /** Test skipping a top level length prefix coder. */ + new Object[] { + LengthPrefixCoder.of(UnknownCoder.INSTANCE), + LengthPrefixCoder.of(UnknownCoder.INSTANCE), + false + }, + /** Test replacing a top level length prefix coder with byte array coder. */ + new Object[] { + LengthPrefixCoder.of(UnknownCoder.INSTANCE), + LengthPrefixCoder.of(ByteArrayCoder.of()), + true + } + ); + } + + @Parameter + public Coder original; + + @Parameter(1) + public Coder expected; + + @Parameter(2) + public boolean replaceWithByteArray; + + @Test + public void test() throws IOException { + MessageWithComponents originalCoderProto = CoderTranslation.toProto(original); + Components.Builder builder = originalCoderProto.getComponents().toBuilder(); + String coderId = LengthPrefixUnknownCoders.generateUniqueId("rootTestId", + originalCoderProto.getComponents().getCodersMap().keySet()); + builder.putCoders(coderId, originalCoderProto.getCoder()); + MessageWithComponents updatedCoderProto = LengthPrefixUnknownCoders.forCoder( + coderId, builder.build(), replaceWithByteArray); + assertEquals(expected, + CoderTranslation.fromProto(updatedCoderProto.getCoder(), + RehydratedComponents.forComponents(updatedCoderProto.getComponents()))); + } +}