From d06adf6c75df1005692ce722a34ffdafb979f2f0 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 18 Oct 2017 11:26:48 -0700 Subject: [PATCH] Migrate shared Fn Execution code to Java7 --- runners/java-fn-execution/pom.xml | 14 ------ .../fnexecution/ServerFactoryTest.java | 45 ++++++++++++++---- runners/pom.xml | 2 +- sdks/java/fn-execution/pom.xml | 20 ++------ .../apache/beam/harness/test/Consumer.java | 26 ++++++++++ .../apache/beam/harness/test/Supplier.java | 26 ++++++++++ .../beam/harness/test/TestExecutors.java | 12 ++++- .../beam/harness/test/TestExecutorsTest.java | 29 +++++++++--- .../apache/beam/harness/test/TestStreams.java | 35 +++++++++++--- .../beam/harness/test/TestStreamsTest.java | 47 ++++++++++++++----- .../apache/beam/fn/harness/FnHarnessTest.java | 4 +- .../data/BeamFnDataGrpcClientTest.java | 5 +- .../stream/BufferingStreamObserverTest.java | 2 +- .../stream/DirectStreamObserverTest.java | 2 +- sdks/java/pom.xml | 2 +- 15 files changed, 196 insertions(+), 75 deletions(-) create mode 100644 sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java create mode 100644 sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml index bd4fcf0f9a01..f57c58b89465 100644 --- a/runners/java-fn-execution/pom.xml +++ b/runners/java-fn-execution/pom.xml @@ -32,20 +32,6 @@ jar - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.8 - 1.8 - - - - - org.apache.beam diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java index aa8d2461f236..b78e88a1d18f 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java @@ -39,6 +39,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import org.apache.beam.harness.channel.ManagedChannelFactory; +import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestStreams; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; @@ -74,24 +75,48 @@ private Endpoints.ApiServiceDescriptor runTestUsing( Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = Endpoints.ApiServiceDescriptor.newBuilder(); - Collection serverElements = new ArrayList<>(); - CountDownLatch clientHangedUp = new CountDownLatch(1); + final Collection serverElements = new ArrayList<>(); + final CountDownLatch clientHangedUp = new CountDownLatch(1); CallStreamObserver serverInboundObserver = - TestStreams.withOnNext(serverElements::add) - .withOnCompleted(clientHangedUp::countDown) - .build(); + TestStreams.withOnNext( + new Consumer() { + @Override + public void accept(Elements item) { + serverElements.add(item); + } + }) + .withOnCompleted( + new Runnable() { + @Override + public void run() { + clientHangedUp.countDown(); + } + }) + .build(); TestDataService service = new TestDataService(serverInboundObserver); Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder); assertFalse(server.isShutdown()); ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build()); BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel); - Collection clientElements = new ArrayList<>(); - CountDownLatch serverHangedUp = new CountDownLatch(1); + final Collection clientElements = new ArrayList<>(); + final CountDownLatch serverHangedUp = new CountDownLatch(1); CallStreamObserver clientInboundObserver = - TestStreams.withOnNext(clientElements::add) - .withOnCompleted(serverHangedUp::countDown) - .build(); + TestStreams.withOnNext( + new Consumer() { + @Override + public void accept(Elements item) { + clientElements.add(item); + } + }) + .withOnCompleted( + new Runnable() { + @Override + public void run() { + serverHangedUp.countDown(); + } + }) + .build(); StreamObserver clientOutboundObserver = stub.data(clientInboundObserver); StreamObserver serverOutboundObserver = service.outboundObservers.take(); diff --git a/runners/pom.xml b/runners/pom.xml index df3faa90315a..47f3c0e0d0f2 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -35,6 +35,7 @@ core-construction-java core-java + java-fn-execution local-artifact-service-java reference direct-java @@ -63,7 +64,6 @@ [1.8,) - java-fn-execution gearpump diff --git a/sdks/java/fn-execution/pom.xml b/sdks/java/fn-execution/pom.xml index 9929c29c22c9..7c203ebbda03 100644 --- a/sdks/java/fn-execution/pom.xml +++ b/sdks/java/fn-execution/pom.xml @@ -27,27 +27,13 @@ beam-sdks-java-fn-execution - Apache Beam :: SDKs :: Java :: Harness Core - Contains code shared across the Beam Java SDK Harness and the Java Runner Harness - libraries. + Apache Beam :: SDKs :: Java :: Fn Execution + Contains code shared across the Beam Java SDK Harness Java Runners to execute using + the Beam Portability Framework jar - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.8 - 1.8 - - - - - org.apache.beam diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java new file mode 100644 index 000000000000..279fc29fc708 --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.harness.test; + +/** + * A fork of the Java 8 consumer interface. This exists to enable migration for existing consumers. + */ +public interface Consumer { + void accept(T item); +} diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java new file mode 100644 index 000000000000..629afc2a877c --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.harness.test; + +/** + * A fork of the Java 8 Supplier interface, to enable migrations. + */ +public interface Supplier { + T get(); +} diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java index d818a61dbba5..ca12d5aa5bb8 100644 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java @@ -21,7 +21,6 @@ import com.google.common.util.concurrent.ForwardingExecutorService; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import org.junit.rules.TestRule; import org.junit.runner.Description; import org.junit.runners.model.Statement; @@ -31,6 +30,15 @@ * allows for testing that tasks have exercised the appropriate shutdown logic. */ public class TestExecutors { + public static TestExecutorService from(final ExecutorService staticExecutorService) { + return from(new Supplier() { + @Override + public ExecutorService get() { + return staticExecutorService; + } + }); + } + public static TestExecutorService from(Supplier executorServiceSuppler) { return new FromSupplier(executorServiceSuppler); } @@ -48,7 +56,7 @@ private FromSupplier(Supplier executorServiceSupplier) { } @Override - public Statement apply(Statement statement, Description arg1) { + public Statement apply(final Statement statement, Description arg1) { return new Statement() { @Override public void evaluate() throws Throwable { diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java index 1381b55a4f1d..f0c98e039747 100644 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java @@ -38,14 +38,19 @@ public class TestExecutorsTest { @Test public void testSuccessfulTermination() throws Throwable { ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); + final TestExecutorService testService = TestExecutors.from(service); final AtomicBoolean taskRan = new AtomicBoolean(); testService .apply( new Statement() { @Override public void evaluate() throws Throwable { - testService.submit(() -> taskRan.set(true)); + testService.submit(new Runnable() { + @Override + public void run() { + taskRan.set(true); + } + }); } }, null) @@ -57,7 +62,7 @@ public void evaluate() throws Throwable { @Test public void testTaskBlocksForeverCausesFailure() throws Throwable { ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); + final TestExecutorService testService = TestExecutors.from(service); final AtomicBoolean taskStarted = new AtomicBoolean(); final AtomicBoolean taskWasInterrupted = new AtomicBoolean(); try { @@ -66,7 +71,12 @@ public void testTaskBlocksForeverCausesFailure() throws Throwable { new Statement() { @Override public void evaluate() throws Throwable { - testService.submit(this::taskToRun); + testService.submit(new Runnable() { + @Override + public void run() { + taskToRun(); + } + }); } private void taskToRun() { @@ -94,7 +104,7 @@ private void taskToRun() { @Test public void testStatementFailurePropagatedCleanly() throws Throwable { ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); + final TestExecutorService testService = TestExecutors.from(service); final RuntimeException exceptionToThrow = new RuntimeException(); try { testService @@ -118,7 +128,7 @@ public void evaluate() throws Throwable { public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate() throws Throwable { ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); + final TestExecutorService testService = TestExecutors.from(service); final AtomicBoolean taskStarted = new AtomicBoolean(); final AtomicBoolean taskWasInterrupted = new AtomicBoolean(); final RuntimeException exceptionToThrow = new RuntimeException(); @@ -128,7 +138,12 @@ public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate( new Statement() { @Override public void evaluate() throws Throwable { - testService.submit(this::taskToRun); + testService.submit(new Runnable() { + @Override + public void run() { + taskToRun(); + } + }); throw exceptionToThrow; } diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java index a7b362dfb4d7..3df743a2f09f 100644 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java @@ -20,8 +20,6 @@ import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; -import java.util.function.Consumer; -import java.util.function.Supplier; /** Utility methods which enable testing of {@link StreamObserver}s. */ public class TestStreams { @@ -32,9 +30,9 @@ public class TestStreams { public static Builder withOnNext(Consumer onNext) { return new Builder<>(new ForwardingCallStreamObserver<>( onNext, - TestStreams::noop, - TestStreams::noop, - TestStreams::returnTrue)); + TestStreams.noopConsumer(), + TestStreams.noopRunnable(), + TestStreams.alwaysTrueSupplier())); } /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */ @@ -72,7 +70,7 @@ public Builder withOnCompleted(Runnable onCompleted) { * Returns a new {@link Builder} like this one with the specified * {@link StreamObserver#onError} callback. */ - public Builder withOnError(Runnable onError) { + public Builder withOnError(final Runnable onError) { return new Builder<>(new ForwardingCallStreamObserver<>( observer.onNext, new Consumer() { @@ -102,13 +100,38 @@ public CallStreamObserver build() { private static void noop() { } + private static Runnable noopRunnable() { + return new Runnable() { + @Override + public void run() { + } + }; + } + private static void noop(Throwable t) { } + private static Consumer noopConsumer() { + return new Consumer() { + @Override + public void accept(T item) { + } + }; + } + private static boolean returnTrue() { return true; } + private static Supplier alwaysTrueSupplier() { + return new Supplier() { + @Override + public Boolean get() { + return true; + } + }; + } + /** A {@link CallStreamObserver} which executes the supplied callbacks. */ private static class ForwardingCallStreamObserver extends CallStreamObserver { private final Consumer onNext; diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java index f5741ae3046e..c578397e6162 100644 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java @@ -18,7 +18,6 @@ package org.apache.beam.harness.test; -import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -26,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.atomic.AtomicBoolean; +import org.hamcrest.Matchers; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -35,8 +35,13 @@ public class TestStreamsTest { @Test public void testOnNextIsCalled() { - AtomicBoolean onNextWasCalled = new AtomicBoolean(); - TestStreams.withOnNext(onNextWasCalled::set).build().onNext(true); + final AtomicBoolean onNextWasCalled = new AtomicBoolean(); + TestStreams.withOnNext(new Consumer() { + @Override + public void accept(Boolean item) { + onNextWasCalled.set(item); + } + }).build().onNext(true); assertTrue(onNextWasCalled.get()); } @@ -44,7 +49,12 @@ public void testOnNextIsCalled() { public void testIsReadyIsCalled() { final AtomicBoolean isReadyWasCalled = new AtomicBoolean(); assertFalse(TestStreams.withOnNext(null) - .withIsReady(() -> isReadyWasCalled.getAndSet(true)) + .withIsReady(new Supplier() { + @Override + public Boolean get() { + return isReadyWasCalled.getAndSet(true); + } + }) .build() .isReady()); assertTrue(isReadyWasCalled.get()); @@ -52,9 +62,14 @@ public void testIsReadyIsCalled() { @Test public void testOnCompletedIsCalled() { - AtomicBoolean onCompletedWasCalled = new AtomicBoolean(); + final AtomicBoolean onCompletedWasCalled = new AtomicBoolean(); TestStreams.withOnNext(null) - .withOnCompleted(() -> onCompletedWasCalled.set(true)) + .withOnCompleted(new Runnable() { + @Override + public void run() { + onCompletedWasCalled.set(true); + } + }) .build() .onCompleted(); assertTrue(onCompletedWasCalled.get()); @@ -63,9 +78,14 @@ public void testOnCompletedIsCalled() { @Test public void testOnErrorRunnableIsCalled() { RuntimeException throwable = new RuntimeException(); - AtomicBoolean onErrorWasCalled = new AtomicBoolean(); + final AtomicBoolean onErrorWasCalled = new AtomicBoolean(); TestStreams.withOnNext(null) - .withOnError(() -> onErrorWasCalled.set(true)) + .withOnError(new Runnable() { + @Override + public void run() { + onErrorWasCalled.set(true); + } + }) .build() .onError(throwable); assertTrue(onErrorWasCalled.get()); @@ -74,11 +94,16 @@ public void testOnErrorRunnableIsCalled() { @Test public void testOnErrorConsumerIsCalled() { RuntimeException throwable = new RuntimeException(); - Collection onErrorWasCalled = new ArrayList<>(); + final Collection onErrorWasCalled = new ArrayList<>(); TestStreams.withOnNext(null) - .withOnError(onErrorWasCalled::add) + .withOnError(new Consumer() { + @Override + public void accept(Throwable item) { + onErrorWasCalled.add(item); + } + }) .build() .onError(throwable); - assertThat(onErrorWasCalled, contains(throwable)); + assertThat(onErrorWasCalled, Matchers.contains(throwable)); } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java index 66c31a8e001e..c926414d7718 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java @@ -28,7 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.function.Consumer; +import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestStreams; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest; @@ -91,7 +91,7 @@ public void run() { responseObserver.onCompleted(); } }); - return TestStreams.withOnNext(new Consumer() { + return TestStreams.withOnNext(new Consumer() { @Override public void accept(InstructionResponse t) { instructionResponses.add(t); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java index 7df892580de5..9e2139809feb 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java @@ -41,12 +41,13 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Function; import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestStreams; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc; import org.apache.beam.model.pipeline.v1.Endpoints; import org.apache.beam.sdk.coders.Coder; @@ -263,7 +264,7 @@ public void testForOutboundConsumer() throws Exception { Collection inboundServerValues = new ConcurrentLinkedQueue<>(); CallStreamObserver inboundServerObserver = TestStreams.withOnNext( - new Consumer() { + new Consumer() { @Override public void accept(BeamFnApi.Elements t) { inboundServerValues.add(t); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java index 3f66c4c68c6f..96648e92c3b3 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java @@ -31,7 +31,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; +import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestExecutors; import org.apache.beam.harness.test.TestExecutors.TestExecutorService; import org.apache.beam.harness.test.TestStreams; diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java index 120a73d8d365..05d8d5ae1dae 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java @@ -31,7 +31,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; +import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestExecutors; import org.apache.beam.harness.test.TestExecutors.TestExecutorService; import org.apache.beam.harness.test.TestStreams; diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml index 62e4ec3e1306..c6ab2349f4dd 100644 --- a/sdks/java/pom.xml +++ b/sdks/java/pom.xml @@ -40,6 +40,7 @@ io maven-archetypes extensions + fn-execution @@ -53,7 +54,6 @@ [1.8,) - fn-execution harness container java8tests