diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java index c04c3f55aaef..51698b678f78 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; import java.util.List; @@ -78,6 +79,23 @@ public boolean release() { return refCnt.release(); } + public RefCnt getRefCnt() { + return this.refCnt; + } + + /** + * BucketEntry use this to share refCnt with ByteBuff, so make the method public here, + * the upstream should not use this public method in other place, or the previous recycler + * will be lost. + */ + public void shareRefCnt(RefCnt refCnt, boolean replace) { + if (replace) { + this.refCnt = refCnt; + } else { + this.refCnt = new CompositeRefCnt(getRefCnt(), refCnt); + } + } + /******************************* Methods for ByteBuff **************************************/ /** @@ -450,10 +468,37 @@ public byte[] toBytes() { */ public abstract int read(ReadableByteChannel channel) throws IOException; + /** + * Reads bytes from FileChannel into this ByteBuff + */ + public abstract int read(FileChannel channel, long offset) throws IOException; + + /** + * Write this ByteBuff's data into target file + */ + public abstract int write(FileChannel channel, long offset) throws IOException; + + /** + * function interface for Channel read + */ + @FunctionalInterface + interface ChannelReader { + int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException; + } + + static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> { + return channel.read(buf); + }; + + static final ChannelReader FILE_READER = (channel, buf, offset) -> { + return ((FileChannel)channel).read(buf, offset); + }; + // static helper methods - public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException { + public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset, + ChannelReader reader) throws IOException { if (buf.remaining() <= NIO_BUFFER_LIMIT) { - return channel.read(buf); + return reader.read(channel, buf, offset); } int originalLimit = buf.limit(); int initialRemaining = buf.remaining(); @@ -463,7 +508,8 @@ public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throw try { int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); buf.limit(buf.position() + ioSize); - ret = channel.read(buf); + offset += ret; + ret = reader.read(channel, buf, offset); if (ret < ioSize) { break; } @@ -540,15 +586,7 @@ public String toString() { } /********************************* ByteBuff wrapper methods ***********************************/ - - /** - * In theory, the upstream should never construct an ByteBuff by passing an given refCnt, so - * please don't use this public method in other place. Make the method public here because the - * BucketEntry#wrapAsCacheable in hbase-server module will use its own refCnt and ByteBuffers from - * IOEngine to composite an HFileBlock's ByteBuff, we didn't find a better way so keep the public - * way here. - */ - public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) { + private static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) { if (buffers == null || buffers.length == 0) { throw new IllegalArgumentException("buffers shouldn't be null or empty"); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/CompositeRefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/CompositeRefCnt.java new file mode 100644 index 000000000000..c63b89b7f19c --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/CompositeRefCnt.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.nio; + +import java.util.Optional; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted; + +/** + * The CompositeRefCnt is mainly used by exclusive memory HFileBlock, it has a innerRefCnt + * to share with BucketEntry, in order to summarize the number of RPC requests. So when + * BucketCache#freeEntireBuckets is called, will not violate the LRU policy. + *

