HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read #479

Status: Closed (wants to merge 1 commit)
File: org/apache/hadoop/hbase/nio/ByteBuff.java

@@ -19,6 +19,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.List;

@@ -78,6 +79,23 @@ public boolean release() {
return refCnt.release();
}

public RefCnt getRefCnt() {
return this.refCnt;
}

/**
 * BucketEntry uses this to share its refCnt with the ByteBuff, which is why the method is
 * public here. Upstream code should not call this public method anywhere else, or the
 * previous recycler will be lost.
 */
public void shareRefCnt(RefCnt refCnt, boolean replace) {
[Member] OK, you have a clear comment here. Seems OK now.

[Contributor] In the case of a file-based BucketCache, we now read the cached data into pooled ByteBuffers. Every read RPC acquires its own buffers and reads into them, so ideally there is no sharing of buffers across readers at all. But here we seem to share the ref count of the BucketCache entry (file-based here) with the RPCs. A little strange to digest.

[Contributor Author] Sorry, some conversations with @openinx are missing here. We do this in order not to violate the LRU policy when BucketCache#freeEntireBuckets is executed.

if (replace) {
this.refCnt = refCnt;
} else {
this.refCnt = new CompositeRefCnt(getRefCnt(), refCnt);
}
}
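To make the two modes concrete, here is a minimal sketch of the sharing semantics (not part of this PR; the Recycler lambda and buffer sizes are invented for illustration, while RefCnt, ByteBuff and shareRefCnt are the classes and method shown in this diff):

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;

public class ShareRefCntSketch {
  public static void main(String[] args) {
    // Hypothetical recycler standing in for the BucketEntry's eviction hook.
    Recycler evictBucket = () -> System.out.println("bucket space reclaimed");
    RefCnt bucketEntryRefCnt = new RefCnt(evictBucket);

    // replace = true: a ByteBuff wrapped around cache-owned buffers simply
    // adopts the BucketEntry's refCnt (it had no recycler worth keeping).
    ByteBuff block = ByteBuff.wrap(new ByteBuffer[] { ByteBuffer.allocate(4096) });
    block.shareRefCnt(bucketEntryRefCnt, true);

    // replace = false: pooled buffers keep their own recycler and additionally
    // track the BucketEntry through a CompositeRefCnt, so the entry cannot be
    // freed by BucketCache#freeEntireBuckets while an RPC still reads from it.
    ByteBuff pooled = ByteBuff.wrap(new ByteBuffer[] { ByteBuffer.allocateDirect(4096) });
    pooled.shareRefCnt(bucketEntryRefCnt, false);
  }
}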

/******************************* Methods for ByteBuff **************************************/

/**
@@ -450,10 +468,37 @@ public byte[] toBytes() {
*/
public abstract int read(ReadableByteChannel channel) throws IOException;

/**
* Reads bytes from the given FileChannel into this ByteBuff, starting at the given file offset.
*/
public abstract int read(FileChannel channel, long offset) throws IOException;

/**
* Writes this ByteBuff's data into the given FileChannel, starting at the given file offset.
*/
public abstract int write(FileChannel channel, long offset) throws IOException;

/**
* Functional interface for a channel read.
*/
@FunctionalInterface
interface ChannelReader {
int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
[Member] ReadableByteChannel cannot accept an offset position argument, can it? It looks strange that we provide an offset argument here...

[Contributor Author] Any good idea on how to abstract this? FileChannel needs the offset, so it is provided as an argument.

[Member] I have no better idea for now; maybe we can keep the current way :-)
}

static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> {
return channel.read(buf);
};

static final ChannelReader FILE_READER = (channel, buf, offset) -> {
return ((FileChannel)channel).read(buf, offset);
};

// static helper methods
-  public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
+  public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset,
+      ChannelReader reader) throws IOException {
    if (buf.remaining() <= NIO_BUFFER_LIMIT) {
-      return channel.read(buf);
+      return reader.read(channel, buf, offset);
    }
    int originalLimit = buf.limit();
    int initialRemaining = buf.remaining();
@@ -463,7 +508,8 @@
    try {
      int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
      buf.limit(buf.position() + ioSize);
-      ret = channel.read(buf);
+      offset += ret;
+      ret = reader.read(channel, buf, offset);
      if (ret < ioSize) {
        break;
      }
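As a standalone illustration of the ChannelReader pattern above, here is a JDK-only sketch (it mirrors, but does not reproduce, the PR's helper): one lambda per channel kind hides whether a read is streaming or positional, and FileChannel's positional read never moves the channel's shared position, which is what lets concurrent RPCs read the same cache file safely.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;

public class ChannelReaderSketch {
  @FunctionalInterface
  interface ChannelReader {
    int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
  }

  // Streaming channels ignore the offset; FileChannel uses a positional read.
  static final ChannelReader CHANNEL_READER = (ch, buf, off) -> ch.read(buf);
  static final ChannelReader FILE_READER = (ch, buf, off) -> ((FileChannel) ch).read(buf, off);

  // Simplified fill loop (no NIO_BUFFER_LIMIT chunking, unlike the PR's helper).
  static int readFully(ReadableByteChannel ch, ByteBuffer buf, long offset,
      ChannelReader reader) throws IOException {
    int total = 0;
    while (buf.hasRemaining()) {
      int len = reader.read(ch, buf, offset + total);
      if (len < 0) {
        break; // EOF before the buffer was filled
      }
      total += len;
    }
    return total;
  }
}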
@@ -540,15 +586,7 @@ public String toString() {
}

/********************************* ByteBuff wrapper methods ***********************************/

-  /**
-   * In theory, the upstream should never construct an ByteBuff by passing an given refCnt, so
-   * please don't use this public method in other place. Make the method public here because the
-   * BucketEntry#wrapAsCacheable in hbase-server module will use its own refCnt and ByteBuffers from
-   * IOEngine to composite an HFileBlock's ByteBuff, we didn't find a better way so keep the public
-   * way here.
-   */
-  public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) {
+  private static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) {
if (buffers == null || buffers.length == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
Expand Down
New file: org/apache/hadoop/hbase/nio/CompositeRefCnt.java

@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.nio;

import java.util.Optional;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;

/**
 * The CompositeRefCnt is mainly used by an exclusive-memory HFileBlock. It has an innerRefCnt
 * shared with the BucketEntry in order to track the number of in-flight RPC requests, so that
 * when BucketCache#freeEntireBuckets is called, the LRU policy is not violated.
 * <p>
 * It also has its own refCnt and Recycler. Once the cells are shipped to the client, both the
 * Cacheable#refCnt and the BucketEntry#refCnt will be decreased. When the Cacheable's refCnt
 * drops to 0, its ByteBuff will be reclaimed; and when the BucketEntry#refCnt drops to 0, the
 * bucket can be evicted.
 */

[Contributor] This means that when, say, two read RPCs access a block from a file-based BucketCache, that entry cannot get evicted unless both these RPCs are over?

[Contributor Author] Yep, as mentioned above.
@InterfaceAudience.Private
public class CompositeRefCnt extends RefCnt {

private Optional<RefCnt> innerRefCnt;

public CompositeRefCnt(RefCnt original, RefCnt inner) {
super(original.getRecycler());
this.innerRefCnt = Optional.ofNullable(inner);
}

@VisibleForTesting
public Optional<RefCnt> getInnerRefCnt() {
return this.innerRefCnt;
}

@Override
public boolean release() {
return super.release() && innerRefCnt.map(refCnt -> refCnt.release()).orElse(true);
}

@Override
public ReferenceCounted retain() {
return innerRefCnt.map(refCnt -> refCnt.retain()).orElseGet(() -> super.retain());
}
}
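A short lifecycle sketch of the class above (the Recycler lambdas are hypothetical; the behavior follows the retain/release overrides shown): each read RPC composes its own refCnt with the shared BucketEntry refCnt, so the pooled buffers and the bucket space are released independently.

import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.nio.CompositeRefCnt;
import org.apache.hadoop.hbase.nio.RefCnt;

public class CompositeRefCntSketch {
  public static void main(String[] args) {
    Recycler freePooledBuffers = () -> System.out.println("pooled ByteBuffers returned");
    Recycler evictBucket = () -> System.out.println("bucket becomes evictable");

    RefCnt bucketEntryRefCnt = new RefCnt(evictBucket); // held by the cache, count = 1

    // A read RPC arrives: the cache retains the entry, then the RPC's pooled
    // ByteBuff composes its recycler with the entry's refCnt. Note that only the
    // first argument's Recycler is carried over; the composite keeps its own count.
    bucketEntryRefCnt.retain(); // count = 2 while the RPC is in flight
    CompositeRefCnt rpcRef =
        new CompositeRefCnt(new RefCnt(freePooledBuffers), bucketEntryRefCnt);

    // Cells shipped to the client: the pooled buffers are recycled AND the
    // entry reference is dropped (entry count back to 1).
    rpcRef.release();

    // Later, eviction by the cache drops the last reference: count = 0.
    bucketEntryRefCnt.release();
  }
}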
File: org/apache/hadoop/hbase/nio/MultiByteBuff.java

@@ -24,7 +24,10 @@
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -53,6 +56,23 @@ public class MultiByteBuff extends ByteBuff {
private int markedItemIndex = -1;
private final int[] itemBeginPos;

private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() {
@Override
public boolean hasNext() {
return curItemIndex < limitedItemIndex ||
(curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining());
}

@Override
public ByteBuffer next() {
if (curItemIndex >= items.length) {
throw new NoSuchElementException("items overflow");
}
curItem = items[curItemIndex++];
return curItem;
}
};

public MultiByteBuff(ByteBuffer... items) {
this(NONE, items);
}
@@ -1064,23 +1084,44 @@ public byte[] toBytes(int offset, int length) {
return output;
}

private int internalRead(ReadableByteChannel channel, long offset,
ChannelReader reader) throws IOException {
checkRefCount();
int total = 0;
while (buffsIterator.hasNext()) {
ByteBuffer buffer = buffsIterator.next();
int len = read(channel, buffer, offset, reader);
if (len > 0) {
total += len;
offset += len;
}
if (buffer.hasRemaining()) {
break;
}
}
return total;
}

@Override
public int read(ReadableByteChannel channel) throws IOException {
return internalRead(channel, 0, CHANNEL_READER);
}

@Override
public int read(FileChannel channel, long offset) throws IOException {
return internalRead(channel, offset, FILE_READER);
}

@Override
public int write(FileChannel channel, long offset) throws IOException {
checkRefCount();
int total = 0;
-    while (true) {
-      // Read max possible into the current BB
-      int len = channelRead(channel, this.curItem);
-      if (len > 0)
-        total += len;
-      if (this.curItem.hasRemaining()) {
-        // We were not able to read enough to fill the current BB itself. Means there is no point in
-        // doing more reads from Channel. Only this much there for now.
-        break;
-      } else {
-        if (this.curItemIndex >= this.limitedItemIndex) break;
-        this.curItemIndex++;
-        this.curItem = this.items[this.curItemIndex];
-      }
-    }
+    while (buffsIterator.hasNext()) {
+      ByteBuffer buffer = buffsIterator.next();
+      while (buffer.hasRemaining()) {
+        int len = channel.write(curItem, offset);
+        total += len;
+        offset += len;
+      }
+    }
return total;
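A hedged usage sketch of the new positional I/O (the file path and segment sizes are invented; read(FileChannel, long) and write(FileChannel, long) are the methods above): a block image spanning two pooled segments is written to, then read back from, a shared cache-file channel without ever seeking it.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import org.apache.hadoop.hbase.nio.MultiByteBuff;

public class MultiByteBuffFileIoSketch {
  public static void main(String[] args) throws IOException {
    try (FileChannel file = FileChannel.open(Paths.get("/tmp/bucket.cache"),
        StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
      // Two 8 KB segments, as a pooled ByteBuffAllocator might hand out.
      MultiByteBuff block = new MultiByteBuff(
          ByteBuffer.allocateDirect(8 * 1024), ByteBuffer.allocateDirect(8 * 1024));

      // Positional write of the whole buffer chain at offset 0.
      block.write(file, 0L);

      // Positional read back: the channel's own position is never moved, so
      // concurrent readers of the same FileChannel need no seek locking.
      block.rewind();
      block.read(file, 0L);
    }
  }
}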
File: org/apache/hadoop/hbase/nio/RefCnt.java

@@ -20,7 +20,6 @@
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;

@@ -51,6 +50,10 @@ public RefCnt(Recycler recycler) {
this.recycler = recycler;
}

public Recycler getRecycler() {
return this.recycler;
}

@Override
protected final void deallocate() {
this.recycler.free();
File: org/apache/hadoop/hbase/nio/SingleByteBuff.java

@@ -21,6 +21,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;

import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@@ -371,7 +372,25 @@ public void get(ByteBuffer out, int sourceOffset, int length) {
@Override
public int read(ReadableByteChannel channel) throws IOException {
checkRefCount();
-    return channelRead(channel, buf);
+    return read(channel, buf, 0, CHANNEL_READER);
}

@Override
public int read(FileChannel channel, long offset) throws IOException {
checkRefCount();
return read(channel, buf, offset, FILE_READER);
}

@Override
public int write(FileChannel channel, long offset) throws IOException {
checkRefCount();
int total = 0;
while(buf.hasRemaining()) {
int len = channel.write(buf, offset);
total += len;
offset += len;
}
return total;
}

@Override
File: org/apache/hadoop/hbase/io/hfile/HFileBlock.java

@@ -52,6 +52,7 @@
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
@@ -418,6 +419,11 @@ public BlockType getBlockType() {
return blockType;
}

@VisibleForTesting
public RefCnt getRefCnt() {
return buf.getRefCnt();
}

@Override
public int refCnt() {
return buf.refCnt();
File: org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java

@@ -80,7 +80,7 @@ class BucketEntry implements HBaseReferenceCounted {
*/
private final RefCnt refCnt;
final AtomicBoolean markedAsEvicted;
-  private final ByteBuffAllocator allocator;
+  final ByteBuffAllocator allocator;

/**
* Time this block was cached. Presumes we are created just before we are added to the cache.
@@ -194,7 +194,15 @@ boolean isRpcRef() {
}

Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
-    ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
+    ByteBuff buf = ByteBuff.wrap(buffers);
+    buf.shareRefCnt(this.refCnt, true);
return wrapAsCacheable(buf);
}

Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
if (buf.getRefCnt() != this.refCnt) {
buf.shareRefCnt(this.refCnt, false);
}
return this.deserializerReference().deserialize(buf, allocator);
}

File: org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java

@@ -17,7 +17,6 @@
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
-import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -35,9 +34,9 @@ public ExclusiveMemoryMmapIOEngine(String filePath, long capacity) throws IOException {

@Override
public Cacheable read(BucketEntry be) throws IOException {
-    ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
+    ByteBuff dst = be.allocator.allocate(be.getLength());
    bufferArray.read(be.offset(), dst);
    dst.position(0).limit(be.getLength());
-    return be.wrapAsCacheable(dst.nioByteBuffers());
+    return be.wrapAsCacheable(dst);
}
}
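FileIOEngine itself is not shown in this excerpt, but the read path the PR title refers to would look roughly like the sketch below. This is a hypothetical, simplified fragment: the fileChannel field is an assumption, the class is assumed to sit in the same package as BucketEntry, and only be.allocator, ByteBuff#read(FileChannel, long) and wrapAsCacheable come from this diff.

import java.io.IOException;
import java.nio.channels.FileChannel;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.nio.ByteBuff;

// Hypothetical, simplified sketch of FileIOEngine#read after this change.
public class FileIoEngineReadSketch {
  private FileChannel fileChannel; // assumed: a channel over the cache file

  public Cacheable read(BucketEntry be) throws IOException {
    // Pooled allocation replaces the old temporary ByteBuffer.allocate(length).
    ByteBuff dst = be.allocator.allocate(be.getLength());
    // Positional read straight into the pooled buffers: no temp copy, no seek.
    dst.read(fileChannel, be.offset());
    dst.position(0).limit(be.getLength());
    // Composes dst's recycler with the entry's refCnt (shareRefCnt(..., false)),
    // so the bucket cannot be freed while this RPC still holds the block.
    return be.wrapAsCacheable(dst);
  }
}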