Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.cache.CacheAccessService;
import org.apache.hadoop.hbase.io.hfile.cache.CacheAccessServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -176,6 +178,8 @@ public class CacheConfig implements PropagatingConfigurationObserver {
// Local reference to the block cache
private final BlockCache blockCache;

private final CacheAccessService cacheAccessService;

private final ByteBuffAllocator byteBuffAllocator;

private double heapUsageThreshold;
Expand Down Expand Up @@ -231,6 +235,9 @@ public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache
}
this.blockCache = blockCache;
this.byteBuffAllocator = byteBuffAllocator;
this.cacheAccessService = blockCache != null
? CacheAccessServices.fromBlockCache(blockCache)
: CacheAccessServices.disabled();
}

/**
Expand All @@ -252,6 +259,9 @@ public CacheConfig(CacheConfig cacheConf) {
this.blockCache = cacheConf.blockCache;
this.byteBuffAllocator = cacheConf.byteBuffAllocator;
this.heapUsageThreshold = cacheConf.heapUsageThreshold;
this.cacheAccessService = blockCache != null
? CacheAccessServices.fromBlockCache(blockCache)
: CacheAccessServices.disabled();
}

private CacheConfig() {
Expand All @@ -270,6 +280,7 @@ private CacheConfig() {
this.blockCache = null;
this.byteBuffAllocator = ByteBuffAllocator.HEAP;
this.heapUsageThreshold = DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD;
this.cacheAccessService = CacheAccessServices.disabled();
}

/**
Expand Down Expand Up @@ -469,6 +480,20 @@ public Optional<BlockCache> getBlockCache() {
return Optional.ofNullable(this.blockCache);
}

/**
* Returns the cache access service used by HFile read/write path callers.
* <p>
* This service is the migration-facing cache abstraction. For now it is backed by the existing
* {@link BlockCache} when block cache is configured, or by a disabled no-op implementation when
* block cache is unavailable. This keeps cache construction unchanged while allowing callers such
* as {@code HFileReaderImpl} to depend on {@link CacheAccessService}.
* </p>
* @return cache access service
*/
public CacheAccessService getCacheAccessService() {
return cacheAccessService;
}

