diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 32d7f36a9e281..7af0c164bb219 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -173,7 +173,6 @@ dependencies { compile project(path: ":model:pipeline", configuration: "shadow") compile project(path: ":model:job-management", configuration: "shadow") compile project(":sdks:java:fn-execution") - compile project(path: ":vendor:sdks-java-extensions-protobuf", configuration: "shadow") compile library.java.jackson_databind compile "org.apache.flink:flink-annotations:$flink_version" compile "org.apache.flink:flink-optimizer_2.11:$flink_version" diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index 42949c074b258..b73ea99f3f4fc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -82,6 +82,7 @@ import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.fnexecution.state.StateRequestHandler; import org.apache.beam.runners.fnexecution.state.StateRequestHandlers; +import org.apache.beam.runners.fnexecution.wire.ByteStringCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.fn.data.FnDataReceiver; @@ -106,7 +107,6 @@ import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder; 
import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java index 49039a4a86040..b4d02a63992b6 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java @@ -80,6 +80,7 @@ import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.fnexecution.state.StateRequestHandler; import org.apache.beam.runners.fnexecution.state.StateRequestHandlers; +import org.apache.beam.runners.fnexecution.wire.ByteStringCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -105,7 +106,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle index c5a16b36e54cc..325a9b8a23398 100644 --- a/runners/java-fn-execution/build.gradle +++ b/runners/java-fn-execution/build.gradle @@ -32,7 +32,6 @@ dependencies { compile project(path: 
":sdks:java:core", configuration: "shadow") compile project(":sdks:java:fn-execution") compile project(":runners:core-construction-java") - compile project(path: ":vendor:sdks-java-extensions-protobuf", configuration: "shadow") compile library.java.vendored_grpc_1_26_0 compile library.java.slf4j_api compile project(path: ":model:job-management", configuration: "shadow") diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java index f9c2d87113dbb..3c522d38c6e56 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java @@ -46,6 +46,7 @@ import org.apache.beam.runners.core.construction.graph.TimerReference; import org.apache.beam.runners.core.construction.graph.UserStateReference; import org.apache.beam.runners.fnexecution.data.RemoteInputDestination; +import org.apache.beam.runners.fnexecution.wire.ByteStringCoder; import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders; import org.apache.beam.runners.fnexecution.wire.WireCoders; import org.apache.beam.sdk.coders.Coder; @@ -61,7 +62,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableTable; -import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder; import org.checkerframework.checker.nullness.qual.Nullable; /** Utility methods for creating {@link ProcessBundleDescriptor} instances. 
*/ diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java index e5e751935f5a7..e8ad54f99ce5a 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java @@ -44,6 +44,7 @@ import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.BagUserStateSpec; import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor; import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.SideInputSpec; +import org.apache.beam.runners.fnexecution.wire.ByteStringCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.fn.stream.DataStreams; @@ -55,7 +56,6 @@ import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder; /** * A set of utility methods which construct {@link StateRequestHandler}s. diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java new file mode 100644 index 0000000000000..90955bc1a816b --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.wire; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.util.VarInt; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; + +/** + * A duplicate of the protobuf extension {@code ByteStringCoder} using Beam's vendored protobuf. + * + *

For internal use only, no backwards-compatibility guarantees. + */ +@Internal +public class ByteStringCoder extends AtomicCoder { + + public static ByteStringCoder of() { + return INSTANCE; + } + + /** ************************ */ + private static final ByteStringCoder INSTANCE = new ByteStringCoder(); + + private static final TypeDescriptor TYPE_DESCRIPTOR = + new TypeDescriptor() {}; + + private ByteStringCoder() {} + + @Override + public void encode(ByteString value, OutputStream outStream) throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override + public void encode(ByteString value, OutputStream outStream, Context context) + throws IOException, CoderException { + if (value == null) { + throw new CoderException("cannot encode a null ByteString"); + } + + if (!context.isWholeStream) { + // ByteString is not delimited, so write its size before its contents. + VarInt.encode(value.size(), outStream); + } + value.writeTo(outStream); + } + + @Override + public ByteString decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override + public ByteString decode(InputStream inStream, Context context) throws IOException { + if (context.isWholeStream) { + return ByteString.readFrom(inStream); + } + + int size = VarInt.decodeInt(inStream); + // ByteString reads to the end of the input stream, so give it a limited stream of exactly + // the right length. Also set its chunk size so that the ByteString will contain exactly + // one chunk. + return ByteString.readFrom(ByteStreams.limit(inStream, size), size); + } + + @Override + protected long getEncodedElementByteSize(ByteString value) throws Exception { + int size = value.size(); + return (long) VarInt.getLength(size) + size; + } + + @Override + public void verifyDeterministic() {} + + /** + * {@inheritDoc} + * + *

Returns true; the encoded output of two invocations of {@link ByteStringCoder} in the same + * {@link Coder.Context} will be identical if and only if the original {@link ByteString} objects + * are equal according to {@link Object#equals}. + */ + @Override + public boolean consistentWithEquals() { + return true; + } + + /** + * {@inheritDoc} + * + *

Returns true. The size is {@link ByteString#size} plus its {@link VarInt} length prefix. + */ + @Override + public boolean isRegisterByteSizeObserverCheap(ByteString value) { + return true; + } + + @Override + public TypeDescriptor getEncodedTypeDescriptor() { + return TYPE_DESCRIPTOR; + } +} diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoderTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoderTest.java new file mode 100644 index 0000000000000..96cad84b8b742 --- /dev/null +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoderTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.fnexecution.wire; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test case for {@link ByteStringCoder}. */ +@RunWith(JUnit4.class) +@SuppressWarnings({ + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +public class ByteStringCoderTest { + + private static final ByteStringCoder TEST_CODER = ByteStringCoder.of(); + + private static final List TEST_STRING_VALUES = + Arrays.asList( + "", + "a", + "13", + "hello", + "a longer string with spaces and all that", + "a string with a \n newline", + "???????????????"); + private static final ImmutableList TEST_VALUES; + + static { + ImmutableList.Builder builder = ImmutableList.builder(); + for (String s : TEST_STRING_VALUES) { + builder.add(ByteString.copyFromUtf8(s)); + } + TEST_VALUES = builder.build(); + } + + /** + * Generated data to check that the wire format has not changed. To regenerate, see {@link + * org.apache.beam.sdk.coders.PrintBase64Encodings}. 
+ */ + private static final List TEST_ENCODINGS = + Arrays.asList( + "", + "YQ", + "MTM", + "aGVsbG8", + "YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA", + "YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ", + "Pz8_Pz8_Pz8_Pz8_Pz8_"); + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testDecodeEncodeEqualInAllContexts() throws Exception { + for (ByteString value : TEST_VALUES) { + CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value); + } + } + + @Test + public void testWireFormatEncode() throws Exception { + CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS); + } + + @Test + public void testCoderDeterministic() throws Throwable { + TEST_CODER.verifyDeterministic(); + } + + @Test + public void testConsistentWithEquals() { + assertTrue(TEST_CODER.consistentWithEquals()); + } + + @Test + public void testEncodeNullThrowsCoderException() throws Exception { + thrown.expect(CoderException.class); + thrown.expectMessage("cannot encode a null ByteString"); + + CoderUtils.encodeToBase64(TEST_CODER, null); + } + + @Test + public void testNestedCoding() throws Throwable { + Coder> listCoder = ListCoder.of(TEST_CODER); + CoderProperties.coderDecodeEncodeContentsEqual(listCoder, TEST_VALUES); + CoderProperties.coderDecodeEncodeContentsInSameOrder(listCoder, TEST_VALUES); + } + + @Test + public void testEncodedElementByteSize() throws Throwable { + for (ByteString value : TEST_VALUES) { + byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, value, Context.NESTED); + assertEquals(encoded.length, TEST_CODER.getEncodedElementByteSize(value)); + } + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(ByteString.class))); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 6ce1c9037a714..e9b4ae96896d3 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -212,7 +212,6 @@ 
include(":vendor:grpc-1_26_0") include(":vendor:bytebuddy-1_10_8") include(":vendor:calcite-1_20_0") include(":vendor:guava-26_0-jre") -include(":vendor:sdks-java-extensions-protobuf") include(":website") include(":runners:google-cloud-dataflow-java:worker:legacy-worker") include(":runners:google-cloud-dataflow-java:worker") diff --git a/vendor/sdks-java-extensions-protobuf/build.gradle b/vendor/sdks-java-extensions-protobuf/build.gradle deleted file mode 100644 index 1aaefb166ca05..0000000000000 --- a/vendor/sdks-java-extensions-protobuf/build.gradle +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import org.apache.beam.gradle.GrpcVendoring_1_26_0 - -plugins { id 'org.apache.beam.module' } -applyJavaNature( - automaticModuleName: 'org.apache.beam.vendor.sdks.java.extensions.protobuf', - exportJavadoc: false, - shadowClosure: { - dependencies { - include(dependency("com.google.guava:guava:${GrpcVendoring_1_26_0.guava_version}")) - include(dependency("com.google.protobuf:protobuf-java:${GrpcVendoring_1_26_0.protobuf_version}")) - } - // We specifically relocate beam-sdks-extensions-protobuf under a vendored namespace - // but also vendor guava and protobuf to the same vendored namespace as the model/* - // implementations allowing the artifacts to encode/decode vendored byte strings and - // vendored protobuf messages - relocate "org.apache.beam.sdk.extensions.protobuf", "org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf" - - // guava uses the com.google.common and com.google.thirdparty package namespaces - relocate "com.google.common", "org.apache.beam.vendor.grpc.v1p26p0.com.google.common" - relocate "com.google.thirdparty", "org.apache.beam.vendor.grpc.v1p26p0.com.google.thirdparty" - - relocate "com.google.protobuf", "org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf" - }, -) - -description = "Apache Beam :: Vendored Dependencies :: SDKs :: Java :: Extensions :: Protobuf" -ext.summary = "Add support to Apache Beam for Vendored Google Protobuf." - -/* - * We need to rely on manually specifying these evaluationDependsOn to ensure that - * the following projects are evaluated before we evaluate this project. This is because - * we are attempting to reference the "sourceSets.main.java.srcDirs" directly. 
- */ -evaluationDependsOn(":sdks:java:extensions:protobuf") - -compileJava { - source project(":sdks:java:extensions:protobuf").sourceSets.main.java.srcDirs -} - -dependencies { - compile "com.google.guava:guava:${GrpcVendoring_1_26_0.guava_version}" - compile "com.google.protobuf:protobuf-java:${GrpcVendoring_1_26_0.protobuf_version}" - shadow project(path: ":sdks:java:core", configuration: "shadow") -}