Skip to content

Commit

Permalink
fixes #939 made transactor cache configurable (#956)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kenneth McFarland authored and keith-turner committed Oct 27, 2017
1 parent c6bdbe5 commit 955c86f
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 24 deletions.
Expand Up @@ -104,21 +104,21 @@ public static int getTxCommitMemory(FluoConfiguration conf) {
return m;
}

public static final String TX_INFO_CACHE_SIZE = FLUO_IMPL_PREFIX + ".tx.failed.cache.size.mb";
public static final long TX_INFO_CACHE_SIZE_DEFAULT = 10_000_000;
public static final String TX_INFO_CACHE_WEIGHT = FLUO_IMPL_PREFIX + ".tx.failed.cache.weight.mb";
public static final long TX_INFO_CACHE_WEIGHT_DEFAULT = 10_000_000;

/**
* Gets the cache size
* Gets the txinfo cache weight
*
* @param conf The FluoConfiguration
* @return The size of the cache value from the property value {@value #TX_INFO_CACHE_SIZE}
* if it is set, else the value of the default value {@value #TX_INFO_CACHE_SIZE_DEFAULT}
* @return The size of the cache value from the property value {@value #TX_INFO_CACHE_WEIGHT}
* if it is set, else the value of the default value {@value #TX_INFO_CACHE_WEIGHT_DEFAULT}
*/

public static long getTxInfoCacheSize(FluoConfiguration conf) {
long size = conf.getLong(TX_INFO_CACHE_SIZE, TX_INFO_CACHE_SIZE_DEFAULT);
public static long getTxInfoCacheWeight(FluoConfiguration conf) {
long size = conf.getLong(TX_INFO_CACHE_WEIGHT, TX_INFO_CACHE_WEIGHT_DEFAULT);
if (size <= 0) {
throw new IllegalArgumentException("Cache size must be positive for " + TX_INFO_CACHE_SIZE);
throw new IllegalArgumentException("Cache size must be positive for " + TX_INFO_CACHE_WEIGHT);
}
return size;
}
Expand All @@ -144,22 +144,23 @@ public static long getTxIfoCacheTimeout(FluoConfiguration conf, TimeUnit tu) {
return tu.convert(millis, TimeUnit.MILLISECONDS);
}

public static final String VISIBILITY_CACHE_SIZE = FLUO_IMPL_PREFIX + ".visibility.cache.size.mb";
public static final long VISIBILITY_CACHE_SIZE_DEFAULT = 10_000_000;
public static final String VISIBILITY_CACHE_WEIGHT =
FLUO_IMPL_PREFIX + ".visibility.cache.weight.mb";
public static final long VISIBILITY_CACHE_WEIGHT_DEFAULT = 10_000_000;

/**
* Gets the cache size
* Gets the visibility cache weight
*
* @param conf The FluoConfiguration
* @return The size of the cache value from the property value {@value #VISIBILITY_CACHE_SIZE}
* if it is set, else the value of the default value {@value #VISIBILITY_CACHE_SIZE_DEFAULT}
* @return The size of the cache value from the property value {@value #VISIBILITY_CACHE_WEIGHT}
* if it is set, else the value of the default value {@value #VISIBILITY_CACHE_WEIGHT_DEFAULT}
*/

public static long getVisibilityCacheSize(FluoConfiguration conf) {
long size = conf.getLong(VISIBILITY_CACHE_SIZE, VISIBILITY_CACHE_SIZE_DEFAULT);
public static long getVisibilityCacheWeight(FluoConfiguration conf) {
long size = conf.getLong(VISIBILITY_CACHE_WEIGHT, VISIBILITY_CACHE_WEIGHT_DEFAULT);
if (size <= 0) {
throw new IllegalArgumentException(
"Cache size must be positive for " + VISIBILITY_CACHE_SIZE);
"Cache size must be positive for " + VISIBILITY_CACHE_WEIGHT);
}
return size;
}
Expand All @@ -185,6 +186,41 @@ public static long getVisibilityCacheTimeout(FluoConfiguration conf, TimeUnit tu
return tu.convert(millis, TimeUnit.MILLISECONDS);
}

private static final String TRANSACTOR_MAX_CACHE_SIZE =
FLUO_IMPL_PREFIX + ".transactor.cache.max.size";
private static final long TRANSACTOR_MAX_CACHE_SIZE_DEFAULT = 32768; // this equals 2^15

/**
* Gets the specified number of entries the cache can contain, this gets the value
* of {@value #TRANSACTOR_MAX_CACHE_SIZE} if set, the default
* {@value #TRANSACTOR_CACHE_TIMEOUT_DEFAULT} otherwise
*
* @param conf The FluoConfiguartion
* @return The maximum number of entries permitted in this cache
*/

public static long getTransactorMaxCacheSize(FluoConfiguration conf) {
long size = conf.getLong(TRANSACTOR_MAX_CACHE_SIZE, TRANSACTOR_MAX_CACHE_SIZE_DEFAULT);
if (size <= 0) {
throw new IllegalArgumentException(
"Cache size must be positive for " + TRANSACTOR_MAX_CACHE_SIZE);
}
return size;
}

public static final String TRANSACTOR_CACHE_TIMEOUT =
FLUO_IMPL_PREFIX + ".transactor.cache.expireTime.ms";

public static final long TRANSACTOR_CACHE_TIMEOUT_DEFAULT = 24 * 60 * 1000;

public static long getTransactorCacheTimeout(FluoConfiguration conf, TimeUnit tu) {
long millis = conf.getLong(TRANSACTOR_CACHE_TIMEOUT, TRANSACTOR_CACHE_TIMEOUT_DEFAULT);
if (millis <= 0) {
throw new IllegalArgumentException("Timeout must positive for " + TRANSACTOR_CACHE_TIMEOUT);
}
return tu.convert(millis, TimeUnit.MILLISECONDS);
}

public static final String ASYNC_CW_THREADS = FLUO_IMPL_PREFIX + ".async.cw.threads";
public static final int ASYNC_CW_THREADS_DEFAULT = 8;
public static final String ASYNC_CW_LIMIT = FLUO_IMPL_PREFIX + ".async.cw.limit";
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.fluo.accumulo.util.LongUtil;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.api.config.FluoConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,11 +48,14 @@ public enum TcStatus {
private static final Logger log = LoggerFactory.getLogger(TransactorCache.class);

public TransactorCache(Environment env) {

timeoutCache = CacheBuilder.newBuilder().maximumSize(1 << 15)
.expireAfterAccess(FluoConfigurationImpl.TX_INFO_CACHE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS)
.concurrencyLevel(10).build();
final FluoConfiguration conf = env.getConfiguration();

timeoutCache =
CacheBuilder.newBuilder().maximumSize(FluoConfigurationImpl.getTransactorMaxCacheSize(conf))
.expireAfterAccess(
FluoConfigurationImpl.getTransactorCacheTimeout(conf, TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
.concurrencyLevel(10).build();

this.env = env;
cache = new PathChildrenCache(env.getSharedResources().getCurator(),
Expand Down
Expand Up @@ -41,7 +41,7 @@ public int weigh(PrimaryRowColumn key, TxInfo value) {
cache = CacheBuilder.newBuilder()
.expireAfterAccess(FluoConfigurationImpl.getTxIfoCacheTimeout(conf, TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
.maximumWeight(FluoConfigurationImpl.getTxInfoCacheSize(conf))
.maximumWeight(FluoConfigurationImpl.getTxInfoCacheWeight(conf))
.weigher(new TxStatusWeigher()).concurrencyLevel(10).build();
this.env = env;
}
Expand Down
Expand Up @@ -52,8 +52,8 @@ public int weigh(Bytes key, ColumnVisibility vis) {
.expireAfterAccess(
FluoConfigurationImpl.getVisibilityCacheTimeout(conf, TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
.maximumWeight(FluoConfigurationImpl.getVisibilityCacheSize(conf)).weigher(new VisWeigher())
.concurrencyLevel(10).build();
.maximumWeight(FluoConfigurationImpl.getVisibilityCacheWeight(conf))
.weigher(new VisWeigher()).concurrencyLevel(10).build();
}

public ColumnVisibility getCV(Column col) {
Expand Down

0 comments on commit 955c86f

Please sign in to comment.