Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11804] Remove vendors/sdk-java-extensions-protobuf #13968

Merged
merged 1 commit into from
Feb 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion runners/flink/flink_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ dependencies {
compile project(path: ":model:pipeline", configuration: "shadow")
compile project(path: ":model:job-management", configuration: "shadow")
compile project(":sdks:java:fn-execution")
compile project(path: ":vendor:sdks-java-extensions-protobuf", configuration: "shadow")
compile library.java.jackson_databind
compile "org.apache.flink:flink-annotations:$flink_version"
compile "org.apache.flink:flink-optimizer_2.11:$flink_version"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.wire.ByteStringCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
Expand All @@ -106,7 +107,6 @@
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.wire.ByteStringCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand All @@ -105,7 +106,6 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down
1 change: 0 additions & 1 deletion runners/java-fn-execution/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ dependencies {
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:fn-execution")
compile project(":runners:core-construction-java")
compile project(path: ":vendor:sdks-java-extensions-protobuf", configuration: "shadow")
compile library.java.vendored_grpc_1_26_0
compile library.java.slf4j_api
compile project(path: ":model:job-management", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.runners.core.construction.graph.UserStateReference;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.runners.fnexecution.wire.ByteStringCoder;
import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.Coder;
Expand All @@ -61,7 +62,6 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableTable;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Utility methods for creating {@link ProcessBundleDescriptor} instances. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.BagUserStateSpec;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.SideInputSpec;
import org.apache.beam.runners.fnexecution.wire.ByteStringCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.fn.stream.DataStreams;
Expand All @@ -55,7 +56,6 @@
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;

/**
* A set of utility methods which construct {@link StateRequestHandler}s.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.wire;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;

/**
* A duplicate of {@link ByteStringCoder} that uses the Apache Beam vendored protobuf.
*
* <p>For internal use only, no backwards-compatibility guarantees.
*/
@Internal
public class ByteStringCoder extends AtomicCoder<ByteString> {

public static ByteStringCoder of() {
return INSTANCE;
}

/** ************************ */
private static final ByteStringCoder INSTANCE = new ByteStringCoder();

private static final TypeDescriptor<ByteString> TYPE_DESCRIPTOR =
new TypeDescriptor<ByteString>() {};

private ByteStringCoder() {}

@Override
public void encode(ByteString value, OutputStream outStream) throws IOException, CoderException {
encode(value, outStream, Context.NESTED);
}

@Override
public void encode(ByteString value, OutputStream outStream, Context context)
throws IOException, CoderException {
if (value == null) {
throw new CoderException("cannot encode a null ByteString");
}

if (!context.isWholeStream) {
// ByteString is not delimited, so write its size before its contents.
VarInt.encode(value.size(), outStream);
}
value.writeTo(outStream);
}

@Override
public ByteString decode(InputStream inStream) throws IOException {
return decode(inStream, Context.NESTED);
}

@Override
public ByteString decode(InputStream inStream, Context context) throws IOException {
if (context.isWholeStream) {
return ByteString.readFrom(inStream);
}

int size = VarInt.decodeInt(inStream);
// ByteString reads to the end of the input stream, so give it a limited stream of exactly
// the right length. Also set its chunk size so that the ByteString will contain exactly
// one chunk.
return ByteString.readFrom(ByteStreams.limit(inStream, size), size);
}

@Override
protected long getEncodedElementByteSize(ByteString value) throws Exception {
int size = value.size();
return (long) VarInt.getLength(size) + size;
}

@Override
public void verifyDeterministic() {}

/**
* {@inheritDoc}
*
* <p>Returns true; the encoded output of two invocations of {@link ByteStringCoder} in the same
* {@link Coder.Context} will be identical if and only if the original {@link ByteString} objects
* are equal according to {@link Object#equals}.
*/
@Override
public boolean consistentWithEquals() {
return true;
}

/**
* {@inheritDoc}
*
* <p>Returns true. {@link ByteString#size} returns the size of an array and a {@link VarInt}.
*/
@Override
public boolean isRegisterByteSizeObserverCheap(ByteString value) {
return true;
}

@Override
public TypeDescriptor<ByteString> getEncodedTypeDescriptor() {
return TYPE_DESCRIPTOR;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.wire;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Test case for {@link ByteStringCoder}. */
@RunWith(JUnit4.class)
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class ByteStringCoderTest {

private static final ByteStringCoder TEST_CODER = ByteStringCoder.of();

private static final List<String> TEST_STRING_VALUES =
Arrays.asList(
"",
"a",
"13",
"hello",
"a longer string with spaces and all that",
"a string with a \n newline",
"???????????????");
private static final ImmutableList<ByteString> TEST_VALUES;

static {
ImmutableList.Builder<ByteString> builder = ImmutableList.builder();
for (String s : TEST_STRING_VALUES) {
builder.add(ByteString.copyFromUtf8(s));
}
TEST_VALUES = builder.build();
}

/**
* Generated data to check that the wire format has not changed. To regenerate, see {@link
* org.apache.beam.sdk.coders.PrintBase64Encodings}.
*/
private static final List<String> TEST_ENCODINGS =
Arrays.asList(
"",
"YQ",
"MTM",
"aGVsbG8",
"YSBsb25nZXIgc3RyaW5nIHdpdGggc3BhY2VzIGFuZCBhbGwgdGhhdA",
"YSBzdHJpbmcgd2l0aCBhIAogbmV3bGluZQ",
"Pz8_Pz8_Pz8_Pz8_Pz8_");

@Rule public ExpectedException thrown = ExpectedException.none();

@Test
public void testDecodeEncodeEqualInAllContexts() throws Exception {
for (ByteString value : TEST_VALUES) {
CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
}
}

@Test
public void testWireFormatEncode() throws Exception {
CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
}

@Test
public void testCoderDeterministic() throws Throwable {
TEST_CODER.verifyDeterministic();
}

@Test
public void testConsistentWithEquals() {
assertTrue(TEST_CODER.consistentWithEquals());
}

@Test
public void testEncodeNullThrowsCoderException() throws Exception {
thrown.expect(CoderException.class);
thrown.expectMessage("cannot encode a null ByteString");

CoderUtils.encodeToBase64(TEST_CODER, null);
}

@Test
public void testNestedCoding() throws Throwable {
Coder<List<ByteString>> listCoder = ListCoder.of(TEST_CODER);
CoderProperties.coderDecodeEncodeContentsEqual(listCoder, TEST_VALUES);
CoderProperties.coderDecodeEncodeContentsInSameOrder(listCoder, TEST_VALUES);
}

@Test
public void testEncodedElementByteSize() throws Throwable {
for (ByteString value : TEST_VALUES) {
byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, value, Context.NESTED);
assertEquals(encoded.length, TEST_CODER.getEncodedElementByteSize(value));
}
}

@Test
public void testEncodedTypeDescriptor() throws Exception {
assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(ByteString.class)));
}
}
1 change: 0 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ include(":vendor:grpc-1_26_0")
include(":vendor:bytebuddy-1_10_8")
include(":vendor:calcite-1_20_0")
include(":vendor:guava-26_0-jre")
include(":vendor:sdks-java-extensions-protobuf")
include(":website")
include(":runners:google-cloud-dataflow-java:worker:legacy-worker")
include(":runners:google-cloud-dataflow-java:worker")
Expand Down
Loading