Permalink
Browse files

fixes #939 made transactor cache configurable (#956)

  • Loading branch information...
kpm1985 authored and keith-turner committed Oct 24, 2017
1 parent c6bdbe5 commit 955c86fadcc207eecd2a6af464ad51f15e3a223c
@@ -104,21 +104,21 @@ public static int getTxCommitMemory(FluoConfiguration conf) {
return m;
}
public static final String TX_INFO_CACHE_SIZE = FLUO_IMPL_PREFIX + ".tx.failed.cache.size.mb";
public static final long TX_INFO_CACHE_SIZE_DEFAULT = 10_000_000;
public static final String TX_INFO_CACHE_WEIGHT = FLUO_IMPL_PREFIX + ".tx.failed.cache.weight.mb";
public static final long TX_INFO_CACHE_WEIGHT_DEFAULT = 10_000_000;
/**
* Gets the cache size
* Gets the txinfo cache weight
*
* @param conf The FluoConfiguration
* @return The size of the cache value from the property value {@value #TX_INFO_CACHE_SIZE}
* if it is set, else the value of the default value {@value #TX_INFO_CACHE_SIZE_DEFAULT}
* @return The size of the cache value from the property value {@value #TX_INFO_CACHE_WEIGHT}
* if it is set, else the value of the default value {@value #TX_INFO_CACHE_WEIGHT_DEFAULT}
*/
public static long getTxInfoCacheSize(FluoConfiguration conf) {
long size = conf.getLong(TX_INFO_CACHE_SIZE, TX_INFO_CACHE_SIZE_DEFAULT);
public static long getTxInfoCacheWeight(FluoConfiguration conf) {
long size = conf.getLong(TX_INFO_CACHE_WEIGHT, TX_INFO_CACHE_WEIGHT_DEFAULT);
if (size <= 0) {
throw new IllegalArgumentException("Cache size must be positive for " + TX_INFO_CACHE_SIZE);
throw new IllegalArgumentException("Cache size must be positive for " + TX_INFO_CACHE_WEIGHT);
}
return size;
}
@@ -144,22 +144,23 @@ public static long getTxIfoCacheTimeout(FluoConfiguration conf, TimeUnit tu) {
return tu.convert(millis, TimeUnit.MILLISECONDS);
}
public static final String VISIBILITY_CACHE_SIZE = FLUO_IMPL_PREFIX + ".visibility.cache.size.mb";
public static final long VISIBILITY_CACHE_SIZE_DEFAULT = 10_000_000;
public static final String VISIBILITY_CACHE_WEIGHT =
FLUO_IMPL_PREFIX + ".visibility.cache.weight.mb";
public static final long VISIBILITY_CACHE_WEIGHT_DEFAULT = 10_000_000;
/**
* Gets the cache size
* Gets the visibility cache weight
*
* @param conf The FluoConfiguration
* @return The size of the cache value from the property value {@value #VISIBILITY_CACHE_SIZE}
* if it is set, else the value of the default value {@value #VISIBILITY_CACHE_SIZE_DEFAULT}
* @return The size of the cache value from the property value {@value #VISIBILITY_CACHE_WEIGHT}
* if it is set, else the value of the default value {@value #VISIBILITY_CACHE_WEIGHT_DEFAULT}
*/
public static long getVisibilityCacheSize(FluoConfiguration conf) {
long size = conf.getLong(VISIBILITY_CACHE_SIZE, VISIBILITY_CACHE_SIZE_DEFAULT);
public static long getVisibilityCacheWeight(FluoConfiguration conf) {
long size = conf.getLong(VISIBILITY_CACHE_WEIGHT, VISIBILITY_CACHE_WEIGHT_DEFAULT);
if (size <= 0) {
throw new IllegalArgumentException(
"Cache size must be positive for " + VISIBILITY_CACHE_SIZE);
"Cache size must be positive for " + VISIBILITY_CACHE_WEIGHT);
}
return size;
}
@@ -185,6 +186,41 @@ public static long getVisibilityCacheTimeout(FluoConfiguration conf, TimeUnit tu
return tu.convert(millis, TimeUnit.MILLISECONDS);
}
private static final String TRANSACTOR_MAX_CACHE_SIZE =
FLUO_IMPL_PREFIX + ".transactor.cache.max.size";
private static final long TRANSACTOR_MAX_CACHE_SIZE_DEFAULT = 32768; // this equals 2^15
/**
* Gets the specified number of entries the cache can contain, this gets the value
* of {@value #TRANSACTOR_MAX_CACHE_SIZE} if set, the default
* {@value #TRANSACTOR_CACHE_TIMEOUT_DEFAULT} otherwise
*
* @param conf The FluoConfiguartion
* @return The maximum number of entries permitted in this cache
*/
public static long getTransactorMaxCacheSize(FluoConfiguration conf) {
long size = conf.getLong(TRANSACTOR_MAX_CACHE_SIZE, TRANSACTOR_MAX_CACHE_SIZE_DEFAULT);
if (size <= 0) {
throw new IllegalArgumentException(
"Cache size must be positive for " + TRANSACTOR_MAX_CACHE_SIZE);
}
return size;
}
public static final String TRANSACTOR_CACHE_TIMEOUT =
FLUO_IMPL_PREFIX + ".transactor.cache.expireTime.ms";
public static final long TRANSACTOR_CACHE_TIMEOUT_DEFAULT = 24 * 60 * 1000;
public static long getTransactorCacheTimeout(FluoConfiguration conf, TimeUnit tu) {
long millis = conf.getLong(TRANSACTOR_CACHE_TIMEOUT, TRANSACTOR_CACHE_TIMEOUT_DEFAULT);
if (millis <= 0) {
throw new IllegalArgumentException("Timeout must positive for " + TRANSACTOR_CACHE_TIMEOUT);
}
return tu.convert(millis, TimeUnit.MILLISECONDS);
}
public static final String ASYNC_CW_THREADS = FLUO_IMPL_PREFIX + ".async.cw.threads";
public static final int ASYNC_CW_THREADS_DEFAULT = 8;
public static final String ASYNC_CW_LIMIT = FLUO_IMPL_PREFIX + ".async.cw.limit";
@@ -27,6 +27,7 @@
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.fluo.accumulo.util.LongUtil;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.api.config.FluoConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,11 +48,14 @@
private static final Logger log = LoggerFactory.getLogger(TransactorCache.class);
public TransactorCache(Environment env) {
timeoutCache = CacheBuilder.newBuilder().maximumSize(1 << 15)
.expireAfterAccess(FluoConfigurationImpl.TX_INFO_CACHE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS)
.concurrencyLevel(10).build();
final FluoConfiguration conf = env.getConfiguration();
timeoutCache =
CacheBuilder.newBuilder().maximumSize(FluoConfigurationImpl.getTransactorMaxCacheSize(conf))
.expireAfterAccess(
FluoConfigurationImpl.getTransactorCacheTimeout(conf, TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
.concurrencyLevel(10).build();
this.env = env;
cache = new PathChildrenCache(env.getSharedResources().getCurator(),
@@ -41,7 +41,7 @@ public int weigh(PrimaryRowColumn key, TxInfo value) {
cache = CacheBuilder.newBuilder()
.expireAfterAccess(FluoConfigurationImpl.getTxIfoCacheTimeout(conf, TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
.maximumWeight(FluoConfigurationImpl.getTxInfoCacheSize(conf))
.maximumWeight(FluoConfigurationImpl.getTxInfoCacheWeight(conf))
.weigher(new TxStatusWeigher()).concurrencyLevel(10).build();
this.env = env;
}
@@ -52,8 +52,8 @@ public int weigh(Bytes key, ColumnVisibility vis) {
.expireAfterAccess(
FluoConfigurationImpl.getVisibilityCacheTimeout(conf, TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
.maximumWeight(FluoConfigurationImpl.getVisibilityCacheSize(conf)).weigher(new VisWeigher())
.concurrencyLevel(10).build();
.maximumWeight(FluoConfigurationImpl.getVisibilityCacheWeight(conf))
.weigher(new VisWeigher()).concurrencyLevel(10).build();
}
public ColumnVisibility getCV(Column col) {

0 comments on commit 955c86f

Please sign in to comment.