From c9b5c577cda0f206ab9d4a2cd343e94c0d25a557 Mon Sep 17 00:00:00 2001 From: Kirk Lund Date: Tue, 3 Dec 2019 10:31:31 -0800 Subject: [PATCH] GEODE-7526: Add new DistributedExecutorServiceRule JUnit Rule JUnit Rule that provides an ExecutorService in every distributed test VM including the main JUnit controller VM. See the javadocs on DistributedExecutorServiceRule for more info. --- .../DistributedExecutorServiceRuleTest.java | 170 ++++++++++++++++ .../rules/DistributedExecutorServiceRule.java | 189 ++++++++++++++++++ .../junit/rules/ExecutorServiceRuleTest.java | 2 +- 3 files changed, 360 insertions(+), 1 deletion(-) create mode 100644 geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleTest.java create mode 100644 geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleTest.java new file mode 100644 index 000000000000..4640026462a6 --- /dev/null +++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.test.dunit.rules.tests; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.apache.geode.test.dunit.VM.getVMCount; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule; +import org.apache.geode.test.dunit.rules.DistributedRule; + +@SuppressWarnings("serial") +public class DistributedExecutorServiceRuleTest implements Serializable { + + private static final long TIMEOUT_MILLIS = GeodeAwaitility.getTimeout().getValueInMS(); + + private static final AtomicReference executorService = new AtomicReference<>(); + private static final AtomicReference latch = new AtomicReference<>(); + private static final AtomicReference> voidFuture = new AtomicReference<>(); + private static final AtomicReference> booleanFuture = new AtomicReference<>(); + + @Rule + public DistributedRule distributedRule = new DistributedRule(); + + @Rule + public DistributedExecutorServiceRule executorServiceRule = new DistributedExecutorServiceRule(); + + @Test + public void eachVmHasAnExecutorService() { + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + executorService.set(executorServiceRule.getExecutorService()); + }); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + assertThat(executorService.get()).isNotNull(); + }); + } + } + + @Test + public void eachVmAwaitsItsOwnVoidFuture() { + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + latch.set(new CountDownLatch(1)); + }); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> voidFuture.set(executorServiceRule.submit(() -> { + latch.get().await(TIMEOUT_MILLIS, MILLISECONDS); + }))); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + assertThat(voidFuture.get().isDone()).isFalse(); + }); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> voidFuture.set(executorServiceRule.submit(() -> { + latch.get().countDown(); + }))); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> voidFuture.set(executorServiceRule.submit(() -> { + assertThat(voidFuture.get().isDone()).isTrue(); + }))); + } + } + + @Test + public void eachVmAwaitsItsOwnBooleanFuture() { + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + latch.set(new CountDownLatch(1)); + }); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> booleanFuture.set(executorServiceRule.submit(() -> { + return latch.get().await(TIMEOUT_MILLIS, MILLISECONDS); + }))); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + assertThat(booleanFuture.get().isDone()).isFalse(); + }); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> voidFuture.set(executorServiceRule.submit(() -> { + latch.get().countDown(); + }))); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> voidFuture.set(executorServiceRule.submit(() -> { + assertThat(latch.get().getCount()).isZero(); + }))); + } + } + + @Test + public void eachVmCompletesIndependently() { + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + latch.set(new CountDownLatch(1)); + }); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> voidFuture.set(executorServiceRule.submit(() -> { + latch.get().await(TIMEOUT_MILLIS, MILLISECONDS); + }))); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + assertThat(voidFuture.get().isDone()).isFalse(); + }); + } + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> voidFuture.set(executorServiceRule.submit(() -> { + latch.get().countDown(); + + assertThat(voidFuture.get().isDone()).isTrue(); + }))); + + for (int j = 0; j < getVMCount(); j++) { + if (j == i) { + continue; + } + + getVM(i).invoke(() -> voidFuture.set(executorServiceRule.submit(() -> { + assertThat(voidFuture.get().isDone()).isFalse(); + }))); + } + } + } +} diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java new file mode 100644 index 000000000000..44f7de5f718e --- /dev/null +++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.test.dunit.rules; + +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; +import org.apache.geode.test.junit.rules.ExecutorServiceRule.ThrowingRunnable; + +@SuppressWarnings("unused") +public class DistributedExecutorServiceRule extends AbstractDistributedRule { + + private static final AtomicReference delegate = new AtomicReference<>(); + + public DistributedExecutorServiceRule() { + // default vmCount + } + + public DistributedExecutorServiceRule(int vmCount) { + super(vmCount); + } + + public ExecutorService getExecutorService() { + return delegate.get().getExecutorService(); + } + + /** + * Executes the given command at some time in the future. + * + * @param command the runnable task + * @throws RejectedExecutionException if this task cannot be accepted for execution + * @throws NullPointerException if command is null + */ + public void execute(ThrowingRunnable command) { + delegate.get().execute(command); + } + + /** + * Submits a value-returning task for execution and returns a Future representing the pending + * results of the task. The Future's {@code get} method will return the task's result upon + * successful completion. + * + *

+ * If you would like to immediately block waiting for a task, you can use constructions of the + * form {@code result = exec.submit(aCallable).get();} + * + * @param task the task to submit + * @param the type of the task's result + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if the task cannot be scheduled for execution + * @throws NullPointerException if the task is null + */ + public Future submit(Callable task) { + return delegate.get().submit(task); + } + + /** + * Submits a Runnable task for execution and returns a Future representing that task. The Future's + * {@code get} method will return the given result upon successful completion. + * + * @param task the task to submit + * @param result the result to return + * @param the type of the result + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if the task cannot be scheduled for execution + * @throws NullPointerException if the task is null + */ + public Future submit(ThrowingRunnable task, T result) { + return delegate.get().submit(task, result); + } + + /** + * Submits a Runnable task for execution and returns a Future representing that task. The Future's + * {@code get} method will return {@code null} upon successful completion. + * + * @param task the task to submit + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if the task cannot be scheduled for execution + * @throws NullPointerException if the task is null + */ + public Future submit(ThrowingRunnable task) { + return delegate.get().submit(task); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed by a task running in the + * dedicated executor after it runs the given action. + * + * @param runnable the action to run before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public CompletableFuture runAsync(Runnable runnable) { + return delegate.get().runAsync(runnable); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed by a task running in the + * dedicated executor with the value obtained by calling the given Supplier. + * + * @param supplier a function returning the value to be used to complete the returned + * CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + */ + public CompletableFuture supplyAsync(Supplier supplier) { + return delegate.get().supplyAsync(supplier); + } + + /** + * Returns the {@code Thread}s that are directly in the {@code ExecutorService}'s + * {@code ThreadGroup} excluding subgroups. + */ + public Set getThreads() { + return delegate.get().getThreads(); + } + + /** + * Returns an array of {@code Thread Ids} that are directly in the {@code ExecutorService}'s + * {@code ThreadGroup} excluding subgroups. {@code long[]} is returned to facilitate using JDK + * APIs such as {@code ThreadMXBean#getThreadInfo(long[], int)}. + */ + public long[] getThreadIds() { + return delegate.get().getThreadIds(); + } + + /** + * Returns thread dumps for the {@code Thread}s that are in the {@code ExecutorService}'s + * {@code ThreadGroup} excluding subgroups. + */ + public String dumpThreads() { + return delegate.get().dumpThreads(); + } + + @Override + public void before() throws Exception { + invoker().invokeInEveryVMAndController(() -> invokeBefore()); + } + + @Override + public void after() { + invoker().invokeInEveryVMAndController(() -> invokeAfter()); + } + + @Override + protected void afterCreateVM(VM vm) { + vm.invoke(() -> invokeBefore()); + } + + @Override + protected void afterBounceVM(VM vm) { + vm.invoke(() -> invokeBefore()); + } + + private void invokeBefore() throws Exception { + try { + delegate.set(new ExecutorServiceRule()); + delegate.get().before(); + } catch (Throwable throwable) { + if (throwable instanceof Exception) { + throw (Exception) throwable; + } + throw new RuntimeException(throwable); + } + } + + private void invokeAfter() { + delegate.get().after(); + } +} diff --git a/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java b/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java index ce2481d90fe5..4c6f1467fb7a 100644 --- a/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java +++ b/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java @@ -125,7 +125,7 @@ public abstract static class HasExecutorServiceRule { public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); @Before - public void setUpHasAsynchronousRule() { + public void setUpHasExecutorServiceRule() { executorService = executorServiceRule.getExecutorService(); } }