From 4948c8116e0568eeae5d5c4197d4b12e2f6072e6 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 11 Oct 2018 18:23:20 +0200 Subject: [PATCH 1/6] SEP Worker shutdown --- .../cassandra/concurrent/SEPWorker.java | 51 ++++++++++++- .../concurrent/SharedExecutorPool.java | 16 ++++- .../cassandra/concurrent/SEPExecutorTest.java | 71 +++++++++++++++++++ 3 files changed, 136 insertions(+), 2 deletions(-) create mode 100644 test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java index 4549b4866cf1..82c09ce80e54 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java +++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.concurrent; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -33,6 +34,7 @@ final class SEPWorker extends AtomicReference implements Runnabl private static final Logger logger = LoggerFactory.getLogger(SEPWorker.class); private static final boolean SET_THREAD_NAME = Boolean.parseBoolean(System.getProperty("cassandra.set_sep_thread_name", "true")); + final CountDownLatch stopLatch = new CountDownLatch(1); final Long workerId; final Thread thread; final SharedExecutorPool pool; @@ -127,6 +129,10 @@ public void run() startSpinning(); } } + catch (PoolStoppedException e) + { + stopLatch.countDown(); + } catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); @@ -180,6 +186,14 @@ boolean assign(Work work, boolean self) return false; } + void kill() throws InterruptedException + { + set(Work.DEAD); + LockSupport.unpark(thread); + stopLatch.await(); + thread.join(); + } + // try to assign ourselves an executor with work available private boolean selfAssign() { @@ -338,12 +352,43 @@ private boolean isStopped() * -> SPINNING|(ASSIGNED) */ - static final class Work + static class Work { static final Work STOP_SIGNALLED = new Work(); static final Work STOPPED = new Work(); static final Work SPINNING = new Work(); static final Work WORKING = new Work(); + static final Work DEAD = new Work () { + boolean canAssign(boolean self) + { + throw new PoolStoppedException(); + } + + boolean isSpinning() + { + throw new PoolStoppedException(); + } + + boolean isWorking() + { + throw new PoolStoppedException(); + } + + boolean isStop() + { + throw new PoolStoppedException(); + } + + boolean isStopped() + { + throw new PoolStoppedException(); + } + + boolean isAssigned() + { + throw new PoolStoppedException(); + } + }; final SEPExecutor assigned; @@ -390,4 +435,8 @@ boolean isAssigned() return assigned != null; } } + + private static final class PoolStoppedException extends RuntimeException + { + } } diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java index 3b0600fa03d8..44d44aa13f14 100644 --- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java +++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -73,6 +74,7 @@ public class SharedExecutorPool final ConcurrentSkipListMap spinning = new ConcurrentSkipListMap<>(); // the collection of threads that have been asked to stop/deschedule - new workers are scheduled from here last final ConcurrentSkipListMap descheduled = new ConcurrentSkipListMap<>(); + final List workers = new CopyOnWriteArrayList<>(); public SharedExecutorPool(String poolName) { @@ -91,7 +93,7 @@ void schedule(Work work) return; if (!work.isStop()) - new SEPWorker(workerId.incrementAndGet(), work, this); + workers.add(new SEPWorker(workerId.incrementAndGet(), work, this)); } void maybeStartSpinningWorker() @@ -109,4 +111,16 @@ public LocalAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTa executors.add(executor); return executor; } + + public void shutdown() throws InterruptedException + { + for (SEPExecutor executor : executors) + { + executor.shutdownNow(); + executor.awaitTermination(60, TimeUnit.SECONDS); + } + + for (SEPWorker worker : workers) + worker.kill(); + } } diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java new file mode 100644 index 000000000000..da9cf4d3001c --- /dev/null +++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.junit.Assert; +import org.junit.Test; + +public class SEPExecutorTest +{ + @Test + public void shutdownTest() throws Throwable + { + for (int i = 0; i < 1000; i++) + { + shutdownOnce(i); + } + } + + private static void shutdownOnce(int run) throws Throwable + { + List list = new ArrayList<>(); + SharedExecutorPool SHARED = new SharedExecutorPool("SharedPool"); + String MAGIC = "IRREPETABLE_MAGIC_STRING"; + OutputStream nullOutputStream = new OutputStream() { + public void write(int b) throws IOException { } + }; + PrintStream nullPrintSteam = new PrintStream(nullOutputStream); + + for (int idx = 0; idx < 20; idx++) + { + ExecutorService es = SHARED.newExecutor(10, Integer.MAX_VALUE, "STAGE", run + MAGIC + idx); + list.add(es); + es.execute(() -> nullPrintSteam.println("TEST" + es)); + } + + SHARED.shutdown(); + for (Thread thread : Thread.getAllStackTraces().keySet()) + { + if (thread.toString().contains("STAGE")) + { + System.out.println(thread); + System.out.println(Arrays.toString(thread.getStackTrace())); + Assert.fail(); + } + } + } +} From daaaeb40adf236473180000f2253a26ea010de2c Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 11 Oct 2018 18:27:34 +0200 Subject: [PATCH 2/6] Get rid of useless latch, cleanup the test --- src/java/org/apache/cassandra/concurrent/SEPWorker.java | 5 +---- .../org/apache/cassandra/concurrent/SEPExecutorTest.java | 4 +--- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java index 82c09ce80e54..405c49047b1c 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java +++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.concurrent; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -34,7 +33,6 @@ final class SEPWorker extends AtomicReference implements Runnabl private static final Logger logger = LoggerFactory.getLogger(SEPWorker.class); private static final boolean SET_THREAD_NAME = Boolean.parseBoolean(System.getProperty("cassandra.set_sep_thread_name", "true")); - final CountDownLatch stopLatch = new CountDownLatch(1); final Long workerId; final Thread thread; final SharedExecutorPool pool; @@ -131,7 +129,7 @@ public void run() } catch (PoolStoppedException e) { - stopLatch.countDown(); + // Fall-through } catch (Throwable t) { @@ -190,7 +188,6 @@ void kill() throws InterruptedException { set(Work.DEAD); LockSupport.unpark(thread); - stopLatch.await(); thread.join(); } diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java index da9cf4d3001c..29a030480ee2 100644 --- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java +++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java @@ -42,7 +42,6 @@ public void shutdownTest() throws Throwable private static void shutdownOnce(int run) throws Throwable { - List list = new ArrayList<>(); SharedExecutorPool SHARED = new SharedExecutorPool("SharedPool"); String MAGIC = "IRREPETABLE_MAGIC_STRING"; OutputStream nullOutputStream = new OutputStream() { @@ -53,14 +52,13 @@ public void write(int b) throws IOException { } for (int idx = 0; idx < 20; idx++) { ExecutorService es = SHARED.newExecutor(10, Integer.MAX_VALUE, "STAGE", run + MAGIC + idx); - list.add(es); es.execute(() -> nullPrintSteam.println("TEST" + es)); } SHARED.shutdown(); for (Thread thread : Thread.getAllStackTraces().keySet()) { - if (thread.toString().contains("STAGE")) + if (thread.toString().contains(MAGIC)) { System.out.println(thread); System.out.println(Arrays.toString(thread.getStackTrace())); From e2f9392823fe325f20a89c63e49fbeda617c0342 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 11 Oct 2018 18:48:14 +0200 Subject: [PATCH 3/6] Get rid of exceptions --- .../cassandra/concurrent/SEPWorker.java | 55 +++++-------------- .../cassandra/concurrent/SEPExecutorTest.java | 3 +- 2 files changed, 16 insertions(+), 42 deletions(-) diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java index 405c49047b1c..948d5b6247f5 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java +++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java @@ -74,6 +74,9 @@ public void run() { while (true) { + if (isDead()) + return; + if (isSpinning() && !selfAssign()) { doWaitSpin(); @@ -127,10 +130,6 @@ public void run() startSpinning(); } } - catch (PoolStoppedException e) - { - // Fall-through - } catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); @@ -310,6 +309,11 @@ private boolean isSpinning() return get().isSpinning(); } + private boolean isDead() + { + return get().isDead(); + } + private boolean stop() { return get().isStop() && compareAndSet(Work.STOP_SIGNALLED, Work.STOPPED); @@ -349,43 +353,13 @@ private boolean isStopped() * -> SPINNING|(ASSIGNED) */ - static class Work + static final class Work { static final Work STOP_SIGNALLED = new Work(); static final Work STOPPED = new Work(); static final Work SPINNING = new Work(); static final Work WORKING = new Work(); - static final Work DEAD = new Work () { - boolean canAssign(boolean self) - { - throw new PoolStoppedException(); - } - - boolean isSpinning() - { - throw new PoolStoppedException(); - } - - boolean isWorking() - { - throw new PoolStoppedException(); - } - - boolean isStop() - { - throw new PoolStoppedException(); - } - - boolean isStopped() - { - throw new PoolStoppedException(); - } - - boolean isAssigned() - { - throw new PoolStoppedException(); - } - }; + static final Work DEAD = new Work (); final SEPExecutor assigned; @@ -427,13 +401,14 @@ boolean isStopped() return this == Work.STOPPED; } + boolean isDead() + { + return this == Work.DEAD; + } + boolean isAssigned() { return assigned != null; } } - - private static final class PoolStoppedException extends RuntimeException - { - } } diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java index 29a030480ee2..22e3c3bce39e 100644 --- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java +++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java @@ -61,8 +61,7 @@ public void write(int b) throws IOException { } if (thread.toString().contains(MAGIC)) { System.out.println(thread); - System.out.println(Arrays.toString(thread.getStackTrace())); - Assert.fail(); + Assert.fail(thread + " is still running " + Arrays.toString(thread.getStackTrace())); } } } From 41f56a0df846cf22bce7291fd8a3258782a31b91 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 11 Oct 2018 18:50:32 +0200 Subject: [PATCH 4/6] =?UTF-8?q?Cleanup=20imports,=20elaborate=20on=20?= =?UTF-8?q?=E2=80=9Ckill=E2=80=9D=20call?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/cassandra/concurrent/SharedExecutorPool.java | 1 + test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java index 44d44aa13f14..54c5d850aa87 100644 --- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java +++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java @@ -120,6 +120,7 @@ public void shutdown() throws InterruptedException executor.awaitTermination(60, TimeUnit.SECONDS); } + // Make sure the pooled workers waiting for reuse are not left hanging for (SEPWorker worker : workers) worker.kill(); } diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java index 22e3c3bce39e..b88c2e1766db 100644 --- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java +++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.concurrent.ExecutorService; import org.junit.Assert; From b8f5c9b88f6753743d457b92a3c1b1729f5307f6 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 11 Oct 2018 19:26:53 +0200 Subject: [PATCH 5/6] Cleanup --- .../cassandra/concurrent/SEPExecutorTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java index b88c2e1766db..25a4d7d0f087 100644 --- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java +++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.concurrent; -import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; import java.util.Arrays; @@ -27,6 +26,8 @@ import org.junit.Assert; import org.junit.Test; +import org.apache.cassandra.utils.FBUtilities; + public class SEPExecutorTest { @Test @@ -40,25 +41,25 @@ public void shutdownTest() throws Throwable private static void shutdownOnce(int run) throws Throwable { - SharedExecutorPool SHARED = new SharedExecutorPool("SharedPool"); + SharedExecutorPool sharedPool = new SharedExecutorPool("SharedPool"); String MAGIC = "IRREPETABLE_MAGIC_STRING"; OutputStream nullOutputStream = new OutputStream() { - public void write(int b) throws IOException { } + public void write(int b) { } }; PrintStream nullPrintSteam = new PrintStream(nullOutputStream); for (int idx = 0; idx < 20; idx++) { - ExecutorService es = SHARED.newExecutor(10, Integer.MAX_VALUE, "STAGE", run + MAGIC + idx); + ExecutorService es = sharedPool.newExecutor(FBUtilities.getAvailableProcessors(), Integer.MAX_VALUE, "STAGE", run + MAGIC + idx); + // Write to black hole es.execute(() -> nullPrintSteam.println("TEST" + es)); } - SHARED.shutdown(); + sharedPool.shutdown(); for (Thread thread : Thread.getAllStackTraces().keySet()) { if (thread.toString().contains(MAGIC)) { - System.out.println(thread); Assert.fail(thread + " is still running " + Arrays.toString(thread.getStackTrace())); } } From 0df8f74300d77b7f4b3cf49e2ede0a0cd3b286d9 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Thu, 11 Oct 2018 19:30:51 +0200 Subject: [PATCH 6/6] Enable dtests --- .circleci/config.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5a84f724fcf8..76a2c9f84178 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only - build # Set env_settings, env_vars, and workflows/build_and_run_tests based on environment env_settings: &env_settings - <<: *default_env_settings - #<<: *high_capacity_env_settings + #<<: *default_env_settings + <<: *high_capacity_env_settings env_vars: &env_vars - <<: *resource_constrained_env_vars - #<<: *high_capacity_env_vars + #<<: *resource_constrained_env_vars + <<: *high_capacity_env_vars workflows: version: 2 - build_and_run_tests: *default_jobs + #build_and_run_tests: *default_jobs #build_and_run_tests: *with_dtest_jobs_only - #build_and_run_tests: *with_dtest_jobs + build_and_run_tests: *with_dtest_jobs docker_image: &docker_image kjellman/cassandra-test:0.4.3 version: 2 jobs: