Skip to content

Commit

Permalink
HBASE-17072 CPU usage starts to climb up to 90-100% when using G1GC
Browse files Browse the repository at this point in the history
Removes ThreadLocal. Uses AtomicReference instead (based on patch
posted up in HBASE-10676 "Removing ThreadLocal of PrefetchedHeader in
HFileBlock.FSReaderV2 make higher perforamce of scan")

Signed-off-by: Michael Stack <stack@apache.org>
  • Loading branch information
saintstack committed Nov 28, 2016
1 parent bbb81a7 commit 3d5e686
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 62 deletions.
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;


Expand Down Expand Up @@ -1366,13 +1367,20 @@ interface FSReader {
} }


/** /**
* We always prefetch the header of the next block, so that we know its * Data-structure to use caching the header of the NEXT block. Only works if next read
* on-disk size in advance and can read it in one operation. * that comes in here is next in sequence in this block.
*
* When we read, we read current block and the next blocks' header. We do this so we have
* the length of the next block to read if the hfile index is not available (rare).
* TODO: Review!! This trick of reading next blocks header is a pain, complicates our
* read path and I don't think it needed given it rare we don't have the block index
* (it is 'normally' present, gotten from the hfile index). FIX!!!
*/ */
private static class PrefetchedHeader { private static class PrefetchedHeader {
long offset = -1; long offset = -1;
byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; byte [] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE); final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);