+ * And it has its own refCnt & Recycler, Once the cells shipped to client, then both the + * Cacheable#refCnt & BucketEntry#refCnt will be decreased. when Cacheable's refCnt decrease + * to 0, it's ByteBuff will be reclaimed. and when BucketEntry#refCnt decrease to 0, the + * Bucket can be evicted. + */ +@InterfaceAudience.Private +public class CompositeRefCnt extends RefCnt { + + private Optional innerRefCnt; + + public CompositeRefCnt(RefCnt orignal, RefCnt inner) { + super(orignal.getRecycler()); + this.innerRefCnt = Optional.ofNullable(inner); + } + + @VisibleForTesting + public Optional getInnerRefCnt() { + return this.innerRefCnt; + } + + @Override + public boolean release() { + return super.release() && innerRefCnt.map(refCnt -> refCnt.release()).orElse(true); + } + + @Override + public ReferenceCounted retain() { + return innerRefCnt.map(refCnt -> refCnt.retain()).orElseGet(() -> super.retain()); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java index 3ce170903974..264a0a2433bf 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java @@ -24,7 +24,10 @@ import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.InvalidMarkException; +import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; +import java.util.Iterator; +import java.util.NoSuchElementException; import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; import org.apache.hadoop.hbase.util.ByteBufferUtils; @@ -53,6 +56,23 @@ public class MultiByteBuff extends ByteBuff { private int markedItemIndex = -1; private final int[] itemBeginPos; + private Iterator buffsIterator = new Iterator() { + @Override + public boolean hasNext() { + return curItemIndex < limitedItemIndex || + (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining()); + } + + @Override + public ByteBuffer next() { + if (curItemIndex >= items.length) { + throw new NoSuchElementException("items overflow"); + } + curItem = items[curItemIndex++]; + return curItem; + } + }; + public MultiByteBuff(ByteBuffer... items) { this(NONE, items); } @@ -1064,23 +1084,44 @@ public byte[] toBytes(int offset, int length) { return output; } + private int internalRead(ReadableByteChannel channel, long offset, + ChannelReader reader) throws IOException { + checkRefCount(); + int total = 0; + while (buffsIterator.hasNext()) { + ByteBuffer buffer = buffsIterator.next(); + int len = read(channel, buffer, offset, reader); + if (len > 0) { + total += len; + offset += len; + } + if (buffer.hasRemaining()) { + break; + } + } + return total; + } + @Override public int read(ReadableByteChannel channel) throws IOException { + return internalRead(channel, 0, CHANNEL_READER); + } + + @Override + public int read(FileChannel channel, long offset) throws IOException { + return internalRead(channel, offset, FILE_READER); + } + + @Override + public int write(FileChannel channel, long offset) throws IOException { checkRefCount(); int total = 0; - while (true) { - // Read max possible into the current BB - int len = channelRead(channel, this.curItem); - if (len > 0) + while (buffsIterator.hasNext()) { + ByteBuffer buffer = buffsIterator.next(); + while (buffer.hasRemaining()) { + int len = channel.write(curItem, offset); total += len; - if (this.curItem.hasRemaining()) { - // We were not able to read enough to fill the current BB itself. Means there is no point in - // doing more reads from Channel. Only this much there for now. - break; - } else { - if (this.curItemIndex >= this.limitedItemIndex) break; - this.curItemIndex++; - this.curItem = this.items[this.curItemIndex]; + offset += len; } } return total; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java index 018c8b44ef4f..7b5282dcb592 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java @@ -20,7 +20,6 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted; import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted; @@ -51,6 +50,10 @@ public RefCnt(Recycler recycler) { this.recycler = recycler; } + public Recycler getRecycler() { + return this.recycler; + } + @Override protected final void deallocate() { this.recycler.free(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java index 36a83a0ec212..797bfdc1fff5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; @@ -371,7 +372,25 @@ public void get(ByteBuffer out, int sourceOffset, int length) { @Override public int read(ReadableByteChannel channel) throws IOException { checkRefCount(); - return channelRead(channel, buf); + return read(channel, buf, 0, CHANNEL_READER); + } + + @Override + public int read(FileChannel channel, long offset) throws IOException { + checkRefCount(); + return read(channel, buf, offset, FILE_READER); + } + + @Override + public int write(FileChannel channel, long offset) throws IOException { + checkRefCount(); + int total = 0; + while(buf.hasRemaining()) { + int len = channel.write(buf, offset); + total += len; + offset += len; + } + return total; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 14ed275cc333..33564e7107cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; @@ -418,6 +419,11 @@ public BlockType getBlockType() { return blockType; } + @VisibleForTesting + public RefCnt getRefCnt() { + return buf.getRefCnt(); + } + @Override public int refCnt() { return buf.refCnt(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java index ca41ecafb9d1..ac2c6ff5b9b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java @@ -80,7 +80,7 @@ class BucketEntry implements HBaseReferenceCounted { */ private final RefCnt refCnt; final AtomicBoolean markedAsEvicted; - private final ByteBuffAllocator allocator; + final ByteBuffAllocator allocator; /** * Time this block was cached. Presumes we are created just before we are added to the cache. @@ -194,7 +194,15 @@ boolean isRpcRef() { } Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException { - ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt); + ByteBuff buf = ByteBuff.wrap(buffers); + buf.shareRefCnt(this.refCnt, true); + return wrapAsCacheable(buf); + } + + Cacheable wrapAsCacheable(ByteBuff buf) throws IOException { + if (buf.getRefCnt() != this.refCnt) { + buf.shareRefCnt(this.refCnt, false); + } return this.deserializerReference().deserialize(buf, allocator); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java index 3d7f2b1f3bdb..3169a66539aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java @@ -17,7 +17,6 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.nio.ByteBuff; @@ -35,9 +34,9 @@ public ExclusiveMemoryMmapIOEngine(String filePath, long capacity) throws IOExce @Override public Cacheable read(BucketEntry be) throws IOException { - ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength())); + ByteBuff dst = be.allocator.allocate(be.getLength()); bufferArray.read(be.offset(), dst); dst.position(0).limit(be.getLength()); - return be.wrapAsCacheable(dst.nioByteBuffers()); + return be.wrapAsCacheable(dst); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index b3afe482a02a..4ddc15f84af4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -129,20 +129,20 @@ public Cacheable read(BucketEntry be) throws IOException { long offset = be.offset(); int length = be.getLength(); Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0."); - ByteBuffer dstBuffer = ByteBuffer.allocate(length); + ByteBuff dstBuff = be.allocator.allocate(length); if (length != 0) { - accessFile(readAccessor, dstBuffer, offset); + accessFile(readAccessor, dstBuff, offset); // The buffer created out of the fileChannel is formed by copying the data from the file // Hence in this case there is no shared memory that we point to. Even if the BucketCache // evicts this buffer from the file the data is already copied and there is no need to // ensure that the results are not corrupted before consuming them. - if (dstBuffer.limit() != length) { + if (dstBuff.limit() != length) { throw new IllegalArgumentIOException( - "Only " + dstBuffer.limit() + " bytes read, " + length + " expected"); + "Only " + dstBuff.limit() + " bytes read, " + length + " expected"); } } - dstBuffer.rewind(); - return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer }); + dstBuff.rewind(); + return be.wrapAsCacheable(dstBuff); } @VisibleForTesting @@ -164,10 +164,7 @@ void closeFileChannels() { */ @Override public void write(ByteBuffer srcBuffer, long offset) throws IOException { - if (!srcBuffer.hasRemaining()) { - return; - } - accessFile(writeAccessor, srcBuffer, offset); + write(ByteBuff.wrap(srcBuffer), offset); } /** @@ -208,28 +205,30 @@ public void shutdown() { } @Override - public void write(ByteBuff srcBuffer, long offset) throws IOException { - ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate(); - write(dup, offset); + public void write(ByteBuff srcBuff, long offset) throws IOException { + if (!srcBuff.hasRemaining()) { + return; + } + accessFile(writeAccessor, srcBuff, offset); } - private void accessFile(FileAccessor accessor, ByteBuffer buffer, + private void accessFile(FileAccessor accessor, ByteBuff buff, long globalOffset) throws IOException { int startFileNum = getFileNum(globalOffset); - int remainingAccessDataLen = buffer.remaining(); + int remainingAccessDataLen = buff.remaining(); int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1); int accessFileNum = startFileNum; long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset); - int bufLimit = buffer.limit(); + int bufLimit = buff.limit(); while (true) { FileChannel fileChannel = fileChannels[accessFileNum]; int accessLen = 0; if (endFileNum > accessFileNum) { // short the limit; - buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); + buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); } try { - accessLen = accessor.access(fileChannel, buffer, accessOffset); + accessLen = accessor.access(fileChannel, buff, accessOffset); } catch (ClosedByInterruptException e) { throw e; } catch (ClosedChannelException e) { @@ -237,7 +236,7 @@ private void accessFile(FileAccessor accessor, ByteBuffer buffer, continue; } // recover the limit - buffer.limit(bufLimit); + buff.limit(bufLimit); if (accessLen < remainingAccessDataLen) { remainingAccessDataLen -= accessLen; accessFileNum++; @@ -246,7 +245,7 @@ private void accessFile(FileAccessor accessor, ByteBuffer buffer, break; } if (accessFileNum >= fileChannels.length) { - throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining()) + throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining()) + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset=" + globalOffset); } @@ -304,23 +303,23 @@ void refreshFileConnection(int accessFileNum, IOException ioe) throws IOExceptio } private interface FileAccessor { - int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset) + int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) throws IOException; } private static class FileReadAccessor implements FileAccessor { @Override - public int access(FileChannel fileChannel, ByteBuffer byteBuffer, + public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) throws IOException { - return fileChannel.read(byteBuffer, accessOffset); + return buff.read(fileChannel, accessOffset); } } private static class FileWriteAccessor implements FileAccessor { @Override - public int access(FileChannel fileChannel, ByteBuffer byteBuffer, + public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) throws IOException { - return fileChannel.write(byteBuffer, accessOffset); + return buff.write(fileChannel, accessOffset); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java index dd9a1c80bedb..8519a6894114 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java @@ -29,6 +29,8 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.Random; import org.apache.hadoop.conf.Configuration; @@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache; +import org.apache.hadoop.hbase.nio.CompositeRefCnt; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -58,9 +61,14 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) @Category({ IOTests.class, SmallTests.class }) public class TestHFileScannerImplReferenceCount { @@ -71,6 +79,15 @@ public class TestHFileScannerImplReferenceCount { @Rule public TestName CASE = new TestName(); + @Parameters(name = "{index}: ioengine={0}") + public static Collection data() { + return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" }, + new Object[] { "mmap" }, new Object[] { "pmem" }); + } + + @Parameter + public String ioengine; + private static final Logger LOG = LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); @@ -113,12 +130,16 @@ public static void setUpBeforeClass() { @Before public void setUp() throws IOException { + String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"); + this.workDir = UTIL.getDataTestDir(caseName); + if (!"offheap".equals(ioengine)) { + ioengine = ioengine + ":" + workDir.toString() + "/cachedata"; + } + UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine); this.firstCell = null; this.secondCell = null; this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true); this.conf = new Configuration(UTIL.getConfiguration()); - String caseName = CASE.getMethodName(); - this.workDir = UTIL.getDataTestDir(caseName); this.fs = this.workDir.getFileSystem(conf); this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis()); LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName); @@ -202,34 +223,34 @@ private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding) scanner.seekTo(firstCell); curBlock = scanner.curBlock; - Assert.assertEquals(curBlock.refCnt(), 2); + this.assertRefCnt(curBlock, 2); // Seek to the block again, the curBlock won't change and won't read from BlockCache. so // refCnt should be unchanged. scanner.seekTo(firstCell); Assert.assertTrue(curBlock == scanner.curBlock); - Assert.assertEquals(curBlock.refCnt(), 2); + this.assertRefCnt(curBlock, 2); prevBlock = curBlock; scanner.seekTo(secondCell); curBlock = scanner.curBlock; - Assert.assertEquals(prevBlock.refCnt(), 2); - Assert.assertEquals(curBlock.refCnt(), 2); + this.assertRefCnt(prevBlock, 2); + this.assertRefCnt(curBlock, 2); // After shipped, the prevBlock will be release, but curBlock is still referenced by the // curBlock. scanner.shipped(); - Assert.assertEquals(prevBlock.refCnt(), 1); - Assert.assertEquals(curBlock.refCnt(), 2); + this.assertRefCnt(prevBlock, 1); + this.assertRefCnt(curBlock, 2); // Try to ship again, though with nothing to client. scanner.shipped(); - Assert.assertEquals(prevBlock.refCnt(), 1); - Assert.assertEquals(curBlock.refCnt(), 2); + this.assertRefCnt(prevBlock, 1); + this.assertRefCnt(curBlock, 2); // The curBlock will also be released. scanner.close(); - Assert.assertEquals(curBlock.refCnt(), 1); + this.assertRefCnt(curBlock, 1); // Finish the block & block2 RPC path Assert.assertTrue(block1.release()); @@ -287,7 +308,7 @@ public void testSeekBefore() throws Exception { curBlock = scanner.curBlock; Assert.assertFalse(curBlock == block2); Assert.assertEquals(1, block2.refCnt()); - Assert.assertEquals(2, curBlock.refCnt()); + this.assertRefCnt(curBlock, 2); prevBlock = scanner.curBlock; // Release the block1, no other reference. @@ -305,22 +326,22 @@ public void testSeekBefore() throws Exception { // the curBlock is read from IOEngine, so a different block. Assert.assertFalse(curBlock == block1); // Two reference for curBlock: 1. scanner; 2. blockCache. - Assert.assertEquals(2, curBlock.refCnt()); + this.assertRefCnt(curBlock, 2); // Reference count of prevBlock must be unchanged because we haven't shipped. - Assert.assertEquals(2, prevBlock.refCnt()); + this.assertRefCnt(prevBlock, 2); // Do the shipped scanner.shipped(); Assert.assertEquals(scanner.prevBlocks.size(), 0); Assert.assertNotNull(scanner.curBlock); - Assert.assertEquals(2, curBlock.refCnt()); - Assert.assertEquals(1, prevBlock.refCnt()); + this.assertRefCnt(curBlock, 2); + this.assertRefCnt(prevBlock, 1); // Do the close scanner.close(); Assert.assertNull(scanner.curBlock); - Assert.assertEquals(1, curBlock.refCnt()); - Assert.assertEquals(1, prevBlock.refCnt()); + this.assertRefCnt(curBlock, 1); + this.assertRefCnt(prevBlock, 1); Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2); Assert.assertEquals(0, curBlock.refCnt()); @@ -340,18 +361,29 @@ public void testSeekBefore() throws Exception { Assert.assertTrue(scanner.seekTo()); curBlock = scanner.curBlock; Assert.assertFalse(curBlock == block1); - Assert.assertEquals(2, curBlock.refCnt()); + this.assertRefCnt(curBlock, 2); // Return false because firstCell <= c[0] Assert.assertFalse(scanner.seekBefore(firstCell)); // The block1 shouldn't be released because we still don't do the shipped or close. - Assert.assertEquals(2, curBlock.refCnt()); + this.assertRefCnt(curBlock, 2); scanner.close(); - Assert.assertEquals(1, curBlock.refCnt()); + this.assertRefCnt(curBlock, 1); Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1); Assert.assertEquals(0, curBlock.refCnt()); } + private void assertRefCnt(HFileBlock block, int value) { + if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) { + Assert.assertEquals(value, block.refCnt()); + } else { + Assert.assertEquals(value - 1, block.refCnt()); + Assert.assertTrue(block.getRefCnt() instanceof CompositeRefCnt); + Assert.assertEquals(value, ((CompositeRefCnt)block.getRefCnt()) + .getInnerRefCnt().get().refCnt()); + } + } + @Test public void testDefault() throws Exception { testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);