From 5f5cf7d1e77aeb8a59d31f0390d129dccd5e1cb8 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 26 Jul 2017 15:34:23 -0700 Subject: [PATCH] Remove References to CloudObject from the Java Harness Migrates to using the shared Runner API definitions. --- .../beam/fn/harness/BeamFnDataReadRunner.java | 27 +++++++---------- .../fn/harness/BeamFnDataWriteRunner.java | 22 +++++--------- .../fn/harness/BeamFnDataReadRunnerTest.java | 28 +++++++++--------- .../fn/harness/BeamFnDataWriteRunnerTest.java | 24 ++++++--------- .../beam/fn/harness/FnApiDoFnRunnerTest.java | 29 ------------------- 5 files changed, 41 insertions(+), 89 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java index e2c17b0b8ef9..1e611db2b169 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java @@ -24,7 +24,6 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -35,8 +34,8 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.dataflow.util.CloudObject; -import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; @@ -91,8 +90,9 @@ public BeamFnDataReadRunner createRunnerForPTransform( .setPrimitiveTransformReference(pTransformId) .setName(getOnlyElement(pTransform.getOutputsMap().keySet())) .build(); - RunnerApi.Coder coderSpec = coders.get(pCollections.get( - getOnlyElement(pTransform.getOutputsMap().values())).getCoderId()); + RunnerApi.Coder coderSpec = + coders.get( + pCollections.get(getOnlyElement(pTransform.getOutputsMap().values())).getCoderId()); Collection>> consumers = (Collection) pCollectionIdsToConsumers.get( getOnlyElement(pTransform.getOutputsMap().values())); @@ -102,6 +102,7 @@ public BeamFnDataReadRunner createRunnerForPTransform( processBundleInstructionId, target, coderSpec, + coders, beamFnDataClient, consumers); addStartFunction.accept(runner::registerInputLocation); @@ -124,6 +125,7 @@ public BeamFnDataReadRunner createRunnerForPTransform( Supplier processBundleInstructionIdSupplier, BeamFnApi.Target inputTarget, RunnerApi.Coder coderSpec, + Map coders, BeamFnDataClient beamFnDataClientFactory, Collection>> consumers) throws IOException { @@ -137,17 +139,10 @@ public BeamFnDataReadRunner createRunnerForPTransform( @SuppressWarnings("unchecked") Coder> coder = (Coder>) - CloudObjects.coderFromCloudObject( - CloudObject.fromSpec( - OBJECT_MAPPER.readValue( - coderSpec - .getSpec() - .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() - .newInput(), - Map.class))); + CoderTranslation.fromProto( + coderSpec, + RehydratedComponents.forComponents( + RunnerApi.Components.newBuilder().putAllCoders(coders).build())); this.coder = coder; } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java index eec4dfde83c8..bbed75301bd8 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java @@ -24,7 +24,6 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.Map; import java.util.function.Consumer; @@ -34,8 +33,8 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.dataflow.util.CloudObject; -import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; @@ -93,6 +92,7 @@ public BeamFnDataWriteRunner createRunnerForPTransform( processBundleInstructionId, target, coderSpec, + coders, beamFnDataClient); addStartFunction.accept(runner::registerForOutput); pCollectionIdsToConsumers.put( @@ -117,6 +117,7 @@ public BeamFnDataWriteRunner createRunnerForPTransform( Supplier processBundleInstructionIdSupplier, BeamFnApi.Target outputTarget, RunnerApi.Coder coderSpec, + Map coders, BeamFnDataClient beamFnDataClientFactory) throws IOException { this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class) @@ -128,17 +129,10 @@ public BeamFnDataWriteRunner createRunnerForPTransform( @SuppressWarnings("unchecked") Coder> coder = (Coder>) - CloudObjects.coderFromCloudObject( - CloudObject.fromSpec( - OBJECT_MAPPER.readValue( - coderSpec - .getSpec() - .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() - .newInput(), - Map.class))); + CoderTranslation.fromProto( + coderSpec, + RehydratedComponents.forComponents( + RunnerApi.Components.newBuilder().putAllCoders(coders).build())); this.coder = coder; } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java index a7c66663bbd8..d712f5fa4738 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java @@ -30,7 +30,6 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; @@ -39,8 +38,6 @@ import com.google.common.collect.Multimap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -56,10 +53,11 @@ import org.apache.beam.fn.harness.test.TestExecutors; import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -79,7 +77,6 @@ @RunWith(JUnit4.class) public class BeamFnDataReadRunnerTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder() @@ -88,19 +85,19 @@ public class BeamFnDataReadRunnerTest { WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); private static final String CODER_SPEC_ID = "string-coder-id"; private static final RunnerApi.Coder CODER_SPEC; + private static final RunnerApi.Components COMPONENTS; private static final String URN = "urn:org.apache.beam:source:runner:0.1"; static { try { - CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec( - RunnerApi.SdkFunctionSpec.newBuilder().setSpec( - RunnerApi.FunctionSpec.newBuilder().setParameter( - Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))) - .build())) - .build()) - .build()) - .build(); + MessageWithComponents coderAndComponents = CoderTranslation.toProto(CODER); + CODER_SPEC = coderAndComponents.getCoder(); + COMPONENTS = + coderAndComponents + .getComponents() + .toBuilder() + .putCoders(CODER_SPEC_ID, CODER_SPEC) + .build(); } catch (IOException e) { throw new ExceptionInInitializerError(e); } @@ -150,7 +147,7 @@ public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception { Suppliers.ofInstance(bundleId)::get, ImmutableMap.of("outputPC", RunnerApi.PCollection.newBuilder().setCoderId(CODER_SPEC_ID).build()), - ImmutableMap.of(CODER_SPEC_ID, CODER_SPEC), + COMPONENTS.getCodersMap(), consumers, startFunctions::add, finishFunctions::add); @@ -200,6 +197,7 @@ public void testReuseForMultipleBundles() throws Exception { bundleId::get, INPUT_TARGET, CODER_SPEC, + COMPONENTS.getCodersMap(), mockBeamFnDataClient, ImmutableList.of(valuesA::add, valuesB::add)); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java index 28838b127be2..0caf19e318ef 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java @@ -32,15 +32,12 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -53,10 +50,11 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -74,7 +72,6 @@ @RunWith(JUnit4.class) public class BeamFnDataWriteRunnerTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder() @@ -83,19 +80,15 @@ public class BeamFnDataWriteRunnerTest { private static final Coder> CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); private static final RunnerApi.Coder CODER_SPEC; + private static final RunnerApi.Components COMPONENTS; private static final String URN = "urn:org.apache.beam:sink:runner:0.1"; static { try { - CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec( - RunnerApi.SdkFunctionSpec.newBuilder().setSpec( - RunnerApi.FunctionSpec.newBuilder().setParameter( - Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))) - .build())) - .build()) - .build()) - .build(); + MessageWithComponents coderAndComponents = CoderTranslation.toProto(CODER); + CODER_SPEC = coderAndComponents.getCoder(); + COMPONENTS = + coderAndComponents.getComponents().toBuilder().putCoders(CODER_ID, CODER_SPEC).build(); } catch (IOException e) { throw new ExceptionInInitializerError(e); } @@ -140,7 +133,7 @@ public void testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception { Suppliers.ofInstance(bundleId)::get, ImmutableMap.of("inputPC", RunnerApi.PCollection.newBuilder().setCoderId(CODER_ID).build()), - ImmutableMap.of(CODER_ID, CODER_SPEC), + COMPONENTS.getCodersMap(), consumers, startFunctions::add, finishFunctions::add); @@ -201,6 +194,7 @@ public void testReuseForMultipleBundles() throws Exception { bundleId::get, OUTPUT_TARGET, CODER_SPEC, + COMPONENTS.getCodersMap(), mockBeamFnDataClient); // Process for bundle id 0 diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index 98362a22a934..e269bcc59567 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; @@ -35,19 +34,14 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; -import com.google.protobuf.Message; -import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.ServiceLoader; import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.runners.core.construction.ParDoTranslation; -import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.DoFnInfo; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -66,28 +60,6 @@ /** Tests for {@link FnApiDoFnRunner}. */ @RunWith(JUnit4.class) public class FnApiDoFnRunnerTest { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final Coder> STRING_CODER = - WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); - private static final String STRING_CODER_SPEC_ID = "999L"; - private static final RunnerApi.Coder STRING_CODER_SPEC; - - static { - try { - STRING_CODER_SPEC = RunnerApi.Coder.newBuilder() - .setSpec(RunnerApi.SdkFunctionSpec.newBuilder() - .setSpec(RunnerApi.FunctionSpec.newBuilder() - .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER)))) - .build()))) - .build()) - .build(); - } catch (IOException e) { - throw new ExceptionInInitializerError(e); - } - } - private static class TestDoFn extends DoFn { private static final TupleTag mainOutput = new TupleTag<>("mainOutput"); private static final TupleTag additionalOutput = new TupleTag<>("output"); @@ -117,7 +89,6 @@ public void finishBundle(FinishBundleContext context) { */ @Test public void testCreatingAndProcessingDoFn() throws Exception { - Map fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); String pTransformId = "pTransformId"; String mainOutputId = "101"; String additionalOutputId = "102";