Skip to content

Commit

Permalink
Create NettyDataBuffer, release ByteBuf after reading.
Browse files Browse the repository at this point in the history
  • Loading branch information
ooq committed Jul 15, 2015
2 parents 86e5778 + 098df3d commit b396bbd
Show file tree
Hide file tree
Showing 34 changed files with 871 additions and 510 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.hadoop.hdfs.DFSClient.BlockReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tachyon.Constants;
import tachyon.client.netty.NettyRemoteBlockReader;
import tachyon.conf.TachyonConf;
import tachyon.thrift.ClientBlockInfo;
import tachyon.thrift.NetAddress;
Expand Down Expand Up @@ -103,6 +105,9 @@ public class RemoteBlockInStream extends BlockInStream {
*/
private static final int MAX_REMOTE_READ_ATTEMPTS = 2;

/** A reference to the current reader so we can clear it after reading is finished. */
private static RemoteBlockReader mUnclearedReader = null;

/**
* @param file the file the block belongs to
* @param readType the InStream's read type
Expand Down Expand Up @@ -170,6 +175,7 @@ public void close() throws IOException {
if (mBytesReadRemote > 0) {
mTachyonFS.getClientMetrics().incBlocksReadRemote(1);
}
clearReader();
mClosed = true;
}

Expand Down Expand Up @@ -308,7 +314,11 @@ public static ByteBuffer readRemoteByteBuffer(TachyonFS tachyonFS, ClientBlockIn

private static ByteBuffer retrieveByteBufferFromRemoteMachine(InetSocketAddress address,
long blockId, long offset, long length, TachyonConf conf) throws IOException {
return RemoteBlockReader.Factory.createRemoteBlockReader(conf).readRemoteBlock(
RemoteBlockReader reader = RemoteBlockReader.Factory.createRemoteBlockReader(conf);
// always clear the previous reader before assigning it to a new one
clearReader();
mUnclearedReader = reader;
return reader.readRemoteBlock(
address.getHostName(), address.getPort(), blockId, offset, length);
}

Expand Down Expand Up @@ -412,4 +422,20 @@ private boolean updateCurrentBuffer() throws IOException {
}
return false;
}

/**
* Clear the previous reader, release the resource it references.
*
* @return true if reader is successfully cleared or no clearing is needed
*/
private static boolean clearReader() {
boolean res = true;
if (mUnclearedReader != null) {
if (mUnclearedReader instanceof NettyRemoteBlockReader) {
return ((NettyRemoteBlockReader) mUnclearedReader).clearReadResponse();
}
mUnclearedReader = null;
}
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;

import tachyon.Constants;
import tachyon.client.RemoteBlockReader;
import tachyon.network.protocol.RPCBlockRequest;
import tachyon.network.protocol.RPCBlockResponse;
import tachyon.network.protocol.RPCBlockReadRequest;
import tachyon.network.protocol.RPCBlockReadResponse;
import tachyon.network.protocol.RPCErrorResponse;
import tachyon.network.protocol.RPCMessage;
import tachyon.network.protocol.RPCResponse;
import tachyon.network.protocol.databuffer.DataByteBuffer;

/**
* Read data from remote data server using Netty.
Expand All @@ -43,6 +43,8 @@ public final class NettyRemoteBlockReader implements RemoteBlockReader {

private final Bootstrap mClientBootstrap;
private final ClientHandler mHandler;
/** A reference to read response so we can explicitly release the resource after reading.*/
private RPCBlockReadResponse mReadResponse = null;

// TODO: Creating a new remote block reader may be expensive, so consider a connection pool.
public NettyRemoteBlockReader() {
Expand All @@ -62,18 +64,21 @@ public ByteBuffer readRemoteBlock(String host, int port, long blockId, long offs
Channel channel = f.channel();
SingleResponseListener listener = new SingleResponseListener();
mHandler.addListener(listener);
channel.writeAndFlush(new RPCBlockRequest(blockId, offset, length));
channel.writeAndFlush(new RPCBlockReadRequest(blockId, offset, length));

RPCResponse response = listener.get(NettyClient.TIMEOUT_MS, TimeUnit.MILLISECONDS);
channel.close().sync();

switch (response.getType()) {
case RPC_BLOCK_RESPONSE:
RPCBlockResponse blockResponse = (RPCBlockResponse) response;
case RPC_BLOCK_READ_RESPONSE:
RPCBlockReadResponse blockResponse = (RPCBlockReadResponse) response;
LOG.info("Data " + blockId + " from remote machine " + address + " received");

RPCResponse.Status status = blockResponse.getStatus();
if (status == RPCResponse.Status.SUCCESS) {
// always clear the previous response before reading another one
clearReadResponse();
mReadResponse = blockResponse;
return blockResponse.getPayloadDataBuffer().getReadOnlyByteBuffer();
}
throw new IOException(status.getMessage() + " response: " + blockResponse);
Expand All @@ -82,10 +87,24 @@ public ByteBuffer readRemoteBlock(String host, int port, long blockId, long offs
throw new IOException(error.getStatus().getMessage());
default:
throw new IOException("Unexpected response message type: " + response.getType()
+ " (expected: " + RPCMessage.Type.RPC_BLOCK_RESPONSE + ")");
+ " (expected: " + RPCMessage.Type.RPC_BLOCK_READ_RESPONSE + ")");
}
} catch (Exception e) {
throw new IOException(e);
}
}

/**
* Clear the previous read response, release the resource the response references.
*
* @return true if the response is cleared, or there is nothing needs to be cleared.
*/
public boolean clearReadResponse() {
boolean res = true;
if (mReadResponse != null) {
res = mReadResponse.releaseBuffer();
mReadResponse = null;
}
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,32 @@
/**
* This represents an RPC request to read a block from a DataServer.
*/
public class RPCBlockRequest extends RPCRequest {
public class RPCBlockReadRequest extends RPCRequest {
private final long mBlockId;
private final long mOffset;
private final long mLength;

// TODO: rename this to RPCBlockReadRequest.
public RPCBlockRequest(long blockId, long offset, long length) {
public RPCBlockReadRequest(long blockId, long offset, long length) {
mBlockId = blockId;
mOffset = offset;
mLength = length;
}

public Type getType() {
return Type.RPC_BLOCK_REQUEST;
return Type.RPC_BLOCK_READ_REQUEST;
}

/**
* Decode the input {@link ByteBuf} into a {@link RPCBlockRequest} object and return it.
* Decode the input {@link ByteBuf} into a {@link RPCBlockReadRequest} object and return it.
*
* @param in the input {@link ByteBuf}
* @return The decoded RPCBlockRequest object
* @return The decoded RPCBlockReadRequest object
*/
public static RPCBlockRequest decode(ByteBuf in) {
public static RPCBlockReadRequest decode(ByteBuf in) {
long blockId = in.readLong();
long offset = in.readLong();
long length = in.readLong();
return new RPCBlockRequest(blockId, offset, length);
return new RPCBlockReadRequest(blockId, offset, length);
}

@Override
Expand All @@ -78,7 +77,7 @@ public void validate() {

@Override
public String toString() {
return "RPCBlockRequest(" + mBlockId + ", " + mOffset + ", " + mLength + ")";
return "RPCBlockReadRequest(" + mBlockId + ", " + mOffset + ", " + mLength + ")";
}

public long getBlockId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,26 @@

package tachyon.network.protocol;

import java.nio.ByteBuffer;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.google.common.primitives.Shorts;

import io.netty.buffer.ByteBuf;

import tachyon.network.protocol.databuffer.DataBuffer;
import tachyon.network.protocol.databuffer.DataByteBuffer;
import tachyon.network.protocol.databuffer.DataNettyBuffer;

/**
* This represents the response of a {@link RPCBlockRequest}.
* This represents the response of a {@link RPCBlockReadRequest}.
*/
public class RPCBlockResponse extends RPCResponse {
public class RPCBlockReadResponse extends RPCResponse {
private final long mBlockId;
private final long mOffset;
private final long mLength;
private final DataBuffer mData;
private final Status mStatus;

// TODO: rename this to RPCBlockReadResponse.
public RPCBlockResponse(long blockId, long offset, long length, DataBuffer data, Status status) {
public RPCBlockReadResponse(long blockId, long offset, long length, DataBuffer data,
Status status) {
mBlockId = blockId;
mOffset = offset;
mLength = length;
Expand All @@ -46,42 +43,43 @@ public RPCBlockResponse(long blockId, long offset, long length, DataBuffer data,
}

public Type getType() {
return Type.RPC_BLOCK_RESPONSE;
return Type.RPC_BLOCK_READ_RESPONSE;
}

/**
* Creates a {@link RPCBlockResponse} object that indicates an error for the given
* {@link RPCBlockRequest}.
* Creates a {@link RPCBlockReadResponse} object that indicates an error for the given
* {@link RPCBlockReadRequest}.
*
* @param request The {@link RPCBlockRequest} to generated the {@link RPCBlockResponse} for.
* @param request The {@link RPCBlockReadRequest} to generated
* the {@link RPCBlockReadResponse} for.
* @param status The {@link tachyon.network.protocol.RPCResponse.Status} for the response.
* @return The generated {@link RPCBlockResponse} object.
* @return The generated {@link RPCBlockReadResponse} object.
*/
public static RPCBlockResponse createErrorResponse(final RPCBlockRequest request,
public static RPCBlockReadResponse createErrorResponse(final RPCBlockReadRequest request,
final Status status) {
Preconditions.checkArgument(status != Status.SUCCESS);
// The response has no payload, so length must be 0.
return new RPCBlockResponse(request.getBlockId(), request.getOffset(), 0, null, status);
return new RPCBlockReadResponse(request.getBlockId(), request.getOffset(), 0, null, status);
}

/**
* Decode the input {@link ByteBuf} into a {@link RPCBlockResponse} object and return it.
* Decode the input {@link ByteBuf} into a {@link RPCBlockReadResponse} object and return it.
*
* @param in the input {@link ByteBuf}
* @return The decoded RPCBlockResponse object
* @return The decoded RPCBlockReadResponse object
*/
public static RPCBlockResponse decode(ByteBuf in) {
public static RPCBlockReadResponse decode(ByteBuf in) {
long blockId = in.readLong();
long offset = in.readLong();
long length = in.readLong();
short status = in.readShort();

DataBuffer data = null;
if (length > 0) {
assert (in.nioBufferCount() == 1);
data = new DataByteBuffer(in.nioBuffer(), (int) length);
// use DataNettyBuffer instead of DataByteBuffer to avoid copying
data = new DataNettyBuffer(in, (int) length);
}
return new RPCBlockResponse(blockId, offset, length, data, Status.fromShort(status));
return new RPCBlockReadResponse(blockId, offset, length, data, Status.fromShort(status));
}

@Override
Expand All @@ -107,7 +105,8 @@ public DataBuffer getPayloadDataBuffer() {

@Override
public String toString() {
return "RPCBlockResponse(" + mBlockId + ", " + mOffset + ", " + mLength + ", " + mStatus + ")";
return "RPCBlockReadResponse(" + mBlockId + ", " + mOffset
+ ", " + mLength + ", " + mStatus + ")";
}

public long getBlockId() {
Expand All @@ -125,4 +124,11 @@ public long getOffset() {
public Status getStatus() {
return mStatus;
}

public boolean releaseBuffer() {
if (mData instanceof DataNettyBuffer) {
return ((DataNettyBuffer) mData).releaseNettyBuffer();
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Type getType() {
* Decode the input {@link ByteBuf} into a {@link RPCBlockWriteRequest} object and return it.
*
* @param in the input {@link ByteBuf}
* @return The decoded RPCBlockResponse object
* @return The decoded RPCBlockWriteRequest object
*/
public static RPCBlockWriteRequest decode(ByteBuf in) {
long userId = in.readLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public RPCBlockWriteResponse(long userId, long blockId, long offset, long length
* Creates a {@link RPCBlockWriteResponse} object that indicates an error for the given
* {@link RPCBlockWriteRequest}.
*
* @param request The {@link RPCBlockWriteRequest} to generated the {@link RPCBlockResponse} for.
* @param request The {@link RPCBlockWriteRequest} to generated
* the {@link RPCBlockReadResponse} for.
* @param status The {@link tachyon.network.protocol.RPCResponse.Status} for the response.
* @return The generated {@link RPCBlockWriteResponse} object.
*/
Expand All @@ -63,7 +64,7 @@ public Type getType() {
* Decode the input {@link ByteBuf} into a {@link RPCBlockWriteResponse} object and return it.
*
* @param in the input {@link ByteBuf}
* @return The decoded RPCBlockResponse object
* @return The decoded RPCBlockWriteResponse object
*/
public static RPCBlockWriteResponse decode(ByteBuf in) {
long userId = in.readLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ public RPCErrorResponse(Status status) {
mStatus = status;
}

public Type getType() {
return Type.RPC_ERROR_RESPONSE;
}

/**
* Decode the input {@link ByteBuf} into a {@link RPCErrorResponse} object and return it.
*
Expand All @@ -44,17 +40,26 @@ public static RPCErrorResponse decode(ByteBuf in) {
}

@Override
public int getEncodedLength() {
// 1 short (mStatus)
return Shorts.BYTES;
public void encode(ByteBuf out) {
out.writeShort(mStatus.getId());
}

@Override
public void encode(ByteBuf out) {
out.writeShort(mStatus.getId());
public int getEncodedLength() {
// 1 short (mStatus)
return Shorts.BYTES;
}

public Status getStatus() {
return mStatus;
}

public Type getType() {
return Type.RPC_ERROR_RESPONSE;
}

@Override
public String toString() {
return "RPCErrorResponse(" + mStatus + ")";
}
}

0 comments on commit b396bbd

Please sign in to comment.