Skip to content

Commit

Permalink
#19006 cache eviction is performed asynchronously when threadpool is und… (
Browse files Browse the repository at this point in the history
#19261)

* #19006 cache eviction is performed asynchronously when threadpool is under certain %

* #19006 isAllocationWithinTolerance shouldn't be synchronized

* #19006 slight optimization lowering number of threads to 5
  • Loading branch information
fabrizzio-dotCMS committed Sep 14, 2020
1 parent b47e4de commit 181f4ca
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 10 deletions.
Expand Up @@ -6,11 +6,9 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -29,7 +27,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import com.dotcms.util.CloseUtils;
import com.dotmarketing.business.cache.provider.CacheProvider;
import com.dotmarketing.business.cache.provider.CacheProviderStats;
import com.dotmarketing.business.cache.provider.CacheStats;
Expand All @@ -46,13 +43,15 @@ public class H22Cache extends CacheProvider {


private static final long serialVersionUID = 1L;
final int numberOfAsyncThreads=Config.getIntProperty("cache_h22_async_threads", 10);

final int numberOfAsyncThreads=Config.getIntProperty("cache_h22_async_threads", 5);
final int asyncTaskQueueSize = Config.getIntProperty("cache_h22_async_task_queue", 10000);
final float threadAllocationTolerance = Config.getFloatProperty("cache_h22_async_tolerance",0.98F);
final boolean shouldAsync=Config.getBooleanProperty("cache_h22_async", true);


final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("H22-ASYNC-COMMIT-%d").build();
final private ExecutorService executorService = new ThreadPoolExecutor(numberOfAsyncThreads, numberOfAsyncThreads, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000),namedThreadFactory);
final private LinkedBlockingQueue<Runnable> asyncTaskQueue = new LinkedBlockingQueue<>();
final private ExecutorService executorService = new ThreadPoolExecutor(numberOfAsyncThreads, numberOfAsyncThreads, 10, TimeUnit.SECONDS, asyncTaskQueue ,namedThreadFactory);


