Skip to content

Commit

Permalink
HBASE-27796 Improve MemcachedBlockCache (#5181)
Browse files Browse the repository at this point in the history
Track and log better stats.
Fix use of memcached API such that we are not accidentally immediately expiring keys.
Use a default retention period of 30 days, the max per memcached spec.
Use set instead of add to store keys.
Gracefully handle failures to cache and read timeouts.
Add unit tests using jmemcached as a test dependency.

Signed-off-by: Viraj Jasani <vjasani@apache.org>
  • Loading branch information
apurtell committed Apr 17, 2023
1 parent 31c4aea commit d56e7f2
Show file tree
Hide file tree
Showing 3 changed files with 287 additions and 24 deletions.
58 changes: 57 additions & 1 deletion hbase-external-blockcache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,63 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-logging</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-zookeeper</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.thimbleware.jmemcached</groupId>
<artifactId>jmemcached-core</artifactId>
<version>1.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,47 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.transcoders.Transcoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Class to store blocks into memcached. This should only be used on a cluster of Memcached daemons
* that are tuned well and have a good network connection to the HBase regionservers. Any other use
* will likely slow down HBase greatly.
*/
@InterfaceAudience.Private
@SuppressWarnings("FutureReturnValueIgnored")
public class MemcachedBlockCache implements BlockCache {
private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName());

// Some memcache versions won't take more than 1024 * 1024. So set the limit below
// that just in case this client is used with those versions.
public static final int MAX_SIZE = 1020 * 1024;

// Start memcached with -I <MAX_SIZE> to ensure it has the ability to store blocks of this size
public static final int MAX_TIME = 60 * 60 * 24 * 30; // 30 days, max allowed per the memcached
// spec

// Config key for what memcached servers to use.
// They should be specified in a comma sperated list with ports.
// like:
Expand All @@ -67,10 +79,20 @@ public class MemcachedBlockCache implements BlockCache {
public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze";
public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false;
public static final int STAT_THREAD_PERIOD = 60 * 5;

private final MemcachedClient client;
private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
private final AtomicLong cachedCount = new AtomicLong();
private final AtomicLong notCachedCount = new AtomicLong();
private final AtomicLong cacheErrorCount = new AtomicLong();
private final AtomicLong timeoutCount = new AtomicLong();

/** Statistics thread schedule pool (for heavy debugging, could remove) */
private transient final ScheduledExecutorService scheduleThreadPool =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("MemcachedBlockCacheStatsExecutor").setDaemon(true).build());

public MemcachedBlockCache(Configuration c) throws IOException {
LOG.info("Creating MemcachedBlockCache");
Expand All @@ -80,18 +102,15 @@ public MemcachedBlockCache(Configuration c) throws IOException {
boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);

ConnectionFactoryBuilder builder =
// Cap the max time before anything times out
new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
// Don't keep threads around past the end of days.
.setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
.setUseNagleAlgorithm(false) // Ain't nobody got time for that
.setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case
.setUseNagleAlgorithm(false).setReadBufferSize(MAX_SIZE);

// Assume only the localhost is serving memecached.
// Assume only the localhost is serving memcached.
// A la mcrouter or co-locating memcached with split regionservers.
//
// If this config is a pool of memecached servers they will all be used according to the
// default hashing scheme defined by the memcache client. Spy Memecache client in this
// If this config is a pool of memcached servers they will all be used according to the
// default hashing scheme defined by the memcached client. Spy Memecache client in this
// case.
String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
String[] servers = serverListString.split(",");
Expand All @@ -104,29 +123,36 @@ public MemcachedBlockCache(Configuration c) throws IOException {
}

client = new MemcachedClient(builder.build(), serverAddresses);
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
STAT_THREAD_PERIOD, TimeUnit.SECONDS);
}

@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
cacheBlock(cacheKey, buf);
}

@SuppressWarnings("FutureReturnValueIgnored")
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
if (buf instanceof HFileBlock) {
client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc).addListener(f -> {
if (buf.getSerializedLength() > MAX_SIZE) {
LOG.debug("Block of type {} with key {} is too large, size={}, max={}, will not cache",
buf.getClass(), cacheKey, buf.getSerializedLength(), MAX_SIZE);
notCachedCount.incrementAndGet();
return;
}
client.set(cacheKey.toString(), MAX_TIME, (HFileBlock) buf, tc).addListener(f -> {
try {
f.get();
} catch (ExecutionException e) {
LOG.warn("Failed to cache block", e);
cachedCount.incrementAndGet();
} catch (Exception e) {
LOG.warn("Failed to cache block with key " + cacheKey, e);
cacheErrorCount.incrementAndGet();
}
});
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"MemcachedBlockCache can not cache Cacheable's of type " + buf.getClass().toString());
}
LOG.debug("Can not cache Cacheables of type {} with key {}", buf.getClass(), cacheKey);
notCachedCount.incrementAndGet();
}
}

Expand All @@ -139,17 +165,25 @@ public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repea
try (Scope traceScope = span.makeCurrent()) {
result = client.get(cacheKey.toString(), tc);
} catch (Exception e) {
// Catch a pretty broad set of exceptions to limit any changes in the memecache client
// Catch a pretty broad set of exceptions to limit any changes in the memcache client
// and how it handles failures from leaking into the read path.
if (LOG.isDebugEnabled()) {
LOG.debug(
"Exception pulling from memcached [ " + cacheKey.toString() + " ]. Treating as a miss.",
e);
if (
(e instanceof OperationTimeoutException) || ((e instanceof RuntimeException)
&& (e.getCause() instanceof OperationTimeoutException))
) {
timeoutCount.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("Timeout getting key " + cacheKey.toString(), e);
}
} else {
cacheErrorCount.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("Exception getting key " + cacheKey.toString(), e);
}
}
result = null;
} finally {
span.end();
// Update stats if this request doesn't have it turned off 100% of the time
if (updateCacheMetrics) {
if (result == null) {
cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
Expand All @@ -158,7 +192,6 @@ public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repea
}
}
}

return result;
}

Expand Down Expand Up @@ -194,6 +227,22 @@ public CacheStats getStats() {
@Override
public void shutdown() {
client.shutdown();
this.scheduleThreadPool.shutdown();
for (int i = 0; i < 10; i++) {
if (!this.scheduleThreadPool.isShutdown()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOG.warn("Interrupted while sleeping");
Thread.currentThread().interrupt();
break;
}
}
}
if (!this.scheduleThreadPool.isShutdown()) {
List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
LOG.debug("Still running " + runnables);
}
}

@Override
Expand Down Expand Up @@ -289,4 +338,38 @@ public int getMaxSize() {
return MAX_SIZE;
}
}

private static class StatisticsThread extends Thread {

private final MemcachedBlockCache c;

public StatisticsThread(MemcachedBlockCache c) {
super("MemcachedBlockCacheStats");
setDaemon(true);
this.c = c;
}

@Override
public void run() {
c.logStats();
}

}

public void logStats() {
LOG.info("cached=" + cachedCount.get() + ", notCached=" + notCachedCount.get()
+ ", cacheErrors=" + cacheErrorCount.get() + ", timeouts=" + timeoutCount.get() + ", reads="
+ cacheStats.getRequestCount() + ", " + "hits=" + cacheStats.getHitCount() + ", hitRatio="
+ (cacheStats.getHitCount() == 0
? "0"
: (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
+ "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
+ cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
+ (cacheStats.getHitCachingCount() == 0
? "0,"
: (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
+ "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
+ cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction());
}

}
Loading

0 comments on commit d56e7f2

Please sign in to comment.