HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read (#583)
chenxu14 authored and openinx committed Sep 9, 2019
1 parent 7648855 commit fb7230c
Showing 10 changed files with 240 additions and 77 deletions.
ByteBuff.java

@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
@@ -450,10 +451,37 @@ public byte[] toBytes() {
    */
   public abstract int read(ReadableByteChannel channel) throws IOException;
 
+  /**
+   * Reads bytes from FileChannel into this ByteBuff
+   */
+  public abstract int read(FileChannel channel, long offset) throws IOException;
+
+  /**
+   * Write this ByteBuff's data into target file
+   */
+  public abstract int write(FileChannel channel, long offset) throws IOException;
+
+  /**
+   * function interface for Channel read
+   */
+  @FunctionalInterface
+  interface ChannelReader {
+    int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
+  }
+
+  static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> {
+    return channel.read(buf);
+  };
+
+  static final ChannelReader FILE_READER = (channel, buf, offset) -> {
+    return ((FileChannel)channel).read(buf, offset);
+  };
+
   // static helper methods
-  public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
+  public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset,
+      ChannelReader reader) throws IOException {
     if (buf.remaining() <= NIO_BUFFER_LIMIT) {
-      return channel.read(buf);
+      return reader.read(channel, buf, offset);
     }
     int originalLimit = buf.limit();
     int initialRemaining = buf.remaining();
@@ -463,7 +491,8 @@ public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throw
     try {
       int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
       buf.limit(buf.position() + ioSize);
-      ret = channel.read(buf);
+      offset += ret;
+      ret = reader.read(channel, buf, offset);
       if (ret < ioSize) {
         break;
       }
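
The point of the new ChannelReader indirection above: the existing chunked-read helper, which caps each NIO read at NIO_BUFFER_LIMIT, can now serve both the socket path (sequential reads, offset ignored) and the new file path (positional reads at an absolute offset). A minimal, self-contained JDK-only sketch of the pattern; the class name and the 64 KB chunk value are illustrative assumptions, not from this commit:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.ReadableByteChannel;

    public class ChannelReaderSketch {

      @FunctionalInterface
      interface ChannelReader {
        int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
      }

      // Sequential strategy: reads at the channel's own position, offset unused.
      static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> channel.read(buf);

      // Positional strategy: reads at an absolute file offset without moving
      // the channel's position (safe for concurrent readers of one file).
      static final ChannelReader FILE_READER =
          (channel, buf, offset) -> ((FileChannel) channel).read(buf, offset);

      static final int CHUNK = 64 * 1024; // assumed cap, in the spirit of NIO_BUFFER_LIMIT

      /** Chunked read with the same shape as the patched ByteBuff.read(...). */
      static int read(ReadableByteChannel channel, ByteBuffer buf, long offset,
          ChannelReader reader) throws IOException {
        if (buf.remaining() <= CHUNK) {
          return reader.read(channel, buf, offset);
        }
        int originalLimit = buf.limit();
        int initialRemaining = buf.remaining();
        int ret = 0;
        while (buf.remaining() > 0) {
          try {
            int ioSize = Math.min(buf.remaining(), CHUNK);
            buf.limit(buf.position() + ioSize);   // cap this pass at one chunk
            offset += ret;                        // advance by the previous pass's count
            ret = reader.read(channel, buf, offset);
            if (ret < ioSize) {
              break;                              // short read: nothing more available now
            }
          } finally {
            buf.limit(originalLimit);             // always restore the caller's limit
          }
        }
        return initialRemaining - buf.remaining();
      }
    }

Note that FileChannel.read(ByteBuffer, long) never modifies the channel's position, which is what lets several cache reads hit the same file concurrently without coordinating a shared cursor.
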
MultiByteBuff.java

@@ -24,7 +24,10 @@
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.InvalidMarkException;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -53,6 +56,23 @@ public class MultiByteBuff extends ByteBuff {
   private int markedItemIndex = -1;
   private final int[] itemBeginPos;
 
+  private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() {
+    @Override
+    public boolean hasNext() {
+      return curItemIndex < limitedItemIndex ||
+          (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining());
+    }
+
+    @Override
+    public ByteBuffer next() {
+      if (curItemIndex >= items.length) {
+        throw new NoSuchElementException("items overflow");
+      }
+      curItem = items[curItemIndex++];
+      return curItem;
+    }
+  };
+
   public MultiByteBuff(ByteBuffer... items) {
     this(NONE, items);
   }
@@ -1064,23 +1084,44 @@ public byte[] toBytes(int offset, int length) {
     return output;
   }
 
+  private int internalRead(ReadableByteChannel channel, long offset,
+      ChannelReader reader) throws IOException {
+    checkRefCount();
+    int total = 0;
+    while (buffsIterator.hasNext()) {
+      ByteBuffer buffer = buffsIterator.next();
+      int len = read(channel, buffer, offset, reader);
+      if (len > 0) {
+        total += len;
+        offset += len;
+      }
+      if (buffer.hasRemaining()) {
+        break;
+      }
+    }
+    return total;
+  }
+
   @Override
   public int read(ReadableByteChannel channel) throws IOException {
+    return internalRead(channel, 0, CHANNEL_READER);
+  }
+
+  @Override
+  public int read(FileChannel channel, long offset) throws IOException {
+    return internalRead(channel, offset, FILE_READER);
+  }
+
+  @Override
+  public int write(FileChannel channel, long offset) throws IOException {
     checkRefCount();
     int total = 0;
-    while (true) {
-      // Read max possible into the current BB
-      int len = channelRead(channel, this.curItem);
-      if (len > 0)
+    while (buffsIterator.hasNext()) {
+      ByteBuffer buffer = buffsIterator.next();
+      while (buffer.hasRemaining()) {
+        int len = channel.write(buffer, offset);
         total += len;
-      if (this.curItem.hasRemaining()) {
-        // We were not able to read enough to fill the current BB itself. Means there is no point in
-        // doing more reads from Channel. Only this much there for now.
-        break;
-      } else {
-        if (this.curItemIndex >= this.limitedItemIndex) break;
-        this.curItemIndex++;
-        this.curItem = this.items[this.curItemIndex];
+        offset += len;
       }
     }
     return total;
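
internalRead walks the fragment buffers through buffsIterator and issues one positional read per fragment, bumping the file offset by whatever each fragment received. A simplified, JDK-only sketch of that scatter pattern; the helper name and the fill-until-full loop are assumptions (the HBase version stops as soon as a fragment is left partially filled):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class ScatterReadSketch {

      /** Fill each fragment in turn from an absolute file offset. */
      static long readFragments(FileChannel channel, long offset, ByteBuffer... fragments)
          throws IOException {
        long total = 0;
        for (ByteBuffer fragment : fragments) {
          while (fragment.hasRemaining()) {
            int len = channel.read(fragment, offset); // positional read into this fragment
            if (len < 0) {
              return total;                           // EOF before all fragments filled
            }
            offset += len;                            // next bytes land after these
            total += len;
          }
        }
        return total;
      }

      public static void main(String[] args) throws IOException {
        try (FileChannel ch = FileChannel.open(Paths.get(args[0]), StandardOpenOption.READ)) {
          ByteBuffer a = ByteBuffer.allocate(4096);
          ByteBuffer b = ByteBuffer.allocate(4096);
          System.out.println("read " + readFragments(ch, 0, a, b) + " bytes");
        }
      }
    }
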
SingleByteBuff.java

@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@@ -371,7 +372,25 @@ public void get(ByteBuffer out, int sourceOffset, int length) {
   @Override
   public int read(ReadableByteChannel channel) throws IOException {
     checkRefCount();
-    return channelRead(channel, buf);
+    return read(channel, buf, 0, CHANNEL_READER);
+  }
+
+  @Override
+  public int read(FileChannel channel, long offset) throws IOException {
+    checkRefCount();
+    return read(channel, buf, offset, FILE_READER);
+  }
+
+  @Override
+  public int write(FileChannel channel, long offset) throws IOException {
+    checkRefCount();
+    int total = 0;
+    while (buf.hasRemaining()) {
+      int len = channel.write(buf, offset);
+      total += len;
+      offset += len;
+    }
+    return total;
   }
 
   @Override
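
The new SingleByteBuff.write loops because FileChannel.write(ByteBuffer, long) may write fewer bytes than requested, so the buffer position and the target offset must advance together. A small standalone sketch of the same loop; the class name, file name, and demo values are arbitrary:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class PositionalWriteSketch {

      /** Write all remaining bytes at an absolute offset, tolerating partial writes. */
      static int writeFully(FileChannel channel, ByteBuffer buf, long offset) throws IOException {
        int total = 0;
        while (buf.hasRemaining()) {
          int len = channel.write(buf, offset); // may be less than buf.remaining()
          total += len;
          offset += len;                        // keep file offset in step with the buffer
        }
        return total;
      }

      public static void main(String[] args) throws IOException {
        try (FileChannel ch = FileChannel.open(Paths.get("demo.bin"),
            StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
          byte[] payload = "hello bucket cache".getBytes(StandardCharsets.UTF_8);
          writeFully(ch, ByteBuffer.wrap(payload), 128);
        }
      }
    }
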
BucketCache.java

@@ -502,8 +502,11 @@ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
         // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
         // the same BucketEntry, then all of the three will share the same refCnt.
         Cacheable cachedBlock = ioEngine.read(bucketEntry);
-        // RPC start to reference, so retain here.
-        cachedBlock.retain();
+        if (ioEngine.usesSharedMemory()) {
+          // If IOEngine use shared memory, cachedBlock and BucketEntry will share the
+          // same RefCnt, do retain here, in order to count the number of RPC references
+          cachedBlock.retain();
+        }
         // Update the cache statistics.
         if (updateCacheMetrics) {
           cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
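
The retain() becomes conditional because the two engine families now differ in ownership: with a shared-memory engine the returned block aliases the bucket's memory and shares the BucketEntry's RefCnt, so each RPC reference must be counted; with a file-backed engine the block was just read into its own pooled buffer, and an extra retain would never get a matching release. A toy refcount, under a deliberately simplified model (the real RefCnt is more involved), to make the leak scenario concrete:

    import java.util.concurrent.atomic.AtomicInteger;

    public class RefCntSketch {

      /** Toy reference count standing in for the shared RefCnt. */
      static final class RefCnt {
        private final AtomicInteger cnt = new AtomicInteger(1); // cache holds one ref
        private final Runnable deallocator;

        RefCnt(Runnable deallocator) {
          this.deallocator = deallocator;
        }

        void retain() {
          cnt.incrementAndGet();
        }

        void release() {
          if (cnt.decrementAndGet() == 0) {
            deallocator.run(); // last reference gone: give the memory back
          }
        }
      }

      public static void main(String[] args) {
        RefCnt shared = new RefCnt(() -> System.out.println("deallocated"));
        shared.retain();  // an RPC starts referencing a shared-memory block
        shared.release(); // the RPC finishes
        shared.release(); // cache evicts: count hits zero, memory is freed
        // A block copied into its own buffer must NOT be retained on this path,
        // or the count never returns to zero and the buffer leaks from the pool.
      }
    }
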
BucketEntry.java

@@ -80,7 +80,7 @@ class BucketEntry implements HBaseReferenceCounted {
    */
   private final RefCnt refCnt;
   final AtomicBoolean markedAsEvicted;
-  private final ByteBuffAllocator allocator;
+  final ByteBuffAllocator allocator;
 
   /**
    * Time this block was cached. Presumes we are created just before we are added to the cache.
@@ -194,7 +194,10 @@ boolean isRpcRef() {
   }
 
   Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
-    ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
+    return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt));
+  }
+
+  Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
     return this.deserializerReference().deserialize(buf, allocator);
   }
 
ExclusiveMemoryMmapIOEngine.java

@@ -17,7 +17,6 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.nio.ByteBuff;

@@ -35,9 +34,9 @@ public ExclusiveMemoryMmapIOEngine(String filePath, long capacity) throws IOExce
 
   @Override
   public Cacheable read(BucketEntry be) throws IOException {
-    ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
+    ByteBuff dst = be.allocator.allocate(be.getLength());
     bufferArray.read(be.offset(), dst);
     dst.position(0).limit(be.getLength());
-    return be.wrapAsCacheable(dst.nioByteBuffers());
+    return be.wrapAsCacheable(dst);
   }
 }
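
Both engines now draw the destination buffer from the BucketEntry's ByteBuffAllocator instead of calling ByteBuffer.allocate(length) per read, which is the allocation the commit title refers to: buffers come from, and return to, a pool. A toy pool assuming a single fixed buffer size (the real ByteBuffAllocator manages size buckets and refcounted ByteBuffs, so this only shows the shape of the change):

    import java.nio.ByteBuffer;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class ToyBufferPool {
      private final ConcurrentLinkedQueue<ByteBuffer> free = new ConcurrentLinkedQueue<>();
      private final int bufSize;

      public ToyBufferPool(int bufSize) {
        this.bufSize = bufSize;
      }

      /** Reuse a pooled buffer when possible; only allocate when the pool is empty. */
      public ByteBuffer allocate() {
        ByteBuffer b = free.poll();
        return b != null ? b : ByteBuffer.allocateDirect(bufSize);
      }

      /** Callers must release buffers back (in HBase, via the ByteBuff's refcount). */
      public void release(ByteBuffer b) {
        b.clear();
        free.offer(b);
      }

      public static void main(String[] args) {
        ToyBufferPool pool = new ToyBufferPool(64 * 1024);
        ByteBuffer dst = pool.allocate(); // was: a fresh ByteBuffer.allocate(length) per read
        // ... fill dst from the IO engine, hand it to the deserializer ...
        pool.release(dst);
      }
    }
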
FileIOEngine.java

@@ -129,20 +129,25 @@ public Cacheable read(BucketEntry be) throws IOException {
     long offset = be.offset();
     int length = be.getLength();
     Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
-    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
+    ByteBuff dstBuff = be.allocator.allocate(length);
     if (length != 0) {
-      accessFile(readAccessor, dstBuffer, offset);
-      // The buffer created out of the fileChannel is formed by copying the data from the file
-      // Hence in this case there is no shared memory that we point to. Even if the BucketCache
-      // evicts this buffer from the file the data is already copied and there is no need to
-      // ensure that the results are not corrupted before consuming them.
-      if (dstBuffer.limit() != length) {
-        throw new IllegalArgumentIOException(
-          "Only " + dstBuffer.limit() + " bytes read, " + length + " expected");
+      try {
+        accessFile(readAccessor, dstBuff, offset);
+        // The buffer created out of the fileChannel is formed by copying the data from the file
+        // Hence in this case there is no shared memory that we point to. Even if the BucketCache
+        // evicts this buffer from the file the data is already copied and there is no need to
+        // ensure that the results are not corrupted before consuming them.
+        if (dstBuff.limit() != length) {
+          throw new IllegalArgumentIOException(
+            "Only " + dstBuff.limit() + " bytes read, " + length + " expected");
+        }
+      } catch (IOException ioe) {
+        dstBuff.release();
+        throw ioe;
       }
     }
-    dstBuffer.rewind();
-    return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer });
+    dstBuff.rewind();
+    return be.wrapAsCacheable(dstBuff);
   }
 
   @VisibleForTesting
@@ -164,10 +169,7 @@ void closeFileChannels() {
    */
   @Override
   public void write(ByteBuffer srcBuffer, long offset) throws IOException {
-    if (!srcBuffer.hasRemaining()) {
-      return;
-    }
-    accessFile(writeAccessor, srcBuffer, offset);
+    write(ByteBuff.wrap(srcBuffer), offset);
   }
 
   /**
@@ -208,36 +210,38 @@ public void shutdown() {
   }
 
   @Override
-  public void write(ByteBuff srcBuffer, long offset) throws IOException {
-    ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate();
-    write(dup, offset);
+  public void write(ByteBuff srcBuff, long offset) throws IOException {
+    if (!srcBuff.hasRemaining()) {
+      return;
+    }
+    accessFile(writeAccessor, srcBuff, offset);
   }
 
-  private void accessFile(FileAccessor accessor, ByteBuffer buffer,
+  private void accessFile(FileAccessor accessor, ByteBuff buff,
       long globalOffset) throws IOException {
     int startFileNum = getFileNum(globalOffset);
-    int remainingAccessDataLen = buffer.remaining();
+    int remainingAccessDataLen = buff.remaining();
     int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
     int accessFileNum = startFileNum;
     long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
-    int bufLimit = buffer.limit();
+    int bufLimit = buff.limit();
     while (true) {
       FileChannel fileChannel = fileChannels[accessFileNum];
       int accessLen = 0;
       if (endFileNum > accessFileNum) {
         // short the limit;
-        buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
+        buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
       }
       try {
-        accessLen = accessor.access(fileChannel, buffer, accessOffset);
+        accessLen = accessor.access(fileChannel, buff, accessOffset);
       } catch (ClosedByInterruptException e) {
         throw e;
       } catch (ClosedChannelException e) {
         refreshFileConnection(accessFileNum, e);
         continue;
       }
       // recover the limit
-      buffer.limit(bufLimit);
+      buff.limit(bufLimit);
       if (accessLen < remainingAccessDataLen) {
         remainingAccessDataLen -= accessLen;
         accessFileNum++;
@@ -246,7 +250,7 @@ private void accessFile(FileAccessor accessor, ByteBuffer buffer,
         break;
       }
       if (accessFileNum >= fileChannels.length) {
-        throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
+        throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining())
           + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
           + globalOffset);
       }
@@ -304,23 +308,23 @@ void refreshFileConnection(int accessFileNum, IOException ioe) throws IOExceptio
   }
 
   private interface FileAccessor {
-    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+    int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
         throws IOException;
   }
 
   private static class FileReadAccessor implements FileAccessor {
     @Override
-    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+    public int access(FileChannel fileChannel, ByteBuff buff,
         long accessOffset) throws IOException {
-      return fileChannel.read(byteBuffer, accessOffset);
+      return buff.read(fileChannel, accessOffset);
     }
   }
 
   private static class FileWriteAccessor implements FileAccessor {
     @Override
-    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+    public int access(FileChannel fileChannel, ByteBuff buff,
        long accessOffset) throws IOException {
-      return fileChannel.write(byteBuffer, accessOffset);
+      return buff.write(fileChannel, accessOffset);
     }
   }
 }
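
The accessor change is where the temporary allocation actually disappears: FileReadAccessor used to read into a scratch heap ByteBuffer that was then wrapped and handed along, while it now calls buff.read(fileChannel, accessOffset) so bytes land directly in the pooled destination ByteBuff. A JDK-only sketch contrasting the two shapes; the real accessFile additionally splits a single request across several backing files, which this omits:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class DirectReadSketch {

      /** Old shape: allocate a temp buffer per read, then copy into the destination. */
      static void readViaTemp(FileChannel ch, long offset, int length, ByteBuffer dst)
          throws IOException {
        ByteBuffer tmp = ByteBuffer.allocate(length); // per-read garbage
        while (tmp.hasRemaining()) {
          int n = ch.read(tmp, offset + tmp.position());
          if (n < 0) {
            break;
          }
        }
        tmp.flip();
        dst.put(tmp);                                 // second pass over the same bytes
      }

      /** New shape: positional read straight into the destination buffer. */
      static int readDirect(FileChannel ch, long offset, ByteBuffer dst) throws IOException {
        int total = 0;
        while (dst.hasRemaining()) {
          int n = ch.read(dst, offset + total);
          if (n < 0) {
            break;
          }
          total += n;
        }
        return total;
      }
    }
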
