Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27796 Improve MemcachedBlockCache #5181

Merged
merged 3 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion hbase-external-blockcache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,63 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<scope>test</scope>
</dependency>
virajjasani marked this conversation as resolved.
Show resolved Hide resolved
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-logging</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-zookeeper</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.thimbleware.jmemcached</groupId>
<artifactId>jmemcached-core</artifactId>
<version>1.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,29 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.transcoders.Transcoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Class to store blocks into memcached. This should only be used on a cluster of Memcached daemons
* that are tuned well and have a good network connection to the HBase regionservers. Any other use
Expand All @@ -56,6 +63,10 @@ public class MemcachedBlockCache implements BlockCache {
// that just in case this client is used with those versions.
public static final int MAX_SIZE = 1020 * 1024;

// Start memcached with -I <MAX_SIZE> to ensure it has the ability to store blocks of this size
public static final int MAX_TIME = 60 * 60 * 24 * 30; // 30 days, max allowed per the memcached
// spec

// Config key for what memcached servers to use.
// They should be specified in a comma separated list with ports.
// like:
Expand All @@ -67,10 +78,20 @@ public class MemcachedBlockCache implements BlockCache {
public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze";
public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false;
public static final int STAT_THREAD_PERIOD = 60 * 5;

private final MemcachedClient client;
private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
private final AtomicLong cachedCount = new AtomicLong();
private final AtomicLong notCachedCount = new AtomicLong();
private final AtomicLong cacheErrorCount = new AtomicLong();
private final AtomicLong timeoutCount = new AtomicLong();

/** Statistics thread schedule pool (for heavy debugging, could remove) */
private transient final ScheduledExecutorService scheduleThreadPool =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("MemcachedBlockCacheStatsExecutor").setDaemon(true).build());
apurtell marked this conversation as resolved.
Show resolved Hide resolved

public MemcachedBlockCache(Configuration c) throws IOException {
LOG.info("Creating MemcachedBlockCache");
Expand All @@ -80,18 +101,15 @@ public MemcachedBlockCache(Configuration c) throws IOException {
boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);

ConnectionFactoryBuilder builder =
// Cap the max time before anything times out
new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
// Don't keep threads around past the end of days.
.setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
.setUseNagleAlgorithm(false) // Ain't nobody got time for that
.setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case
.setUseNagleAlgorithm(false).setReadBufferSize(MAX_SIZE);

// Assume only the localhost is serving memecached.
// Assume only the localhost is serving memcached.
// A la mcrouter or co-locating memcached with split regionservers.
//
// If this config is a pool of memecached servers they will all be used according to the
// default hashing scheme defined by the memcache client. Spy Memecache client in this
// If this config is a pool of memcached servers they will all be used according to the
// default hashing scheme defined by the memcached client. Spy Memecache client in this
// case.
String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
String[] servers = serverListString.split(",");
Expand All @@ -104,6 +122,8 @@ public MemcachedBlockCache(Configuration c) throws IOException {
}

client = new MemcachedClient(builder.build(), serverAddresses);
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
STAT_THREAD_PERIOD, TimeUnit.SECONDS);
}

@Override
Expand All @@ -115,18 +135,24 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory)
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
if (buf instanceof HFileBlock) {
client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc).addListener(f -> {
if (buf.getSerializedLength() > MAX_SIZE) {
LOG.debug("Block of type {} with key {} is too large, size={}, max={}, will not cache",
buf.getClass(), cacheKey, buf.getSerializedLength(), MAX_SIZE);
notCachedCount.incrementAndGet();
return;
}
client.set(cacheKey.toString(), MAX_TIME, (HFileBlock) buf, tc).addListener(f -> {
try {
f.get();
} catch (ExecutionException e) {
LOG.warn("Failed to cache block", e);
cachedCount.incrementAndGet();
} catch (Exception e) {
LOG.warn("Failed to cache block with key " + cacheKey, e);
cacheErrorCount.incrementAndGet();
}
});
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"MemcachedBlockCache can not cache Cacheable's of type " + buf.getClass().toString());
}
LOG.debug("Can not cache Cacheables of type {} with key {}", buf.getClass(), cacheKey);
notCachedCount.incrementAndGet();
}
}

