HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read
chenxu14 committed Aug 26, 2019
1 parent 009851d commit 43f3930
Showing 10 changed files with 281 additions and 77 deletions.
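For context: before this change, FileIOEngine#read (and ExclusiveMemoryMmapIOEngine#read) allocated a temporary on-heap ByteBuffer for every block read and copied the data onward; after it, the destination ByteBuff reads straight from the FileChannel at a given offset. A minimal standalone sketch of that positional-read pattern in plain NIO (names and sizes here are illustrative, not HBase code):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class PositionalReadSketch {
  // Read straight into the caller's (possibly pooled) buffer at `offset`,
  // instead of allocating a temp ByteBuffer and copying afterwards.
  static int readFully(FileChannel channel, ByteBuffer dst, long offset)
      throws IOException {
    int total = 0;
    while (dst.hasRemaining()) {
      int len = channel.read(dst, offset); // positional read; no channel seek
      if (len < 0) {
        break; // EOF before the buffer was filled
      }
      total += len;
      offset += len;
    }
    return total;
  }
}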
org.apache.hadoop.hbase.nio.ByteBuff
@@ -19,6 +19,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.List;

@@ -78,6 +79,23 @@ public boolean release() {
return refCnt.release();
}

public RefCnt getRefCnt() {
return this.refCnt;
}

/**
* BucketEntry uses this to share its refCnt with the ByteBuff, which is why the method is
* public. Upstream code should not call this method anywhere else, or the previous recycler
* will be lost.
*/
public void shareRefCnt(RefCnt refCnt, boolean replace) {
if (replace) {
this.refCnt = refCnt;
} else {
this.refCnt = new CompositeRefCnt(getRefCnt(), refCnt);
}
}

/******************************* Methods for ByteBuff **************************************/

/**
@@ -450,10 +468,37 @@ public byte[] toBytes() {
*/
public abstract int read(ReadableByteChannel channel) throws IOException;

/**
* Reads bytes from the given FileChannel into this ByteBuff, starting at the given file offset.
*/
public abstract int read(FileChannel channel, long offset) throws IOException;

/**
* Writes this ByteBuff's data into the target FileChannel, starting at the given file offset.
*/
public abstract int write(FileChannel channel, long offset) throws IOException;

/**
* Functional interface for channel reads.
*/
@FunctionalInterface
interface ChannelReader {
int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
}

static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> {
return channel.read(buf);
};

static final ChannelReader FILE_READER = (channel, buf, offset) -> {
return ((FileChannel)channel).read(buf, offset);
};

// static helper methods
- public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
+ public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset,
+ ChannelReader reader) throws IOException {
if (buf.remaining() <= NIO_BUFFER_LIMIT) {
- return channel.read(buf);
+ return reader.read(channel, buf, offset);
}
int originalLimit = buf.limit();
int initialRemaining = buf.remaining();
@@ -463,7 +508,8 @@ public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throw
try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
buf.limit(buf.position() + ioSize);
- ret = channel.read(buf);
+ offset += ret;
+ ret = reader.read(channel, buf, offset);
if (ret < ioSize) {
break;
}
@@ -540,15 +586,7 @@ public String toString() {
}

/********************************* ByteBuff wrapper methods ***********************************/

- /**
- * In theory, the upstream should never construct an ByteBuff by passing an given refCnt, so
- * please don't use this public method in other place. Make the method public here because the
- * BucketEntry#wrapAsCacheable in hbase-server module will use its own refCnt and ByteBuffers from
- * IOEngine to composite an HFileBlock's ByteBuff, we didn't find a better way so keep the public
- * way here.
- */
- public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) {
+ private static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) {
if (buffers == null || buffers.length == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
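The ChannelReader indirection above lets one chunked helper serve both socket reads (which ignore the offset) and positional file reads. The NIO_BUFFER_LIMIT cap exists because the JDK typically copies a heap ByteBuffer into a temporary direct buffer per read call; capping each slice keeps those temporaries small. A standalone sketch of the same chunking pattern (the constant's value and all names here are illustrative, not the HBase code):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class ChunkedReadSketch {
  // Stand-in for NIO_BUFFER_LIMIT; the real constant lives in ByteBuff.
  static final int CHUNK_LIMIT = 64 * 1024;

  static int chunkedRead(FileChannel channel, ByteBuffer buf, long offset)
      throws IOException {
    if (buf.remaining() <= CHUNK_LIMIT) {
      return channel.read(buf, offset);
    }
    int originalLimit = buf.limit();
    int initialRemaining = buf.remaining();
    while (buf.remaining() > 0) {
      try {
        // Expose at most CHUNK_LIMIT bytes of the buffer to each read call.
        int ioSize = Math.min(buf.remaining(), CHUNK_LIMIT);
        buf.limit(buf.position() + ioSize);
        int len = channel.read(buf, offset);
        if (len < ioSize) {
          break; // short read (or EOF): the channel has no more bytes for now
        }
        offset += len;
      } finally {
        buf.limit(originalLimit); // always restore the caller's limit
      }
    }
    return initialRemaining - buf.remaining();
  }
}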
org.apache.hadoop.hbase.nio.CompositeRefCnt (new file)
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.nio;

import java.util.Optional;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;

/**
* The CompositeRefCnt is mainly used by exclusive-memory HFileBlocks. It carries an innerRefCnt
* shared with the BucketEntry in order to track the number of in-flight RPC references, so that
* BucketCache#freeEntireBuckets will not violate the LRU policy when called.
* <p>
* It also has its own refCnt & Recycler. Once the cells are shipped to the client, both the
* Cacheable#refCnt and the BucketEntry#refCnt are decreased: when the Cacheable's refCnt drops
* to 0 its ByteBuff is reclaimed, and when the BucketEntry#refCnt drops to 0 the bucket can be
* evicted.
*/
@InterfaceAudience.Private
public class CompositeRefCnt extends RefCnt {

private Optional<RefCnt> innerRefCnt;

public CompositeRefCnt(RefCnt original, RefCnt inner) {
super(original.getRecycler());
this.innerRefCnt = Optional.ofNullable(inner);
}

@VisibleForTesting
public Optional<RefCnt> getInnerRefCnt() {
return this.innerRefCnt;
}

@Override
public boolean release() {
return super.release() && innerRefCnt.map(refCnt -> refCnt.release()).orElse(true);
}

@Override
public ReferenceCounted retain() {
return innerRefCnt.map(refCnt -> refCnt.retain()).orElseGet(() -> super.retain());
}
}
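A minimal sketch of how the composite is meant to be wired, using only the public constructors shown above; the recyclers are stand-ins for what ByteBuffAllocator and the BucketEntry would supply in HBase itself:

import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.nio.CompositeRefCnt;
import org.apache.hadoop.hbase.nio.RefCnt;

public class CompositeRefCntSketch {
  public static void main(String[] args) {
    Recycler freeBuffer = () -> System.out.println("ByteBuff returned to pool");
    Recycler freeSlot = () -> System.out.println("bucket slot evictable");

    RefCnt blockRefCnt = new RefCnt(freeBuffer); // the block's original RefCnt
    RefCnt bucketRefCnt = new RefCnt(freeSlot);  // shared with the BucketEntry

    // The composite adopts the block's recycler and tracks the bucket count too.
    CompositeRefCnt refCnt = new CompositeRefCnt(blockRefCnt, bucketRefCnt);

    // Dropping the outer count to zero frees the buffer and then releases the
    // inner BucketEntry count, making the bucket slot evictable (see release()).
    refCnt.release();
  }
}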
org.apache.hadoop.hbase.nio.MultiByteBuff
@@ -24,7 +24,10 @@
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -53,6 +56,23 @@ public class MultiByteBuff extends ByteBuff {
private int markedItemIndex = -1;
private final int[] itemBeginPos;

private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() {
@Override
public boolean hasNext() {
return curItemIndex < limitedItemIndex ||
(curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining());
}

@Override
public ByteBuffer next() {
if (curItemIndex >= items.length) {
throw new NoSuchElementException("items overflow");
}
curItem = items[curItemIndex++];
return curItem;
}
};

public MultiByteBuff(ByteBuffer... items) {
this(NONE, items);
}
@@ -1064,23 +1084,44 @@ public byte[] toBytes(int offset, int length) {
return output;
}

private int internalRead(ReadableByteChannel channel, long offset,
ChannelReader reader) throws IOException {
checkRefCount();
int total = 0;
while (buffsIterator.hasNext()) {
ByteBuffer buffer = buffsIterator.next();
int len = read(channel, buffer, offset, reader);
if (len > 0) {
total += len;
offset += len;
}
if (buffer.hasRemaining()) {
break;
}
}
return total;
}

@Override
public int read(ReadableByteChannel channel) throws IOException {
return internalRead(channel, 0, CHANNEL_READER);
}

@Override
public int read(FileChannel channel, long offset) throws IOException {
return internalRead(channel, offset, FILE_READER);
}

@Override
public int write(FileChannel channel, long offset) throws IOException {
checkRefCount();
int total = 0;
- while (true) {
- // Read max possible into the current BB
- int len = channelRead(channel, this.curItem);
- if (len > 0)
- total += len;
- if (this.curItem.hasRemaining()) {
- // We were not able to read enough to fill the current BB itself. Means there is no point in
- // doing more reads from Channel. Only this much there for now.
- break;
- } else {
- if (this.curItemIndex >= this.limitedItemIndex) break;
- this.curItemIndex++;
- this.curItem = this.items[this.curItemIndex];
- }
- }
+ while (buffsIterator.hasNext()) {
+ ByteBuffer buffer = buffsIterator.next();
+ while (buffer.hasRemaining()) {
+ int len = channel.write(curItem, offset);
+ total += len;
+ offset += len;
+ }
+ }
return total;
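internalRead above walks the segments through buffsIterator, issuing one positional read per backing ByteBuffer and advancing the file offset as each segment fills; a short read exits the loop. The same walk in plain NIO, for a buffer split across two segments (file name and sizes assumed):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class SegmentedReadSketch {
  public static void main(String[] args) throws IOException {
    // Two segments standing in for a MultiByteBuff's backing buffers.
    ByteBuffer[] segments = { ByteBuffer.allocate(4096), ByteBuffer.allocate(4096) };
    try (FileChannel ch = FileChannel.open(Paths.get("block.dat"),
        StandardOpenOption.READ)) {
      long offset = 0;
      int total = 0;
      for (ByteBuffer segment : segments) {
        int len = ch.read(segment, offset); // one positional read per segment
        if (len > 0) {
          total += len;
          offset += len;
        }
        if (segment.hasRemaining()) {
          break; // short read: stop here, as internalRead does
        }
      }
      System.out.println("read " + total + " bytes");
    }
  }
}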
org.apache.hadoop.hbase.nio.RefCnt
@@ -20,7 +20,6 @@
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;

@@ -51,6 +50,10 @@ public RefCnt(Recycler recycler) {
this.recycler = recycler;
}

public Recycler getRecycler() {
return this.recycler;
}

@Override
protected final void deallocate() {
this.recycler.free();
org.apache.hadoop.hbase.nio.SingleByteBuff
@@ -21,6 +21,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;

import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@@ -371,7 +372,25 @@ public void get(ByteBuffer out, int sourceOffset, int length) {
@Override
public int read(ReadableByteChannel channel) throws IOException {
checkRefCount();
- return channelRead(channel, buf);
+ return read(channel, buf, 0, CHANNEL_READER);
}

@Override
public int read(FileChannel channel, long offset) throws IOException {
checkRefCount();
return read(channel, buf, offset, FILE_READER);
}

@Override
public int write(FileChannel channel, long offset) throws IOException {
checkRefCount();
int total = 0;
while (buf.hasRemaining()) {
int len = channel.write(buf, offset);
total += len;
offset += len;
}
return total;
}

@Override
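The write loop above exists because a single FileChannel#write(ByteBuffer, long) call may write fewer bytes than the buffer holds; looping with an advancing offset drains it completely. A standalone usage sketch (file name and offset are illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class PositionalWriteSketch {
  public static void main(String[] args) throws IOException {
    ByteBuffer buf = ByteBuffer.wrap("block-bytes".getBytes(StandardCharsets.UTF_8));
    try (FileChannel ch = FileChannel.open(Paths.get("bucket.cache"),
        StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
      long offset = 128; // this block's slot in the cache file (illustrative)
      while (buf.hasRemaining()) {
        // write() may be partial; advance the offset by what was written.
        offset += ch.write(buf, offset);
      }
    }
  }
}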
org.apache.hadoop.hbase.io.hfile.HFileBlock
@@ -52,6 +52,7 @@
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
@@ -418,6 +419,11 @@ public BlockType getBlockType() {
return blockType;
}

@VisibleForTesting
public RefCnt getRefCnt() {
return buf.getRefCnt();
}

@Override
public int refCnt() {
return buf.refCnt();
org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry
@@ -80,7 +80,7 @@ class BucketEntry implements HBaseReferenceCounted {
*/
private final RefCnt refCnt;
final AtomicBoolean markedAsEvicted;
- private final ByteBuffAllocator allocator;
+ final ByteBuffAllocator allocator;

/**
* Time this block was cached. Presumes we are created just before we are added to the cache.
@@ -194,7 +194,15 @@ boolean isRpcRef() {
}

Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
- ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
+ ByteBuff buf = ByteBuff.wrap(buffers);
+ buf.shareRefCnt(this.refCnt, true);
return wrapAsCacheable(buf);
}

Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
if (buf.getRefCnt() != this.refCnt) {
buf.shareRefCnt(this.refCnt, false);
}
return this.deserializerReference().deserialize(buf, allocator);
}

org.apache.hadoop.hbase.io.hfile.bucket.ExclusiveMemoryMmapIOEngine
@@ -17,7 +17,6 @@
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
- import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -35,9 +34,9 @@ public ExclusiveMemoryMmapIOEngine(String filePath, long capacity) throws IOExce

@Override
public Cacheable read(BucketEntry be) throws IOException {
- ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
+ ByteBuff dst = be.allocator.allocate(be.getLength());
bufferArray.read(be.offset(), dst);
dst.position(0).limit(be.getLength());
- return be.wrapAsCacheable(dst.nioByteBuffers());
+ return be.wrapAsCacheable(dst);
}
}
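Putting the BucketEntry pieces together: the engine now fills an allocator-provided ByteBuff (pooled, not a throwaway copy), and wrapAsCacheable ties the entry's refCnt to it. A sketch of that read path placed in the same package (so the package-private members above are visible); the helper class, its method, and the ReadSource interface are hypothetical stand-ins for an IOEngine:

package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.nio.ByteBuff;

class WrapAsCacheableSketch {
  static Cacheable readBlock(BucketEntry be, ReadSource source) throws IOException {
    ByteBuff dst = be.allocator.allocate(be.getLength()); // pooled, not temp
    source.readInto(be.offset(), dst);                    // engine-specific fill
    dst.position(0).limit(be.getLength());
    // Attaches be's refCnt to dst, so an in-flight RPC reference keeps the
    // bucket slot from being evicted and reused underneath the reader.
    return be.wrapAsCacheable(dst);
  }

  interface ReadSource {
    void readInto(long offset, ByteBuff dst) throws IOException;
  }
}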
(Diffs for the two remaining changed files did not load.)