HBASE-22463 Some paths in HFileScannerImpl did not consider block#release which will exhaust the ByteBuffAllocator (#257)
openinx committed May 31, 2019
1 parent 8bfe0bb commit 810d287
Showing 21 changed files with 403 additions and 192 deletions.
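
The patch removes the SHARED/EXCLUSIVE MemoryType tag from the block-cache interfaces: once blocks are backed by pooled ByteBuffers from a ByteBuffAllocator, every read path must release every block it borrows, so ownership is tracked by reference counting alone. The following toy program (plain Java, not HBase code; the pool size and names are invented) reproduces the failure mode the commit title describes: a fixed buffer pool runs dry when a path borrows without releasing.

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy illustration, not HBase code: a fixed pool exhausted by a leaky read path. */
public class PoolExhaustionDemo {
  // A tiny stand-in for a pooled allocator: four pooled direct buffers.
  static final BlockingQueue<ByteBuffer> POOL = new ArrayBlockingQueue<>(4);
  static {
    for (int i = 0; i < 4; i++) {
      POOL.offer(ByteBuffer.allocateDirect(64 * 1024));
    }
  }

  static ByteBuffer allocate() {
    ByteBuffer b = POOL.poll();
    if (b == null) {
      throw new IllegalStateException("pool exhausted: some caller never released");
    }
    return b;
  }

  static void release(ByteBuffer b) {
    b.clear();
    POOL.offer(b);
  }

  public static void main(String[] args) {
    // Each iteration borrows a buffer and, like the buggy scanner paths,
    // never gives it back. The fifth allocation throws.
    for (int i = 0; i < 5; i++) {
      ByteBuffer block = allocate();
      System.out.println("read block " + i + ", buffers left: " + POOL.size());
      // Fix: release(block) in a finally block once the block is consumed.
    }
  }
}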
MemcachedBlockCache.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -272,8 +271,7 @@ public CachedData encode(HFileBlock block) {
   public HFileBlock decode(CachedData d) {
     try {
       ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
-      return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP,
-        MemoryType.EXCLUSIVE);
+      return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP);
     } catch (IOException e) {
       LOG.warn("Failed to deserialize data from memcached", e);
     }
Cacheable.java
@@ -63,20 +63,6 @@ public interface Cacheable extends HeapSize, HBaseReferenceCounted {
    */
   BlockType getBlockType();

-  /**
-   * @return the {@code MemoryType} of this Cacheable
-   */
-  MemoryType getMemoryType();
-
-  /**
-   * SHARED means when this Cacheable is read back from cache it refers to the same memory area as
-   * used by the cache for caching it. EXCLUSIVE means when this Cacheable is read back from cache,
-   * the data was copied to an exclusive memory area of this Cacheable.
-   */
-  enum MemoryType {
-    SHARED, EXCLUSIVE
-  }
-
   /******************************* ReferenceCounted Interfaces ***********************************/

   /**
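With the enum deleted, a Cacheable no longer advertises where its memory lives; the ReferenceCounted methods it inherits through HBaseReferenceCounted (visible in the interface declaration above) carry that responsibility instead. A self-contained sketch of that contract, with an invented Block class standing in for a real implementation:

/** Self-contained sketch of the reference-counting contract; Block is invented. */
public class RefCountContract {
  static class Block {
    private int refCnt = 1; // starts out owned by its creator

    Block retain() { refCnt++; return this; }

    /** @return true when the count hits zero and the memory can be reclaimed */
    boolean release() {
      // A real block would hand its pooled ByteBuffers back to the allocator
      // when the count reaches zero.
      return --refCnt == 0;
    }

    int refCnt() { return refCnt; }
  }

  public static void main(String[] args) {
    Block block = new Block();
    block.retain();                                  // e.g. handed to a second reader
    System.out.println("refCnt=" + block.refCnt());  // 2
    System.out.println("freed=" + block.release());  // false, still referenced
    System.out.println("freed=" + block.release());  // true, reclaimable
  }
}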
CacheableDeserializer.java
@@ -20,9 +20,8 @@
 import java.io.IOException;

 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.yetus.audience.InterfaceAudience;

 /**
  * Interface for a deserializer. Throws an IOException if the serialized data is incomplete or
@@ -33,11 +32,10 @@ public interface CacheableDeserializer<T extends Cacheable> {
   /**
    * @param b ByteBuff to deserialize the Cacheable.
    * @param allocator to manage NIO ByteBuffers for future allocation or de-allocation.
-   * @param memType the {@link MemoryType} of the buffer
    * @return T the deserialized object.
    * @throws IOException
    */
-  T deserialize(ByteBuff b, ByteBuffAllocator allocator, MemoryType memType) throws IOException;
+  T deserialize(ByteBuff b, ByteBuffAllocator allocator) throws IOException;

   /**
    * Get the identifier of this deserializer. Identifier is unique for each deserializer and
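Implementors of the narrowed interface now take ownership cues purely from the allocator argument. Below is a hypothetical implementation sketch: MyBlock and DESERIALIZER_ID are invented stand-ins for a real Cacheable type and an id registered through the deserializer id manager, and the ByteBuff copy call should be checked against the hbase-common version in use.

import java.io.IOException;

import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;

// Hypothetical implementor; MyBlock and DESERIALIZER_ID do not exist in HBase.
public class MyBlockDeserializer implements CacheableDeserializer<MyBlock> {

  @Override
  public MyBlock deserialize(ByteBuff b, ByteBuffAllocator allocator) throws IOException {
    // Ownership is implied by the allocator that hands out the target buffer;
    // there is no MemoryType tag to propagate any more.
    ByteBuff buf = allocator.allocate(b.remaining());
    buf.put(0, b, b.position(), b.remaining());
    return new MyBlock(buf);
  }

  @Override
  public int getDeserializerIdentifier() {
    return DESERIALIZER_ID;
  }
}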
HFileBlock.java
@@ -201,8 +201,6 @@ static class Header {
    */
   private long offset = UNSET;

-  private MemoryType memType = MemoryType.EXCLUSIVE;
-
   /**
    * The on-disk size of the next block, including the header and checksums if present.
    * UNSET if unknown.
@@ -274,7 +272,7 @@ private BlockDeserializer() {
     }

     @Override
-    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc, MemoryType memType)
+    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc)
         throws IOException {
       // The buf has the file block followed by block metadata.
       // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
@@ -287,8 +285,7 @@ public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc, MemoryType
       boolean usesChecksum = buf.get() == (byte) 1;
       long offset = buf.getLong();
       int nextBlockOnDiskSize = buf.getInt();
-      return new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null,
-        alloc);
+      return new HFileBlock(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
     }

     @Override
@@ -366,7 +363,7 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
    * to that point.
    * @param buf Has header, content, and trailing checksums if present.
    */
-  HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset,
+  HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
       final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
       throws IOException {
     buf.rewind();
@@ -398,7 +395,6 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
     assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
     init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
       onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
-    this.memType = memType;
     this.offset = offset;
     this.buf = buf;
     this.buf.rewind();
@@ -1785,8 +1781,8 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
       // The onDiskBlock will become the headerAndDataBuffer for this block.
       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
       // contains the header of next block, so no need to set next block's header in it.
-      HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE,
-        offset, nextBlockOnDiskSize, fileContext, intoHeap ? HEAP: allocator);
+      HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, offset,
+        nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
       // Run check on uncompressed sizings.
       if (!fileContext.isCompressedOrEncrypted()) {
         hFileBlock.sanityCheckUncompressed();
@@ -2060,18 +2056,6 @@ HFileContext getHFileContext() {
     return this.fileContext;
   }

-  @Override
-  public MemoryType getMemoryType() {
-    return this.memType;
-  }
-
-  /**
-   * @return true if this block is backed by a shared memory area(such as that of a BucketCache).
-   */
-  boolean usesSharedMemory() {
-    return this.memType == MemoryType.SHARED;
-  }
-
   /**
    * Convert the contents of the block header into a human readable string.
    * This is mostly helpful for debugging. This assumes that the block
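The deleted usesSharedMemory() method is the behavioral heart of the change: callers used to skip releasing blocks whose memory was EXCLUSIVE, which becomes a leak once exclusive memory is also pooled. A minimal before/after sketch with invented names:

/** Minimal before/after sketch; names are invented, not the HBase source. */
public class UnconditionalRelease {
  interface RefCounted {
    boolean release();
  }

  // Old style: only cache-backed (SHARED) blocks were returned, so a block
  // wrongly tagged EXCLUSIVE, or a path that never checked, leaked buffers.
  static void returnBlockOld(RefCounted block, boolean usesSharedMemory) {
    if (usesSharedMemory) {
      block.release();
    }
  }

  // New style: release unconditionally. For a heap block this is a cheap
  // no-op; for a pooled block it hands the ByteBuffers back to the allocator.
  static void returnBlockNew(RefCounted block) {
    block.release();
  }

  public static void main(String[] args) {
    RefCounted block = () -> { System.out.println("released"); return true; };
    returnBlockOld(block, false); // prints nothing: the leak
    returnBlockNew(block);        // prints "released"
  }
}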