public boolean isCombinedBlockCache() {
return blockCache instanceof CombinedBlockCache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.cache.CacheAccessService;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,7 +66,7 @@ public void run() {
// Don't use BlockIterator here, because it's designed to read load-on-open section.
long onDiskSizeOfNextBlock = -1;
// if we are here, block cache is present anyways
BlockCache cache = cacheConf.getBlockCache().get();
CacheAccessService cacheAccessService = cacheConf.getCacheAccessService();
boolean interrupted = false;
int blockCount = 0;
int dataBlockCount = 0;
Expand All @@ -78,10 +79,10 @@ public void run() {
// update the offset and move on to the next block without actually going read all
// the way to the cache.
BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
if (cache.isAlreadyCached(cacheKey).orElse(false)) {
if (cacheAccessService.isAlreadyCached(cacheKey).orElse(false)) {
// Right now, isAlreadyCached is only supported by BucketCache, which should
// always cache data blocks.
int size = cache.getBlockSize(cacheKey).orElse(0);
int size = cacheAccessService.getBlockSize(cacheKey).orElse(0);
if (size > 0) {
offset += size;
LOG.debug("Found block of size {} for cache key {}. "
Expand All @@ -108,11 +109,12 @@ public void run() {
/* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
try {
if (!cacheConf.isInMemory()) {
if (!cache.blockFitsIntoTheCache(block).orElse(true)) {
if (!cacheAccessService.blockFitsIntoTheCache(block).orElse(true)) {
LOG.warn(
"Interrupting prefetch for file {} because block {} of size {} "
+ "doesn't fit in the available cache space. isCacheEnabled: {}",
path, cacheKey, block.getOnDiskSizeWithHeader(), cache.isCacheEnabled());
path, cacheKey, block.getOnDiskSizeWithHeader(),
cacheAccessService.isCacheEnabled());
interrupted = true;
break;
}
Expand All @@ -139,8 +141,8 @@ public void run() {
}
}
if (!interrupted) {
cacheConf.getBlockCache().get().notifyFileCachingCompleted(path, blockCount,
dataBlockCount, offset);
cacheAccessService.notifyFileCachingCompleted(path, blockCount, dataBlockCount,
offset);
}
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.cache.CacheAccessService;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
Expand Down Expand Up @@ -1104,120 +1105,120 @@ public HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boo
boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
// Check cache for block. If found return.
BlockCache cache = cacheConf.getBlockCache().orElse(null);
CacheAccessService cacheAccessService = cacheConf.getCacheAccessService();
long cachedBlockBytesRead = 0;
if (cache != null) {
HFileBlock cachedBlock = null;
boolean isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
try {
cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics,
expectedBlockType);
if (cachedBlock != null) {
if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
HFileBlock compressedBlock = cachedBlock;
cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
// In case of compressed block after unpacking we can release the compressed block
if (compressedBlock != cachedBlock) {
compressedBlock.release();
}
}
try {
validateBlockType(cachedBlock, expectedBlockType);
} catch (IOException e) {
returnAndEvictBlock(cache, cacheKey, cachedBlock);
cachedBlock = null;
throw e;
HFileBlock cachedBlock = null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'm wondered if we should do this instead? basically, if we let the code follow throw the non-null/ non-NoOpCacheAccessService , either it's still using any code logic to do no ops, or if NoOpCacheAccessService has any bug or human error in the future, it may be easier to cause another issue.

what do you think?

Suggested change
HFileBlock cachedBlock = null;
HFileBlock cachedBlock = null;
if (cacheAccessService instanceof NoOpCacheAccessService) {
return null;
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I think I would avoid the explicit instanceof NoOpCacheAccessService check here.
CacheAccessService is intended to be the abstraction boundary, so HFileReaderImpl should not need
to know which concrete service implementation it received. The disabled-cache behavior belongs in
NoOpCacheAccessService; its getBlock(...) returns null, so the current flow already exits
without doing cache work.
If there is concern about future bugs in NoOpCacheAccessService, I think the better protection is
unit coverage for that implementation rather than adding implementation-specific checks in the read
path. This keeps the reader code independent of whether the service is block-cache-backed,
topology-backed, or no-op.

boolean isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
try {
cachedBlock = (HFileBlock) cacheAccessService.getBlock(cacheKey, cacheBlock, useLock,
updateCacheMetrics, expectedBlockType);
if (cachedBlock != null) {
if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
HFileBlock compressedBlock = cachedBlock;
cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
// In case of compressed block after unpacking we can release the compressed block
if (compressedBlock != cachedBlock) {
compressedBlock.release();
}
}
try {
validateBlockType(cachedBlock, expectedBlockType);
} catch (IOException e) {
returnAndEvictBlock(cacheAccessService, cacheKey, cachedBlock);
cachedBlock = null;
throw e;
}

if (expectedDataBlockEncoding == null) {
return cachedBlock;
}
DataBlockEncoding actualDataBlockEncoding = cachedBlock.getDataBlockEncoding();
// Block types other than data blocks always have
// DataBlockEncoding.NONE. To avoid false negative cache misses, only
// perform this check if cached block is a data block.
if (expectedDataBlockEncoding == null) {
return cachedBlock;
}
DataBlockEncoding actualDataBlockEncoding = cachedBlock.getDataBlockEncoding();
// Block types other than data blocks always have
// DataBlockEncoding.NONE. To avoid false negative cache misses, only
// perform this check if cached block is a data block.
if (
cachedBlock.getBlockType().isData()
&& !actualDataBlockEncoding.equals(expectedDataBlockEncoding)
) {
// This mismatch may happen if a Scanner, which is used for say a
// compaction, tries to read an encoded block from the block cache.
// The reverse might happen when an EncodedScanner tries to read
// un-encoded blocks which were cached earlier.
//
// Because returning a data block with an implicit BlockType mismatch
// will cause the requesting scanner to throw a disk read should be
// forced here. This will potentially cause a significant number of
// cache misses, so update so we should keep track of this as it might
// justify the work on a CompoundScanner.
if (
cachedBlock.getBlockType().isData()
&& !actualDataBlockEncoding.equals(expectedDataBlockEncoding)
!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE)
&& !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)
) {
// This mismatch may happen if a Scanner, which is used for say a
// compaction, tries to read an encoded block from the block cache.
// The reverse might happen when an EncodedScanner tries to read
// un-encoded blocks which were cached earlier.
//
// Because returning a data block with an implicit BlockType mismatch
// will cause the requesting scanner to throw a disk read should be
// forced here. This will potentially cause a significant number of
// cache misses, so update so we should keep track of this as it might
// justify the work on a CompoundScanner.
if (
!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE)
&& !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)
) {
// If the block is encoded but the encoding does not match the
// expected encoding it is likely the encoding was changed but the
// block was not yet evicted. Evictions on file close happen async
// so blocks with the old encoding still linger in cache for some
// period of time. This event should be rare as it only happens on
// schema definition change.
LOG.info(
"Evicting cached block with key {} because data block encoding mismatch; "
+ "expected {}, actual {}, path={}",
cacheKey, actualDataBlockEncoding, expectedDataBlockEncoding, path);
// This is an error scenario. so here we need to release the block.
returnAndEvictBlock(cache, cacheKey, cachedBlock);
}
cachedBlock = null;
return null;
// If the block is encoded but the encoding does not match the
// expected encoding it is likely the encoding was changed but the
// block was not yet evicted. Evictions on file close happen async
// so blocks with the old encoding still linger in cache for some
// period of time. This event should be rare as it only happens on
// schema definition change.
LOG.info(
"Evicting cached block with key {} because data block encoding mismatch; "
+ "expected {}, actual {}, path={}",
cacheKey, actualDataBlockEncoding, expectedDataBlockEncoding, path);
// This is an error scenario. so here we need to release the block.
returnAndEvictBlock(cacheAccessService, cacheKey, cachedBlock);
}
return cachedBlock;
}
} catch (Exception e) {
if (cachedBlock != null) {
returnAndEvictBlock(cache, cacheKey, cachedBlock);
cachedBlock = null;
return null;
}
LOG.warn("Failed retrieving block from cache with key {}. "
+ "\n Evicting this block from cache and will read it from file system. "
+ "\n Exception details: ", cacheKey, e);
if (LOG.isDebugEnabled()) {
LOG.debug("Further tracing details for failed block cache retrieval:"
return cachedBlock;
}
} catch (Exception e) {
if (cachedBlock != null) {
returnAndEvictBlock(cacheAccessService, cacheKey, cachedBlock);
}
LOG.warn("Failed retrieving block from cache with key {}. "
+ "\n Evicting this block from cache and will read it from file system. "
+ "\n Exception details: ", cacheKey, e);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Further tracing details for failed block cache retrieval:"
+ "\n Complete File path - {}," + "\n Expected Block Type - {}, Actual Block Type - {},"
+ "\n Cache compressed - {}" + "\n Header size (after deserialized from cache) - {}"
+ "\n Size with header - {}" + "\n Uncompressed size without header - {} "
+ "\n Total byte buffer size - {}" + "\n Encoding code - {}", this.path,
expectedBlockType, (cachedBlock != null ? cachedBlock.getBlockType() : "N/A"),
(expectedBlockType != null
? cacheConf.shouldCacheCompressed(expectedBlockType.getCategory())
: "N/A"),
(cachedBlock != null ? cachedBlock.headerSize() : "N/A"),
(cachedBlock != null ? cachedBlock.getOnDiskSizeWithHeader() : "N/A"),
(cachedBlock != null ? cachedBlock.getUncompressedSizeWithoutHeader() : "N/A"),
(cachedBlock != null ? cachedBlock.getBufferReadOnly().limit() : "N/A"),
(cachedBlock != null
? cachedBlock.getBufferReadOnly().getShort(cachedBlock.headerSize())
: "N/A"));
}
return null;
} finally {
// Count bytes read as cached block is being returned
if (isScanMetricsEnabled && cachedBlock != null) {
cachedBlockBytesRead = cachedBlock.getOnDiskSizeWithHeader();
// Account for the header size of the next block if it exists
if (cachedBlock.getNextBlockOnDiskSize() > 0) {
cachedBlockBytesRead += cachedBlock.headerSize();
}
}
if (cachedBlockBytesRead > 0) {
ThreadLocalServerSideScanMetrics.addBytesReadFromBlockCache(cachedBlockBytesRead);
+ "\n Total byte buffer size - {}" + "\n Encoding code - {}",
this.path, expectedBlockType, (cachedBlock != null ? cachedBlock.getBlockType() : "N/A"),
(expectedBlockType != null
? cacheConf.shouldCacheCompressed(expectedBlockType.getCategory())
: "N/A"),
(cachedBlock != null ? cachedBlock.headerSize() : "N/A"),
(cachedBlock != null ? cachedBlock.getOnDiskSizeWithHeader() : "N/A"),
(cachedBlock != null ? cachedBlock.getUncompressedSizeWithoutHeader() : "N/A"),
(cachedBlock != null ? cachedBlock.getBufferReadOnly().limit() : "N/A"),
(cachedBlock != null
? cachedBlock.getBufferReadOnly().getShort(cachedBlock.headerSize())
: "N/A"));
}
return null;
} finally {
// Count bytes read as cached block is being returned
if (isScanMetricsEnabled && cachedBlock != null) {
cachedBlockBytesRead = cachedBlock.getOnDiskSizeWithHeader();
// Account for the header size of the next block if it exists
if (cachedBlock.getNextBlockOnDiskSize() > 0) {
cachedBlockBytesRead += cachedBlock.headerSize();
}
}
if (cachedBlockBytesRead > 0) {
ThreadLocalServerSideScanMetrics.addBytesReadFromBlockCache(cachedBlockBytesRead);
}
}
return null;
}

private void returnAndEvictBlock(BlockCache cache, BlockCacheKey cacheKey, Cacheable block) {
private void returnAndEvictBlock(CacheAccessService cacheAccessService, BlockCacheKey cacheKey,
Cacheable block) {
block.release();
cache.evictBlock(cacheKey);
cacheAccessService.evictBlock(cacheKey);
}

/**
Expand Down Expand Up @@ -1373,9 +1374,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
// type in the cache key, and we expect it to match on a cache hit.
if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
// Remember to release the block when in exceptional path.
cacheConf.getBlockCache().ifPresent(cache -> {
returnAndEvictBlock(cache, cacheKey, cachedBlock);
});
CacheAccessService cacheAccessService = cacheConf.getCacheAccessService();
returnAndEvictBlock(cacheAccessService, cacheKey, cachedBlock);
throw new IOException("Cached block under key " + cacheKey + " "
+ "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
+ dataBlockEncoder.getDataBlockEncoding() + "), path=" + path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Objects;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
Expand Down Expand Up @@ -297,4 +298,11 @@ public void onConfigurationChange(Configuration config) {
Objects.requireNonNull(config, "config must not be null");
blockCache.onConfigurationChange(config);
}

@Override
public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
long size) {
Objects.requireNonNull(fileName, "fileName must not be null");
blockCache.notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
}
}
Loading