Expand All @@ -139,17 +165,24 @@ public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repea
try (Scope traceScope = span.makeCurrent()) {
result = client.get(cacheKey.toString(), tc);
} catch (Exception e) {
// Catch a pretty broad set of exceptions to limit any changes in the memecache client
// Catch a pretty broad set of exceptions to limit any changes in the memcache client
// and how it handles failures from leaking into the read path.
if (LOG.isDebugEnabled()) {
LOG.debug(
"Exception pulling from memcached [ " + cacheKey.toString() + " ]. Treating as a miss.",
e);
if (
(e instanceof OperationTimeoutException) || ((e instanceof RuntimeException)
&& (e.getCause() instanceof OperationTimeoutException))
) {
timeoutCount.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("Timeout getting key " + cacheKey.toString(), e);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Exception getting key " + cacheKey.toString(), e);
}
}
apurtell marked this conversation as resolved.
Show resolved Hide resolved
result = null;
} finally {
span.end();
// Update stats if this request doesn't have it turned off 100% of the time
if (updateCacheMetrics) {
if (result == null) {
cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
Expand All @@ -158,7 +191,6 @@ public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repea
}
}
}

return result;
}

Expand Down Expand Up @@ -194,6 +226,22 @@ public CacheStats getStats() {
@Override
public void shutdown() {
  client.shutdown();
  this.scheduleThreadPool.shutdown();
  // Wait up to ~100ms for the in-flight stats task to finish. Note that
  // isShutdown() returns true immediately after shutdown() is called, so it
  // can never be used to wait here; isTerminated() only becomes true once all
  // queued tasks have completed.
  for (int i = 0; i < 10; i++) {
    if (!this.scheduleThreadPool.isTerminated()) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while sleeping");
        // Restore the interrupt flag so callers can observe it.
        Thread.currentThread().interrupt();
        break;
      }
    }
  }
  if (!this.scheduleThreadPool.isTerminated()) {
    // Give up waiting and cancel whatever is still queued or running.
    List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
    LOG.debug("Still running " + runnables);
  }
}

@Override
Expand Down Expand Up @@ -289,4 +337,38 @@ public int getMaxSize() {
return MAX_SIZE;
}
}

/**
 * Periodic task that writes the owning cache's counters to the log. It is
 * scheduled on {@code scheduleThreadPool}; the Thread name and daemon flag set
 * in the constructor only take effect if it is ever started as a standalone
 * thread.
 */
private static class StatisticsThread extends Thread {

  /** The cache whose statistics are logged on each run. */
  private final MemcachedBlockCache cache;

  public StatisticsThread(MemcachedBlockCache cache) {
    super("MemcachedBlockCacheStats");
    setDaemon(true);
    this.cache = cache;
  }

  @Override
  public void run() {
    cache.logStats();
  }

}

/**
 * Logs a one-line summary of this cache's counters: set/skip/error/timeout
 * counts maintained by this class plus the read/hit/eviction numbers tracked
 * by {@link CacheStats}. Ratios are reported as "0" when the corresponding
 * hit count is zero to avoid formatting a meaningless percentage.
 */
public void logStats() {
  // Keep the ", " separators outside the conditionals so every field is
  // delimited consistently regardless of which ternary branch is taken.
  LOG.info("cached=" + cachedCount.get() + ", notCached=" + notCachedCount.get()
    + ", cacheErrors=" + cacheErrorCount.get() + ", timeouts=" + timeoutCount.get()
    + ", reads=" + cacheStats.getRequestCount() + ", hits=" + cacheStats.getHitCount()
    + ", hitRatio="
    + (cacheStats.getHitCount() == 0
      ? "0"
      : StringUtils.formatPercent(cacheStats.getHitRatio(), 2))
    + ", cachingAccesses=" + cacheStats.getRequestCachingCount() + ", cachingHits="
    + cacheStats.getHitCachingCount() + ", cachingHitsRatio="
    + (cacheStats.getHitCachingCount() == 0
      ? "0"
      : StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2))
    + ", evictions=" + cacheStats.getEvictionCount() + ", evicted="
    + cacheStats.getEvictedCount() + ", evictedPerRun=" + cacheStats.evictedPerEviction());
}

}
Loading