From e63088d787b17ec201157cfb1f83a35fdb73c8c1 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Thu, 4 Oct 2018 17:08:52 -0600 Subject: [PATCH 1/5] Add build targets for running burn tests. --- build.xml | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/build.xml b/build.xml index d7e54441afdd..d7e6c4b0d972 100644 --- a/build.xml +++ b/build.xml @@ -1345,6 +1345,14 @@ + + + + + + @@ -1742,6 +1750,31 @@ + + + + + + + + + + + + + + + + + + + + + From 83656e1216f80842493f40f098c223c11689a2db Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Thu, 4 Oct 2018 17:10:02 -0600 Subject: [PATCH 2/5] Harden LongBufferPoolTest against java.lang.OutOfMemoryError Catch all possible errors on the worker threads, otherwise if threads exit due to java.lang.OutOfMemoryError, the test either reports a lack of progress or the chunk recycling check fails. Add an explicit exit to the main on errors as the REQUEST-SCHEDULER thread is not marked as a daemon and prevents the test from exiting. Added some extra logging to clear up how large the buffer pool size is under test, as loading the default config prints `Global buffer pool is enabled, when pool is exahusted (max is 512 mb) it will allocate on heap` and that is not what gets tested. 
--- .../utils/memory/LongBufferPoolTest.java | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java index 17ac5693e67b..36e0fc7265d9 100644 --- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java +++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java @@ -97,6 +97,7 @@ public void testAllocate(int threadCount, long duration, int poolSize) throws In ExecutorService executorService = Executors.newFixedThreadPool(threadCount + 2); List> ret = new ArrayList<>(threadCount); long prevPoolSize = BufferPool.MEMORY_USAGE_THRESHOLD; + logger.info("Overriding configured BufferPool.MEMORY_USAGE_THRESHOLD={} and enabling BufferPool.DEBUG", poolSize); BufferPool.MEMORY_USAGE_THRESHOLD = poolSize; BufferPool.DEBUG = true; // sum(1..n) = n/2 * (n + 1); we set zero to CHUNK_SIZE, so have n=threadCount-1 @@ -360,7 +361,9 @@ private int sum1toN(int n) assertEquals(0, executorService.shutdownNow().size()); + logger.info("Reverting BufferPool.MEMORY_USAGE_THRESHOLD={}", prevPoolSize); BufferPool.MEMORY_USAGE_THRESHOLD = prevPoolSize; + BufferPool.DEBUG = false; for (Future r : ret) assertTrue(r.get()); @@ -399,6 +402,14 @@ public Boolean call() throws Exception ex.printStackTrace(); return false; } + catch (Throwable tr) // for java.lang.OutOfMemoryError + { + logger.error("Got throwable {}, current chunk {}", + tr.getMessage(), + BufferPool.currentChunk()); + tr.printStackTrace(); + return false; + } finally { cleanup(); @@ -407,9 +418,19 @@ public Boolean call() throws Exception } } - public static void main(String[] args) throws InterruptedException, ExecutionException + public static void main(String[] args) { - new LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(), TimeUnit.HOURS.toNanos(2L), 16 << 20); + try + { + new 
LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(), + TimeUnit.HOURS.toNanos(2L), 16 << 20); + System.exit(0); + } + catch (Throwable tr) + { + System.out.println(String.format("Test failed - %s", tr.getMessage())); + System.exit(1); // Force exit so that non-daemon threads like REQUEST-SCHEDULER do not hang the process on failure + } } /** @@ -451,4 +472,4 @@ V poll() } } -} \ No newline at end of file +} From 631e11de61db240599e0233c7d92bf1d8d9f53bb Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Sun, 7 Oct 2018 13:47:49 -0600 Subject: [PATCH 3/5] Harden LongBufferPoolTest against flaky Chunk recycle assertion The test as written does not guarantee that all Chunks will be recycled, just that it is likely that once during the 10s check cycle, worker threads sets the target memory size to zero (a one-in-16k chance). Instead, make the thread free at least once per cycle and only do the recycle test when all threads are known to have released all memory as they might exit. Also, check the return code from the burn producer and consumer threads in case they throw an assertion error. The current burn producer/consumer threads currently double-free each buffer, which occasionally triggers a double-release assertion check. --- .../utils/memory/LongBufferPoolTest.java | 84 +++++++++++++++---- 1 file changed, 69 insertions(+), 15 deletions(-) diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java index 36e0fc7265d9..2391d9e17598 100644 --- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java +++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java @@ -36,6 +36,28 @@ import static org.junit.Assert.*; +/** + * Long BufferPool test - make sure that the BufferPool allocates and recycles + * ByteBuffers under heavy concurrent usage. 
+ * + * The test creates two groups of threads + * + * - the burn producer/consumer pair that allocates 1/10 poolSize and then returns + * all the memory to the pool. 50% is freed by the producer, 50% passed to the consumer thread. + * + * - a ring of worker threads that allocate buffers and either immediately free them, + * or pass to the next worker thread for it to be freed on its behalf. Periodically + * all memory is freed by the thread. + * + * While the burn/worker threads run, the original main thread checks that all of the threads are still + * making progress every 10s (no locking issues, or exits from assertion failures), + * and that every chunk has been freed at least once during the previous cycle (if that was possible). + * + * The test does not expect to survive out-of-memory errors, so needs sufficient heap memory + * for non-direct buffers and the debug tracking objects that check the allocated buffers. + * (The timing is very interesting when Xmx is lowered to increase garbage collection pauses, but do 
+ */ public class LongBufferPoolTest { private static final Logger logger = LoggerFactory.getLogger(LongBufferPoolTest.class); @@ -88,10 +110,13 @@ public void testAllocate(int threadCount, long duration, int poolSize) throws In final CountDownLatch latch = new CountDownLatch(threadCount); final SPSCQueue[] sharedRecycle = new SPSCQueue[threadCount]; final AtomicBoolean[] makingProgress = new AtomicBoolean[threadCount]; + final AtomicBoolean burnFreed = new AtomicBoolean(false); + final AtomicBoolean[] freedAllMemory = new AtomicBoolean[threadCount]; for (int i = 0 ; i < sharedRecycle.length ; i++) { sharedRecycle[i] = new SPSCQueue<>(); - makingProgress[i] = new AtomicBoolean(true); + makingProgress[i] = new AtomicBoolean(false); + freedAllMemory[i] = new AtomicBoolean(false); } ExecutorService executorService = Executors.newFixedThreadPool(threadCount + 2); @@ -109,17 +134,22 @@ public void testAllocate(int threadCount, long duration, int poolSize) throws In // setup some high churn allocate/deallocate, without any checking final SPSCQueue burn = new SPSCQueue<>(); final CountDownLatch doneAdd = new CountDownLatch(1); - executorService.submit(new TestUntil(until) + ret.add(executorService.submit(new TestUntil(until) { int count = 0; + final ThreadLocalRandom rand = ThreadLocalRandom.current(); void testOne() throws Exception { if (count * BufferPool.CHUNK_SIZE >= poolSize / 10) { if (burn.exhausted) + { count = 0; - else + burnFreed.compareAndSet(false, true); + } else + { Thread.yield(); + } return; } @@ -130,16 +160,21 @@ void testOne() throws Exception return; } - BufferPool.put(buffer); - burn.add(buffer); + // 50/50 chance of returning the buffer from the producer thread, or + // pass it on to the consumer. 
+ if (rand.nextBoolean()) + BufferPool.put(buffer); + else + burn.add(buffer); + count++; } void cleanup() { doneAdd.countDown(); } - }); - executorService.submit(new TestUntil(until) + })); + ret.add(executorService.submit(new TestUntil(until) { void testOne() throws Exception { @@ -155,7 +190,7 @@ void cleanup() { Uninterruptibles.awaitUninterruptibly(doneAdd); } - }); + })); } for (int t = 0; t < threadCount; t++) @@ -182,7 +217,7 @@ void checkpoint() void testOne() throws Exception { - long currentTargetSize = rand.nextInt(poolSize / 1024) == 0 ? 0 : targetSize; + long currentTargetSize = (rand.nextInt(poolSize / 1024) == 0 || !freedAllMemory[threadIdx].get()) ? 0 : targetSize; int spinCount = 0; while (totalSize > currentTargetSize - freeingSize) { @@ -231,6 +266,9 @@ else if (!recycleFromNeighbour()) } } + if (currentTargetSize == 0) + freedAllMemory[threadIdx].compareAndSet(false, true); + // allocate a new buffer size = (int) Math.max(1, avgBufferSize + (stdevBufferSize * rand.nextGaussian())); if (size <= BufferPool.CHUNK_SIZE) @@ -336,16 +374,32 @@ private int sum1toN(int n) })); } - boolean first = true; while (!latch.await(10L, TimeUnit.SECONDS)) { - if (!first) - BufferPool.assertAllRecycled(); - first = false; + int stalledThreads = 0; + int doneThreads = 0; + for (AtomicBoolean progress : makingProgress) { - assert progress.get(); - progress.set(false); + if (!progress.getAndSet(false)) + stalledThreads++; + } + + for (Future r : ret) + { + if (r.isDone()) + doneThreads++; + } + if (doneThreads == 0) // If any threads have completed, they will stop making progress/recycling buffers. + { // Assertions failures on the threads will be caught below. 
+ assert stalledThreads == 0; + boolean allFreed = burnFreed.getAndSet(false); + for (AtomicBoolean freedMemory : freedAllMemory) + allFreed = allFreed && freedMemory.getAndSet(false); + if (allFreed) + BufferPool.assertAllRecycled(); + else + logger.info("All threads did not free all memory in this time slot - skipping buffer recycle check"); } } From dda50237e1cddc991917ed3e06f5533925f89990 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Thu, 4 Oct 2018 17:29:54 -0600 Subject: [PATCH 4/5] Change target allocation size for LongBufferPoolSize. The prior method for calculating the amount of memory to try and allocate for the tests increased when it ran with more threads. The pool under test is a fixed size pool so the effect was to add pressure on heap allocations instead. On a 6-core i7 the test failed with -Xmx500m. This change restricts the total allocation size to a little over double the pool size regardless of the number of threads present. --- .../utils/memory/LongBufferPoolTest.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java index 2391d9e17598..44349fb93cff 100644 --- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java +++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java @@ -125,10 +125,15 @@ public void testAllocate(int threadCount, long duration, int poolSize) throws In logger.info("Overriding configured BufferPool.MEMORY_USAGE_THRESHOLD={} and enabling BufferPool.DEBUG", poolSize); BufferPool.MEMORY_USAGE_THRESHOLD = poolSize; BufferPool.DEBUG = true; - // sum(1..n) = n/2 * (n + 1); we set zero to CHUNK_SIZE, so have n=threadCount-1 - int targetSizeQuanta = ((threadCount) * (threadCount - 1)) / 2; - // fix targetSizeQuanta at 1/64th our poolSize, so that we only consciously exceed our pool size limit - targetSizeQuanta = 
(targetSizeQuanta * poolSize) / 64; + + // Divide the poolSize across our threads, deliberately over-subscribing it. Threads + // allocate a different amount of memory each - 1*quanta, 2*quanta, ... N*quanta. + // Thread0 is always going to be a single CHUNK, then to allocate increasing amounts + // using their own algorithm the targetSize should be poolSize / targetSizeQuanta. + // + // This should divide double the poolSize across the working threads, + // plus CHUNK_SIZE for thread0 and 1/10 poolSize for the burn producer/consumer pair. + final int targetSizeQuanta = 2 * poolSize / sum1toN(threadCount - 1); { // setup some high churn allocate/deallocate, without any checking @@ -366,11 +371,6 @@ BufferCheck sample() return checks.get(index); } - - private int sum1toN(int n) - { - return (n * (n + 1)) / 2; - } })); } @@ -526,4 +526,8 @@ V poll() } } + private int sum1toN(int n) + { + return (n * (n + 1)) / 2; + } } From 43a4bcf8d15229d110844e2166a62c9e29a9f49a Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Mon, 22 Oct 2018 14:39:01 -0600 Subject: [PATCH 5/5] Refactored LongBufferPoolTest to improve clarity --- .../utils/memory/LongBufferPoolTest.java | 592 ++++++++++-------- 1 file changed, 327 insertions(+), 265 deletions(-) diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java index 44349fb93cff..8b51ef16a839 100644 --- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java +++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java @@ -61,6 +61,9 @@ public class LongBufferPoolTest { private static final Logger logger = LoggerFactory.getLogger(LongBufferPoolTest.class); + final int avgBufferSize = 16 << 10; + final int stdevBufferSize = 10 << 10; // picked to ensure exceeding buffer size is rare, but occurs + final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); @Test public void testAllocate() throws 
InterruptedException, ExecutionException @@ -95,334 +98,393 @@ void init() } } - public void testAllocate(int threadCount, long duration, int poolSize) throws InterruptedException, ExecutionException + private static final class TestEnvironment { - final int avgBufferSize = 16 << 10; - final int stdevBufferSize = 10 << 10; // picked to ensure exceeding buffer size is rare, but occurs - final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); + final int threadCount; + final long duration; + final int poolSize; + final long until; + final CountDownLatch latch; + final SPSCQueue[] sharedRecycle; + final AtomicBoolean[] makingProgress; + final AtomicBoolean burnFreed; + final AtomicBoolean[] freedAllMemory; + final ExecutorService executorService; + final List> threadResultFuture; + final int targetSizeQuanta; + + TestEnvironment(int threadCount, long duration, int poolSize) + { + this.threadCount = threadCount; + this.duration = duration; + this.poolSize = poolSize; + until = System.nanoTime() + duration; + latch = new CountDownLatch(threadCount); + sharedRecycle = new SPSCQueue[threadCount]; + makingProgress = new AtomicBoolean[threadCount]; + burnFreed = new AtomicBoolean(false); + freedAllMemory = new AtomicBoolean[threadCount]; + executorService = Executors.newFixedThreadPool(threadCount + 2); + threadResultFuture = new ArrayList<>(threadCount); + + for (int i = 0; i < sharedRecycle.length; i++) + { + sharedRecycle[i] = new SPSCQueue<>(); + makingProgress[i] = new AtomicBoolean(false); + freedAllMemory[i] = new AtomicBoolean(false); + } - System.out.println(String.format("%s - testing %d threads for %dm", - dateFormat.format(new Date()), - threadCount, - TimeUnit.NANOSECONDS.toMinutes(duration))); + // Divide the poolSize across our threads, deliberately over-subscribing it. Threads + // allocate a different amount of memory each - 1*quanta, 2*quanta, ... N*quanta. 
+ // Thread0 is always going to be a single CHUNK, then to allocate increasing amounts + // using their own algorithm the targetSize should be poolSize / targetSizeQuanta. + // + // This should divide double the poolSize across the working threads, + // plus CHUNK_SIZE for thread0 and 1/10 poolSize for the burn producer/consumer pair. + targetSizeQuanta = 2 * poolSize / sum1toN(threadCount - 1); + } + + void addCheckedFuture(Future future) + { + threadResultFuture.add(future); + } - final long until = System.nanoTime() + duration; - final CountDownLatch latch = new CountDownLatch(threadCount); - final SPSCQueue[] sharedRecycle = new SPSCQueue[threadCount]; - final AtomicBoolean[] makingProgress = new AtomicBoolean[threadCount]; - final AtomicBoolean burnFreed = new AtomicBoolean(false); - final AtomicBoolean[] freedAllMemory = new AtomicBoolean[threadCount]; - for (int i = 0 ; i < sharedRecycle.length ; i++) + int countStalledThreads() { - sharedRecycle[i] = new SPSCQueue<>(); - makingProgress[i] = new AtomicBoolean(false); - freedAllMemory[i] = new AtomicBoolean(false); + int stalledThreads = 0; + + for (AtomicBoolean progress : makingProgress) + { + if (!progress.getAndSet(false)) + stalledThreads++; + } + return stalledThreads; } - ExecutorService executorService = Executors.newFixedThreadPool(threadCount + 2); - List> ret = new ArrayList<>(threadCount); + int countDoneThreads() + { + int doneThreads = 0; + for (Future r : threadResultFuture) + { + if (r.isDone()) + doneThreads++; + } + return doneThreads; + } + + void assertCheckedThreadsSucceeded() + { + try + { + for (Future r : threadResultFuture) + assertTrue(r.get()); + } + catch (InterruptedException ex) + { + // If interrupted while checking, restart and check everything. 
+ assertCheckedThreadsSucceeded(); + } + catch (ExecutionException ex) + { + fail("Checked thread threw exception: " + ex.toString()); + } + } + } + + public void testAllocate(int threadCount, long duration, int poolSize) throws InterruptedException, ExecutionException + { + System.out.println(String.format("%s - testing %d threads for %dm", + dateFormat.format(new Date()), + threadCount, + TimeUnit.NANOSECONDS.toMinutes(duration))); long prevPoolSize = BufferPool.MEMORY_USAGE_THRESHOLD; logger.info("Overriding configured BufferPool.MEMORY_USAGE_THRESHOLD={} and enabling BufferPool.DEBUG", poolSize); BufferPool.MEMORY_USAGE_THRESHOLD = poolSize; BufferPool.DEBUG = true; - // Divide the poolSize across our threads, deliberately over-subscribing it. Threads - // allocate a different amount of memory each - 1*quanta, 2*quanta, ... N*quanta. - // Thread0 is always going to be a single CHUNK, then to allocate increasing amounts - // using their own algorithm the targetSize should be poolSize / targetSizeQuanta. - // - // This should divide double the poolSize across the working threads, - // plus CHUNK_SIZE for thread0 and 1/10 poolSize for the burn producer/consumer pair. 
- final int targetSizeQuanta = 2 * poolSize / sum1toN(threadCount - 1); + TestEnvironment testEnv = new TestEnvironment(threadCount, duration, poolSize); - { - // setup some high churn allocate/deallocate, without any checking - final SPSCQueue burn = new SPSCQueue<>(); - final CountDownLatch doneAdd = new CountDownLatch(1); - ret.add(executorService.submit(new TestUntil(until) - { - int count = 0; - final ThreadLocalRandom rand = ThreadLocalRandom.current(); - void testOne() throws Exception - { - if (count * BufferPool.CHUNK_SIZE >= poolSize / 10) - { - if (burn.exhausted) - { - count = 0; - burnFreed.compareAndSet(false, true); - } else - { - Thread.yield(); - } - return; - } + startBurnerThreads(testEnv); - ByteBuffer buffer = BufferPool.tryGet(BufferPool.CHUNK_SIZE); - if (buffer == null) - { - Thread.yield(); - return; - } + for (int threadIdx = 0; threadIdx < threadCount; threadIdx++) + testEnv.addCheckedFuture(startWorkerThread(testEnv, threadIdx)); - // 50/50 chance of returning the buffer from the producer thread, or - // pass it on to the consumer. - if (rand.nextBoolean()) - BufferPool.put(buffer); - else - burn.add(buffer); + while (!testEnv.latch.await(10L, TimeUnit.SECONDS)) + { + int stalledThreads = testEnv.countStalledThreads(); + int doneThreads = testEnv.countDoneThreads(); - count++; - } - void cleanup() - { - doneAdd.countDown(); - } - })); - ret.add(executorService.submit(new TestUntil(until) - { - void testOne() throws Exception - { - ByteBuffer buffer = burn.poll(); - if (buffer == null) - { - Thread.yield(); - return; - } - BufferPool.put(buffer); - } - void cleanup() - { - Uninterruptibles.awaitUninterruptibly(doneAdd); - } - })); + if (doneThreads == 0) // If any threads have completed, they will stop making progress/recycling buffers. + { // Assertions failures on the threads will be caught below. 
+ assert stalledThreads == 0; + boolean allFreed = testEnv.burnFreed.getAndSet(false); + for (AtomicBoolean freedMemory : testEnv.freedAllMemory) + allFreed = allFreed && freedMemory.getAndSet(false); + if (allFreed) + BufferPool.assertAllRecycled(); + else + logger.info("All threads did not free all memory in this time slot - skipping buffer recycle check"); + } } - for (int t = 0; t < threadCount; t++) + for (SPSCQueue queue : testEnv.sharedRecycle) { - final int threadIdx = t; - final int targetSize = t == 0 ? BufferPool.CHUNK_SIZE : targetSizeQuanta * t; - - ret.add(executorService.submit(new TestUntil(until) + BufferCheck check; + while ( null != (check = queue.poll()) ) { - final SPSCQueue shareFrom = sharedRecycle[threadIdx]; - final DynamicList checks = new DynamicList<>((int) Math.max(1, targetSize / (1 << 10))); - final SPSCQueue shareTo = sharedRecycle[(threadIdx + 1) % threadCount]; - final ThreadLocalRandom rand = ThreadLocalRandom.current(); - int totalSize = 0; - int freeingSize = 0; - int size = 0; - - void checkpoint() - { - if (!makingProgress[threadIdx].get()) - makingProgress[threadIdx].set(true); - } + check.validate(); + BufferPool.put(check.buffer); + } + } - void testOne() throws Exception - { + assertEquals(0, testEnv.executorService.shutdownNow().size()); - long currentTargetSize = (rand.nextInt(poolSize / 1024) == 0 || !freedAllMemory[threadIdx].get()) ? 
0 : targetSize; - int spinCount = 0; - while (totalSize > currentTargetSize - freeingSize) - { - // free buffers until we're below our target size - if (checks.size() == 0) - { - // if we're out of buffers to free, we're waiting on our neighbour to free them; - // first check if the consuming neighbour has caught up, and if so mark that free - if (shareTo.exhausted) - { - totalSize -= freeingSize; - freeingSize = 0; - } - else if (!recycleFromNeighbour()) - { - if (++spinCount > 1000 && System.nanoTime() > until) - return; - // otherwise, free one of our other neighbour's buffers if can; and otherwise yield - Thread.yield(); - } - continue; - } + logger.info("Reverting BufferPool.MEMORY_USAGE_THRESHOLD={}", prevPoolSize); + BufferPool.MEMORY_USAGE_THRESHOLD = prevPoolSize; + BufferPool.DEBUG = false; - // pick a random buffer, with preference going to earlier ones - BufferCheck check = sample(); - checks.remove(check.listnode); - check.validate(); + testEnv.assertCheckedThreadsSucceeded(); - size = BufferPool.roundUpNormal(check.buffer.capacity()); - if (size > BufferPool.CHUNK_SIZE) - size = 0; + System.out.println(String.format("%s - finished.", + dateFormat.format(new Date()))); + } - // either share to free, or free immediately - if (rand.nextBoolean()) + private Future startWorkerThread(TestEnvironment testEnv, final int threadIdx) + { + return testEnv.executorService.submit(new TestUntil(testEnv.until) + { + final int targetSize = threadIdx == 0 ? 
BufferPool.CHUNK_SIZE : testEnv.targetSizeQuanta * threadIdx; + final SPSCQueue shareFrom = testEnv.sharedRecycle[threadIdx]; + final DynamicList checks = new DynamicList<>((int) Math.max(1, targetSize / (1 << 10))); + final SPSCQueue shareTo = testEnv.sharedRecycle[(threadIdx + 1) % testEnv.threadCount]; + final ThreadLocalRandom rand = ThreadLocalRandom.current(); + int totalSize = 0; + int freeingSize = 0; + int size = 0; + + void checkpoint() + { + if (!testEnv.makingProgress[threadIdx].get()) + testEnv.makingProgress[threadIdx].set(true); + } + + void testOne() throws Exception + { + + long currentTargetSize = (rand.nextInt(testEnv.poolSize / 1024) == 0 || !testEnv.freedAllMemory[threadIdx].get()) ? 0 : targetSize; + int spinCount = 0; + while (totalSize > currentTargetSize - freeingSize) + { + // free buffers until we're below our target size + if (checks.size() == 0) + { + // if we're out of buffers to free, we're waiting on our neighbour to free them; + // first check if the consuming neighbour has caught up, and if so mark that free + if (shareTo.exhausted) { - shareTo.add(check); - freeingSize += size; - // interleave this with potentially messing with the other neighbour's stuff - recycleFromNeighbour(); + totalSize -= freeingSize; + freeingSize = 0; } - else + else if (!recycleFromNeighbour()) { - check.validate(); - BufferPool.put(check.buffer); - totalSize -= size; + if (++spinCount > 1000 && System.nanoTime() > until) + return; + // otherwise, free one of our other neighbour's buffers if can; and otherwise yield + Thread.yield(); } + continue; } - if (currentTargetSize == 0) - freedAllMemory[threadIdx].compareAndSet(false, true); + // pick a random buffer, with preference going to earlier ones + BufferCheck check = sample(); + checks.remove(check.listnode); + check.validate(); - // allocate a new buffer - size = (int) Math.max(1, avgBufferSize + (stdevBufferSize * rand.nextGaussian())); - if (size <= BufferPool.CHUNK_SIZE) - { - totalSize += 
BufferPool.roundUpNormal(size); - allocate(size); - } - else if (rand.nextBoolean()) + size = BufferPool.roundUpNormal(check.buffer.capacity()); + if (size > BufferPool.CHUNK_SIZE) + size = 0; + + // either share to free, or free immediately + if (rand.nextBoolean()) { - allocate(size); + shareTo.add(check); + freeingSize += size; + // interleave this with potentially messing with the other neighbour's stuff + recycleFromNeighbour(); } else { - // perform a burst allocation to exhaust all available memory - while (totalSize < poolSize) - { - size = (int) Math.max(1, avgBufferSize + (stdevBufferSize * rand.nextGaussian())); - if (size <= BufferPool.CHUNK_SIZE) - { - allocate(size); - totalSize += BufferPool.roundUpNormal(size); - } - } - } - - // validate a random buffer we have stashed - checks.get(rand.nextInt(checks.size())).validate(); - - // free all of our neighbour's remaining shared buffers - while (recycleFromNeighbour()); - } - - void cleanup() - { - while (checks.size() > 0) - { - BufferCheck check = checks.get(0); + check.validate(); BufferPool.put(check.buffer); - checks.remove(check.listnode); + totalSize -= size; } - latch.countDown(); } - boolean recycleFromNeighbour() + if (currentTargetSize == 0) + testEnv.freedAllMemory[threadIdx].compareAndSet(false, true); + + // allocate a new buffer + size = (int) Math.max(1, avgBufferSize + (stdevBufferSize * rand.nextGaussian())); + if (size <= BufferPool.CHUNK_SIZE) { - BufferCheck check = shareFrom.poll(); - if (check == null) - return false; - check.validate(); - BufferPool.put(check.buffer); - return true; + totalSize += BufferPool.roundUpNormal(size); + allocate(size); } - - BufferCheck allocate(int size) + else if (rand.nextBoolean()) { - ByteBuffer buffer = BufferPool.get(size); - assertNotNull(buffer); - BufferCheck check = new BufferCheck(buffer, rand.nextLong()); - assertEquals(size, buffer.capacity()); - assertEquals(0, buffer.position()); - check.init(); - check.listnode = checks.append(check); - 
return check; + allocate(size); } - - BufferCheck sample() + else { - // sample with preference to first elements: - // element at index n will be selected with likelihood (size - n) / sum1ToN(size) - int size = checks.size(); - - // pick a random number between 1 and sum1toN(size) - int sampleRange = sum1toN(size); - int sampleIndex = rand.nextInt(sampleRange); - - // then binary search for the N, such that [sum1ToN(N), sum1ToN(N+1)) contains this random number - int moveBy = Math.max(size / 4, 1); - int index = size / 2; - while (true) + // perform a burst allocation to exhaust all available memory + while (totalSize < testEnv.poolSize) { - int baseSampleIndex = sum1toN(index); - int endOfSampleIndex = sum1toN(index + 1); - if (sampleIndex >= baseSampleIndex) + size = (int) Math.max(1, avgBufferSize + (stdevBufferSize * rand.nextGaussian())); + if (size <= BufferPool.CHUNK_SIZE) { - if (sampleIndex < endOfSampleIndex) - break; - index += moveBy; + allocate(size); + totalSize += BufferPool.roundUpNormal(size); } - else index -= moveBy; - moveBy = Math.max(moveBy / 2, 1); } + } - // this gives us the inverse of our desired value, so just subtract it from the last index - index = size - (index + 1); + // validate a random buffer we have stashed + checks.get(rand.nextInt(checks.size())).validate(); - return checks.get(index); - } - })); - } + // free all of our neighbour's remaining shared buffers + while (recycleFromNeighbour()); + } - while (!latch.await(10L, TimeUnit.SECONDS)) - { - int stalledThreads = 0; - int doneThreads = 0; + void cleanup() + { + while (checks.size() > 0) + { + BufferCheck check = checks.get(0); + BufferPool.put(check.buffer); + checks.remove(check.listnode); + } + testEnv.latch.countDown(); + } - for (AtomicBoolean progress : makingProgress) + boolean recycleFromNeighbour() { - if (!progress.getAndSet(false)) - stalledThreads++; + BufferCheck check = shareFrom.poll(); + if (check == null) + return false; + check.validate(); + 
BufferPool.put(check.buffer); + return true; } - for (Future r : ret) + BufferCheck allocate(int size) { - if (r.isDone()) - doneThreads++; + ByteBuffer buffer = BufferPool.get(size); + assertNotNull(buffer); + BufferCheck check = new BufferCheck(buffer, rand.nextLong()); + assertEquals(size, buffer.capacity()); + assertEquals(0, buffer.position()); + check.init(); + check.listnode = checks.append(check); + return check; } - if (doneThreads == 0) // If any threads have completed, they will stop making progress/recycling buffers. - { // Assertions failures on the threads will be caught below. - assert stalledThreads == 0; - boolean allFreed = burnFreed.getAndSet(false); - for (AtomicBoolean freedMemory : freedAllMemory) - allFreed = allFreed && freedMemory.getAndSet(false); - if (allFreed) - BufferPool.assertAllRecycled(); - else - logger.info("All threads did not free all memory in this time slot - skipping buffer recycle check"); + + BufferCheck sample() + { + // sample with preference to first elements: + // element at index n will be selected with likelihood (size - n) / sum1ToN(size) + int size = checks.size(); + + // pick a random number between 1 and sum1toN(size) + int sampleRange = sum1toN(size); + int sampleIndex = rand.nextInt(sampleRange); + + // then binary search for the N, such that [sum1ToN(N), sum1ToN(N+1)) contains this random number + int moveBy = Math.max(size / 4, 1); + int index = size / 2; + while (true) + { + int baseSampleIndex = sum1toN(index); + int endOfSampleIndex = sum1toN(index + 1); + if (sampleIndex >= baseSampleIndex) + { + if (sampleIndex < endOfSampleIndex) + break; + index += moveBy; + } + else index -= moveBy; + moveBy = Math.max(moveBy / 2, 1); + } + + // this gives us the inverse of our desired value, so just subtract it from the last index + index = size - (index + 1); + + return checks.get(index); } - } + }); + } - for (SPSCQueue queue : sharedRecycle) + private void startBurnerThreads(TestEnvironment testEnv) + { + // setup 
some high churn allocate/deallocate, without any checking + final SPSCQueue burn = new SPSCQueue<>(); + final CountDownLatch doneAdd = new CountDownLatch(1); + testEnv.addCheckedFuture(testEnv.executorService.submit(new TestUntil(testEnv.until) { - BufferCheck check; - while ( null != (check = queue.poll()) ) + int count = 0; + final ThreadLocalRandom rand = ThreadLocalRandom.current(); + void testOne() throws Exception { - check.validate(); - BufferPool.put(check.buffer); - } - } + if (count * BufferPool.CHUNK_SIZE >= testEnv.poolSize / 10) + { + if (burn.exhausted) + { + count = 0; + testEnv.burnFreed.compareAndSet(false, true); + } else + { + Thread.yield(); + } + return; + } - assertEquals(0, executorService.shutdownNow().size()); + ByteBuffer buffer = BufferPool.tryGet(BufferPool.CHUNK_SIZE); + if (buffer == null) + { + Thread.yield(); + return; + } - logger.info("Reverting BufferPool.MEMORY_USAGE_THRESHOLD={}", prevPoolSize); - BufferPool.MEMORY_USAGE_THRESHOLD = prevPoolSize; - BufferPool.DEBUG = false; - for (Future r : ret) - assertTrue(r.get()); + // 50/50 chance of returning the buffer from the producer thread, or + // pass it on to the consumer. + if (rand.nextBoolean()) + BufferPool.put(buffer); + else + burn.add(buffer); - System.out.println(String.format("%s - finished.", - dateFormat.format(new Date()))); + count++; + } + void cleanup() + { + doneAdd.countDown(); + } + })); + testEnv.threadResultFuture.add(testEnv.executorService.submit(new TestUntil(testEnv.until) + { + void testOne() throws Exception + { + ByteBuffer buffer = burn.poll(); + if (buffer == null) + { + Thread.yield(); + return; + } + BufferPool.put(buffer); + } + void cleanup() + { + Uninterruptibles.awaitUninterruptibly(doneAdd); + } + })); } static abstract class TestUntil implements Callable @@ -526,7 +588,7 @@ V poll() } } - private int sum1toN(int n) + private static int sum1toN(int n) { return (n * (n + 1)) / 2; }