Skip to content

Commit

Permalink
Fix flaky SEPExecutor.changingMaxWorkersMeetsConcurrencyGoalsTest
Browse files Browse the repository at this point in the history
Thread scheduling is not guaranteed to be fair and having the BusyWork
tasks reschedule itself makes sure there is always more work for
the SEPWorker once it finishes, so it can hog all the CPU if
run with a low number of cores.  To randomize the scheduling better,
introduce a second thread that keeps the executor primed with work,
but guarantees a thread switch by waiting on the sempahore.

Also resolves a cleanup bug - the sharedPool was not being shutdown
correctly.

Patch by Jon Meredith; reviewed by David Capwell and Dinesh Joshi for CASSANDRA-15709
  • Loading branch information
jonmeredith authored and dineshjoshi committed Apr 18, 2020
1 parent 0a860b9 commit f9ddaf1
Showing 1 changed file with 52 additions and 33 deletions.
85 changes: 52 additions & 33 deletions test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -32,6 +34,7 @@

import org.apache.cassandra.utils.FBUtilities;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;

public class SEPExecutorTest
Expand Down Expand Up @@ -75,48 +78,68 @@ public void write(int b) { }
}

@Test
public void changingMaxWorkersMeetsConcurrencyGoalsTest() throws InterruptedException
public void changingMaxWorkersMeetsConcurrencyGoalsTest() throws InterruptedException, TimeoutException
{
final int numBusyWorkers = 2; // Number of busy worker threads to run and gum things up
// Number of busy worker threads to run and gum things up. Chosen to be
// between the low and high max pool size so the test exercises resizing
// under a number of different conditions.
final int numBusyWorkers = 2;
SharedExecutorPool sharedPool = new SharedExecutorPool("ChangingMaxWorkersMeetsConcurrencyGoalsTest");
final AtomicInteger notifiedMaxPoolSize = new AtomicInteger();

LocalAwareExecutorService executor = sharedPool.newExecutor(0, notifiedMaxPoolSize::set, 4, "internal", "resizetest");

// Keep feeding the executor work while resizing
// so it stays under load.
AtomicBoolean stayBusy = new AtomicBoolean(true);
for (int i = 0; i < numBusyWorkers; i++)
{
executor.execute(new BusyWork(executor, stayBusy));
}
Semaphore busyWorkerPermits = new Semaphore(numBusyWorkers);
Thread makeBusy = new Thread(() -> {
while (stayBusy.get() == true)
{
try
{
if (busyWorkerPermits.tryAcquire(1, MILLISECONDS)) {
executor.execute(new BusyWork(busyWorkerPermits));
}
}
catch (InterruptedException e)
{
// ignore, will either stop looping if done or retry the lock
}
}
});

final int previousConcurrency = executor.getMaximumPoolSize();
makeBusy.start();
try
{
assertMaxTaskConcurrency(executor, 1);
Assert.assertEquals(1, notifiedMaxPoolSize.get());
for (int repeat = 0; repeat < 1000; repeat++)
{
assertMaxTaskConcurrency(executor, 1);
Assert.assertEquals(1, notifiedMaxPoolSize.get());

assertMaxTaskConcurrency(executor, 2);
Assert.assertEquals(2, notifiedMaxPoolSize.get());
assertMaxTaskConcurrency(executor, 2);
Assert.assertEquals(2, notifiedMaxPoolSize.get());

assertMaxTaskConcurrency(executor, 1);
Assert.assertEquals(1, notifiedMaxPoolSize.get());
assertMaxTaskConcurrency(executor, 1);
Assert.assertEquals(1, notifiedMaxPoolSize.get());

assertMaxTaskConcurrency(executor, 3);
Assert.assertEquals(3, notifiedMaxPoolSize.get());
assertMaxTaskConcurrency(executor, 3);
Assert.assertEquals(3, notifiedMaxPoolSize.get());

executor.setMaximumPoolSize(0);
Assert.assertEquals(0, notifiedMaxPoolSize.get());
executor.setMaximumPoolSize(0);
Assert.assertEquals(0, notifiedMaxPoolSize.get());

assertMaxTaskConcurrency(executor, 4);
Assert.assertEquals(4, notifiedMaxPoolSize.get());
assertMaxTaskConcurrency(executor, 4);
Assert.assertEquals(4, notifiedMaxPoolSize.get());
}
}
finally
{
stayBusy.set(false);
executor.setMaximumPoolSize(previousConcurrency);
executor.shutdownNow();
Assert.assertTrue(executor.isShutdown());
Assert.assertTrue(executor.awaitTermination(1L, TimeUnit.MINUTES));
makeBusy.join(TimeUnit.SECONDS.toMillis(5));
Assert.assertFalse("makeBusy thread should have checked stayBusy and exited",
makeBusy.isAlive());
sharedPool.shutdownAndWait(1L, MINUTES);
}
}

Expand Down Expand Up @@ -149,21 +172,16 @@ public void run()

static class BusyWork implements Runnable
{
private ExecutorService executor;
private AtomicBoolean stayBusy;
private Semaphore busyWorkers;

public BusyWork(ExecutorService executor, AtomicBoolean stayBusy)
public BusyWork(Semaphore busyWorkers)
{
this.executor = executor;
this.stayBusy = stayBusy;
this.busyWorkers = busyWorkers;
}

public void run()
{
if (stayBusy.get())
{
executor.execute(new BusyWork(executor, stayBusy));
}
busyWorkers.release();
}
}

Expand All @@ -177,6 +195,7 @@ void assertMaxTaskConcurrency(LocalAwareExecutorService executor, int concurrenc
executor.execute(new LatchWaiter(concurrencyGoal, 5L, TimeUnit.SECONDS));
}
// Will return true if all of the LatchWaiters count down before the timeout
Assert.assertEquals(true, concurrencyGoal.await(3L, TimeUnit.SECONDS));
Assert.assertEquals("Test tasks did not hit max concurrency goal",
true, concurrencyGoal.await(3L, TimeUnit.SECONDS));
}
}

0 comments on commit f9ddaf1

Please sign in to comment.