@Override @Override
public String toString() { public String toString() {
return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header); return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
Expand All @@ -1393,19 +1401,13 @@ static class FSReaderImpl implements FSReader {
private final HFileBlockDefaultDecodingContext defaultDecodingCtx; private final HFileBlockDefaultDecodingContext defaultDecodingCtx;


/** /**
* When we read a block, we overread and pull in the next blocks header too. We will save it * Cache of the NEXT header after this. Check it is indeed next blocks header
* here. If moving serially through the file, we will trip over this caching of the next blocks * before using it. TODO: Review. This overread into next block to fetch
* header so we won't have to do explicit seek to find next blocks lengths, etc. * next blocks header seems unnecessary given we usually get the block size
* from the hfile index. Review!
*/ */
private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread = private AtomicReference<PrefetchedHeader> prefetchedHeader =
new ThreadLocal<PrefetchedHeader>() { new AtomicReference<PrefetchedHeader>(new PrefetchedHeader());
@Override
public PrefetchedHeader initialValue() {
return new PrefetchedHeader();
}
};

/** Compression algorithm used by the {@link HFile} */


/** The size of the file we are reading from, or -1 if unknown. */ /** The size of the file we are reading from, or -1 if unknown. */
protected long fileSize; protected long fileSize;
Expand Down Expand Up @@ -1455,13 +1457,17 @@ public BlockIterator blockRange(final long startOffset, final long endOffset) {
final FSReader owner = this; // handle for inner class final FSReader owner = this; // handle for inner class
return new BlockIterator() { return new BlockIterator() {
private long offset = startOffset; private long offset = startOffset;
// Cache length of next block. Current block has the length of next block in it.
private long length = -1;


@Override @Override
public HFileBlock nextBlock() throws IOException { public HFileBlock nextBlock() throws IOException {
if (offset >= endOffset) if (offset >= endOffset) {
return null; return null;
HFileBlock b = readBlockData(offset, -1, false); }
HFileBlock b = readBlockData(offset, length, false);
offset += b.getOnDiskSizeWithHeader(); offset += b.getOnDiskSizeWithHeader();
length = b.getNextBlockOnDiskSize();
return b.unpack(fileContext, owner); return b.unpack(fileContext, owner);
} }


Expand Down Expand Up @@ -1546,7 +1552,8 @@ protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffs
* *
* @param offset the offset in the stream to read at * @param offset the offset in the stream to read at
* @param onDiskSizeWithHeaderL the on-disk size of the block, including * @param onDiskSizeWithHeaderL the on-disk size of the block, including
* the header, or -1 if unknown * the header, or -1 if unknown; i.e. when iterating over blocks reading
* in the file metadata info.
* @param pread whether to use a positional read * @param pread whether to use a positional read
*/ */
@Override @Override
Expand Down Expand Up @@ -1630,31 +1637,6 @@ private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final
return (int)onDiskSizeWithHeaderL; return (int)onDiskSizeWithHeaderL;
} }


/**
* Check threadlocal cache for this block's header; we usually read it on the tail of reading
* the previous block to save a seek. Otherwise, we have to do a seek to read the header before
* we can pull in the block.
* @return The cached block header or null if not found.
* @see #cacheNextBlockHeader(long, byte[], int, int)
*/
private ByteBuffer getCachedHeader(final long offset) {
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
// PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
return prefetchedHeader != null && prefetchedHeader.offset == offset?
prefetchedHeader.buf: null;
}

/**
* Save away the next blocks header in thread local.
* @see #getCachedHeader(long)
*/
private void cacheNextBlockHeader(final long nextBlockOffset,
final byte [] header, final int headerOffset, final int headerLength) {
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
prefetchedHeader.offset = nextBlockOffset;
System.arraycopy(header, headerOffset, prefetchedHeader.header, 0, headerLength);
}

/** /**
* Verify the passed in onDiskSizeWithHeader aligns with what is in the header else something * Verify the passed in onDiskSizeWithHeader aligns with what is in the header else something
* is not right. * is not right.
Expand All @@ -1671,24 +1653,59 @@ private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer
} }
} }


/**
* Check atomic reference cache for this block's header. Cache only good if next
* read coming through is next in sequence in the block. We read next block's
* header on the tail of reading the previous block to save a seek. Otherwise,
* we have to do a seek to read the header before we can pull in the block OR
* we have to backup the stream because we over-read (the next block's header).
* @see PrefetchedHeader
* @return The cached block header or null if not found.
* @see #cacheNextBlockHeader(long, byte[], int, int)
*/
private ByteBuffer getCachedHeader(final long offset) {
PrefetchedHeader ph = this.prefetchedHeader.get();
return ph != null && ph.offset == offset? ph.buf: null;
}

/**
* Save away the next blocks header in atomic reference.
* @see #getCachedHeader(long)
* @see PrefetchedHeader
*/
private void cacheNextBlockHeader(final long offset,
final byte [] header, final int headerOffset, final int headerLength) {
PrefetchedHeader ph = new PrefetchedHeader();
ph.offset = offset;
System.arraycopy(header, headerOffset, ph.header, 0, headerLength);
this.prefetchedHeader.set(ph);
}

/** /**
* Reads a version 2 block. * Reads a version 2 block.
* *
* @param offset the offset in the stream to read at * @param offset the offset in the stream to read at. Usually the
* @param onDiskSizeWithHeaderL the on-disk size of the block, including * @param onDiskSizeWithHeaderL the on-disk size of the block, including
* the header and checksums if present or -1 if unknown * the header and checksums if present or -1 if unknown (as a long). Can be -1
* if we are doing raw iteration of blocks as when loading up file metadata; i.e.
* the first read of a new file (TODO: Fix! See HBASE-17072). Usually non-null gotten
* from the file index.
* @param pread whether to use a positional read * @param pread whether to use a positional read
* @param verifyChecksum Whether to use HBase checksums. * @param verifyChecksum Whether to use HBase checksums.
* If HBase checksum is switched off, then use HDFS checksum. * If HBase checksum is switched off, then use HDFS checksum.
* @return the HFileBlock or null if there is a HBase checksum mismatch * @return the HFileBlock or null if there is a HBase checksum mismatch
*/ */
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException { long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum)
throws IOException {
if (offset < 0) { if (offset < 0) {
throw new IOException("Invalid offset=" + offset + " trying to read " throw new IOException("Invalid offset=" + offset + " trying to read "
+ "block (onDiskSize=" + onDiskSizeWithHeaderL + ")"); + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
} }
int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
// Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
// and will save us having to seek the stream backwards to reread the header we
// read the last time through here.
ByteBuffer headerBuf = getCachedHeader(offset); ByteBuffer headerBuf = getCachedHeader(offset);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Reading " + this.fileContext.getHFileName() + " at offset=" + offset + LOG.trace("Reading " + this.fileContext.getHFileName() + " at offset=" + offset +
Expand All @@ -1697,10 +1714,15 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
} }
if (onDiskSizeWithHeader <= 0) { if (onDiskSizeWithHeader <= 0) {
// We were not passed the block size. Need to get it from the header. If header was not in // We were not passed the block size. Need to get it from the header. If header was not in
// cache, need to seek to pull it in. This latter might happen when we are doing the first // cache, need to seek to pull it in. This is costly and should happen very rarely.
// read in a series of reads or a random read, and we don't have access to the block index. // Currently happens on open of a hfile reader where we read the trailer blocks for
// This is costly and should happen very rarely. // indices. Otherwise, we are reading block sizes out of the hfile index. To check,
// enable TRACE in this file and you'll get an exception in a LOG every time we seek.
// See HBASE-17072 for more detail.
if (headerBuf == null) { if (headerBuf == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Extra see to get block size!", new RuntimeException());
}
headerBuf = ByteBuffer.allocate(hdrSize); headerBuf = ByteBuffer.allocate(hdrSize);
readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false, readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false,
offset, pread); offset, pread);
Expand All @@ -1711,9 +1733,13 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
int preReadHeaderSize = headerBuf == null? 0 : hdrSize; int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
// Allocate enough space to fit the next block's header too; saves a seek next time through. // Allocate enough space to fit the next block's header too; saves a seek next time through.
// onDiskBlock is whole block + header + checksums then extra hdrSize to read next header; // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
// onDiskSizeWithHeader is header, body, and any checksums if present. // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
// says where to start reading. If we have the header cached, then we don't need to read
// it again and we can likely read from last place we left off w/o need to backup and reread
// the header we read last time through here. TODO: Review this overread of the header. Is it necessary
// when we get the block size from the hfile index? See note on PrefetchedHeader class above.
// TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap). // TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap).
byte[] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize, int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize,
onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
if (headerBuf != null) { if (headerBuf != null) {
Expand Down
Expand Up @@ -514,7 +514,7 @@ public String toString() {
* blocks at all other levels will be cached in the LRU cache in practice, * blocks at all other levels will be cached in the LRU cache in practice,
* although this API does not enforce that. * although this API does not enforce that.
* *
* All non-root (leaf and intermediate) index blocks contain what we call a * <p>All non-root (leaf and intermediate) index blocks contain what we call a
* "secondary index": an array of offsets to the entries within the block. * "secondary index": an array of offsets to the entries within the block.
* This allows us to do binary search for the entry corresponding to the * This allows us to do binary search for the entry corresponding to the
* given key without having to deserialize the block. * given key without having to deserialize the block.
Expand Down Expand Up @@ -583,17 +583,12 @@ public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boole
} }


/** /**
* Return the BlockWithScanInfo which contains the DataBlock with other scan * Return the BlockWithScanInfo, a data structure which contains the Data HFileBlock with
* info such as nextIndexedKey. This function will only be called when the * other scan info such as the key that starts the next HFileBlock. This function will only
* HFile version is larger than 1. * be called when the HFile version is larger than 1.
* *
* @param key * @param key the key we are looking for
* the key we are looking for * @param currentBlock the current block, to avoid re-reading the same block
* @param currentBlock
* the current block, to avoid re-reading the same block
* @param cacheBlocks
* @param pread
* @param isCompaction
* @param expectedDataBlockEncoding the data block encoding the caller is * @param expectedDataBlockEncoding the data block encoding the caller is
* expecting the data block to be in, or null to not perform this * expecting the data block to be in, or null to not perform this
* check and return the block irrespective of the encoding. * check and return the block irrespective of the encoding.
Expand Down

0 comments on commit 3d5e686

Please sign in to comment.