Skip to content

Commit

Permalink
FBUtilitities.testWaitFirstFuture is flaky
Browse files Browse the repository at this point in the history
patch by Robert Stupp; reviewed by Eduard Tudenhöfner for CASSANDRA-15744
  • Loading branch information
snazy committed Apr 21, 2020
1 parent a738c58 commit 2aa7cf8
Showing 1 changed file with 34 additions and 22 deletions.
56 changes: 34 additions & 22 deletions test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -140,30 +141,41 @@ public void testGetBroadcastRpcAddress() throws Exception
@Test
public void testWaitFirstFuture() throws ExecutionException, InterruptedException
{

ExecutorService executor = Executors.newFixedThreadPool(4);
FBUtilities.reset();
List<Future<?>> futures = new ArrayList<>();
for (int i = 4; i >= 1; i--)
final int threadCount = 10;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
try
{
List<Future<?>> futures = new ArrayList<>(threadCount);
List<CountDownLatch> latches = new ArrayList<>(threadCount);

for (int i = 0; i < threadCount; i++)
{
CountDownLatch latch = new CountDownLatch(1);
latches.add(latch);
int finalI = i;
futures.add(executor.submit(() -> {
latch.await(10, TimeUnit.SECONDS);
// Sleep to emulate "work" done by the future to make it not return immediately
// after counting down the latch in order to test for delay and spinning done
// in FBUtilities#waitOnFirstFuture.
TimeUnit.MILLISECONDS.sleep(10);
return latch.getCount() == 0 ? finalI : -1;
}));
}

for (int i = 0; i < threadCount; i++)
{
latches.get(i).countDown();
Future<?> fut = FBUtilities.waitOnFirstFuture(futures, 3);
int futSleep = (Integer) fut.get();
assertEquals(futSleep, i);
futures.remove(fut);
}
}
finally
{
final int sleep = i * 10;
futures.add(executor.submit(() -> { TimeUnit.MILLISECONDS.sleep(sleep); return sleep; }));
executor.shutdown();
}
Future<?> fut = FBUtilities.waitOnFirstFuture(futures, 3);
int futSleep = (Integer) fut.get();
assertEquals(futSleep, 10);
futures.remove(fut);
fut = FBUtilities.waitOnFirstFuture(futures, 3);
futSleep = (Integer) fut.get();
assertEquals(futSleep, 20);
futures.remove(fut);
fut = FBUtilities.waitOnFirstFuture(futures, 3);
futSleep = (Integer) fut.get();
assertEquals(futSleep, 30);
futures.remove(fut);
fut = FBUtilities.waitOnFirstFuture(futures, 3);
futSleep = (Integer) fut.get();
assertEquals(futSleep, 40);
}

}

0 comments on commit 2aa7cf8

Please sign in to comment.