private Boolean isInitialized = false;
Expand Down Expand Up @@ -134,7 +133,7 @@ public void put(final String group, final String key, final Object content) {
return;
}
DONT_CACHE_ME.put(fqn.id, fqn.toString());
if(shouldAsync) {
if(shouldAsync()) {
putAsync(fqn, content);
return;
}
Expand Down Expand Up @@ -227,7 +226,7 @@ public void remove(final String group, final String key) {

final Fqn fqn = new Fqn(group, key);
DONT_CACHE_ME.put(fqn.id, fqn.toString());
if(shouldAsync) {
if(shouldAsync()) {
removeAsync(fqn);
return;
}
Expand All @@ -243,10 +242,30 @@ public void remove(final String group, final String key) {

}

/**
 * Computes how full the async task queue currently is, as a fraction of its
 * configured capacity, and compares that fraction against the configured
 * tolerance threshold.
 *
 * @return true when the queue occupancy is still strictly below the tolerance
 */
boolean isAllocationWithinTolerance() {
    final int queuedTasks = asyncTaskQueue.size();
    final float occupancy = ((float) queuedTasks) / ((float) asyncTaskQueueSize);
    Logger.debug(H22Cache.class,
            () -> " size is " + queuedTasks + ", allocation is " + occupancy + ", tolerance is :"
                    + threadAllocationTolerance);
    return occupancy < threadAllocationTolerance;
}

void removeAsync(final Fqn fqn) {

/**
 * Decides whether a cache operation may be dispatched asynchronously.
 * Async dispatch requires both that it is enabled via configuration AND that
 * the pending task queue occupancy is below the configured tolerance, so the
 * executor's bounded queue can never reject a submission.
 *
 * @return true when async dispatch is enabled and currently affordable
 */
boolean shouldAsync() {
    if (!shouldAsync) {
        return false;
    }
    return isAllocationWithinTolerance();
}

void removeAsync(final Fqn fqn) {

executorService.submit(()-> {
try {
Expand Down
Expand Up @@ -2,9 +2,17 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.dotmarketing.exception.DotRuntimeException;
import com.dotmarketing.util.Config;
import com.dotmarketing.util.Logger;
import com.google.common.io.Files;
import com.liferay.util.FileUtil;
import com.tngtech.java.junit.dataprovider.DataProvider;
import com.tngtech.java.junit.dataprovider.DataProviderRunner;
import com.tngtech.java.junit.dataprovider.UseDataProvider;
import io.vavr.control.Try;
import java.io.File;
import java.sql.Connection;
Expand All @@ -19,7 +27,9 @@
import org.apache.commons.lang.RandomStringUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(DataProviderRunner.class)
public class H22CacheTest {

final String[] GROUPNAMES = { "testGroup", "testGroup2", "myBigGroup" };
Expand Down Expand Up @@ -268,4 +278,105 @@ public final class CantCacheMeObject {

}

/**
 * Builds and initializes a disposable {@link H22Cache} instance backed by a
 * fresh temporary directory, for use by individual test cases.
 *
 * @return an initialized cache ready for use
 * @throws DotRuntimeException wrapping any failure raised by {@code cache.init()}
 */
public H22Cache newCacheInstance() {
    // Files.createTempDir() already creates the directory on disk, so the
    // previous dir.mkdirs() call (whose boolean result was ignored) was redundant.
    final File dir = Files.createTempDir();

    final H22Cache cache = new H22Cache(dir.getAbsolutePath());
    try {
        cache.init();
    } catch (Exception e) {
        // Preserve the original cause so the initialization failure is diagnosable.
        throw new DotRuntimeException(e);
    }
    return cache;
}

/**
 * Given scenario: We create a cache instance then we feed it with put/remove tasks.
 * The cache itself internally decides (according to queue capacity) whether a task
 * should be executed asynchronously or not.
 * Expected Results: The test must match the criteria specified within each
 * test-case, which makes the behavior predictable.
 * In any case we should never get a RejectedExecutionException.
 */
@Test
@UseDataProvider("toleranceTestCases")
public void Test_Exhaust_Thread_Pool(final ToleranceTestCase testCase) throws Exception {

    // Save the current config so it can be restored afterwards. The fallback
    // defaults mirror the ones declared in H22Cache (5 threads, 0.98 tolerance)
    // so that restoring an unset property does not silently change it.
    final boolean shouldAsync = Config.getBooleanProperty("cache_h22_async", true);
    final int numberOfAsyncThreads = Config.getIntProperty("cache_h22_async_threads", 5);
    final int asyncTaskQueueSize = Config.getIntProperty("cache_h22_async_task_queue", 10000);
    final float threadAllocationTolerance = Config.getFloatProperty("cache_h22_async_tolerance", 0.98F);

    Config.setProperty("cache_h22_async", true);
    Config.setProperty("cache_h22_async_threads", testCase.numberOfThreads);
    Config.setProperty("cache_h22_async_task_queue", testCase.queueSize);
    Config.setProperty("cache_h22_async_tolerance", testCase.tolerance);

    final H22Cache cache = newCacheInstance();

    try {
        final String randomAlphanumeric = RandomStringUtils.randomAlphanumeric(10);
        final Object object = new Object();

        assertTrue(cache.shouldAsync);
        // Number of iterations in which the queue occupancy exceeded the tolerance.
        int exceededCount = 0;
        for (int i = 1; i <= testCase.numberOfTask; i++) {
            cache.put(randomAlphanumeric, randomAlphanumeric, object);
            cache.remove(randomAlphanumeric, randomAlphanumeric);

            if (!cache.isAllocationWithinTolerance()) {
                exceededCount++;
            }
        }
        if (testCase.expectAllocationExceeded) {
            assertTrue("Expected queue allocation to exceed tolerance at least once", exceededCount > 0);
        } else {
            // JUnit convention: the EXPECTED value comes first; the original call
            // had the arguments reversed, which garbles the failure message.
            assertEquals(0, exceededCount);
        }

    } finally {

        // Restore the saved config and release the cache's thread pool even on failure.
        Config.setProperty("cache_h22_async", shouldAsync);
        Config.setProperty("cache_h22_async_threads", numberOfAsyncThreads);
        Config.setProperty("cache_h22_async_task_queue", asyncTaskQueueSize);
        Config.setProperty("cache_h22_async_tolerance", threadAllocationTolerance);

        cache.shutdown();
    }

}

/**
 * Data provider for {@code Test_Exhaust_Thread_Pool}. Each case fixes a
 * tolerance, a worker-thread count, a number of tasks to feed the cache, a
 * queue capacity, and whether the allocation is expected to exceed tolerance.
 */
@DataProvider
public static Object[] toleranceTestCases() throws Exception {
    return new Object[]{

            // (tolerance, threads, tasks, queueSize, expectAllocationExceeded)
            new ToleranceTestCase(.98F, 1, 3000, 10000, false),
            new ToleranceTestCase(.98F, 1, 3000, 5000, false),
            new ToleranceTestCase(.98F, 1, 3000, 2000, true), // The queue is too small

            new ToleranceTestCase(.2F, 1, 3000, 10000, true), // Tolerance is too low
            new ToleranceTestCase(.9F, 10, 10000, 10000, false), //Tolerance is high but there are many workers
            new ToleranceTestCase(.9F, 2, 50000, 10000, true), // too many tasks for only 2 workers
            new ToleranceTestCase(.5F, 10, 50000, 10000, true) // many workers, but tolerance is low and tasks are many
    };
}

/**
 * Immutable value holder describing one scenario for
 * {@code Test_Exhaust_Thread_Pool}: the async configuration to apply and the
 * expected outcome.
 */
static class ToleranceTestCase{
    // Queue-occupancy fraction above which async dispatch is disallowed.
    final float tolerance;
    // Value for the "cache_h22_async_threads" config property.
    final int numberOfThreads;
    // How many put/remove iterations the test performs.
    final int numberOfTask;
    // Value for the "cache_h22_async_task_queue" config property.
    final int queueSize;
    // Whether the scenario is expected to exceed the tolerance at least once.
    final boolean expectAllocationExceeded;

    /**
     * @param tolerance queue-occupancy tolerance (0..1)
     * @param numberOfThreads async worker thread count
     * @param numberOfTask iterations of put/remove to run
     * @param queueSize async task queue capacity
     * @param expectAllocationExceeded expected outcome of the scenario
     */
    ToleranceTestCase(final float tolerance, final int numberOfThreads, final int numberOfTask, final int queueSize, final boolean expectAllocationExceeded) {
        this.tolerance = tolerance;
        this.numberOfThreads = numberOfThreads;
        this.numberOfTask = numberOfTask;
        this.queueSize = queueSize;
        this.expectAllocationExceeded = expectAllocationExceeded;
    }

}

}

0 comments on commit 181f4ca

Please sign in to comment.