Skip to content

Commit

Permalink
[ALLUXIO-2977] Implement WriteRequestContext and other required classes
Browse files Browse the repository at this point in the history
  • Loading branch information
apc999 committed Aug 8, 2017
1 parent f065de1 commit 7f397c5
Show file tree
Hide file tree
Showing 13 changed files with 668 additions and 456 deletions.
Expand Up @@ -62,7 +62,7 @@
* @param <T> type of read request * @param <T> type of read request
*/ */
@NotThreadSafe @NotThreadSafe
abstract class AbstractReadHandler<T extends ReadRequestContext> abstract class AbstractReadHandler<T extends ReadRequestContext<?>>
extends ChannelInboundHandlerAdapter { extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(AbstractReadHandler.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractReadHandler.class);


Expand Down Expand Up @@ -135,8 +135,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
*/ */
@GuardedBy("mLock") @GuardedBy("mLock")
public boolean tooManyPendingPackets() { public boolean tooManyPendingPackets() {
return mContext.getPosToQueue() - mContext.getPosToWrite() return mContext.getPosToQueue() - mContext.getPosToWrite() >= MAX_PACKETS_IN_FLIGHT * mContext
>= MAX_PACKETS_IN_FLIGHT * mContext.getRequest().getPacketSize(); .getRequest().getPacketSize();
} }


/** /**
Expand Down Expand Up @@ -237,13 +237,13 @@ protected boolean acceptMessage(Object object) {
protected abstract T createRequestContext(Protocol.ReadRequest request); protected abstract T createRequestContext(Protocol.ReadRequest request);


/** /**
* Completes the read request. When the request is closed, we should clean up any temporary state * Creates a read reader.
* it may have accumulated. *
* @param request read request * @param context read request context
* @param channel channel * @param channel channel
* @return the packet reader for this handler * @return the packet reader for this handler
*/ */
protected abstract PacketReader createPacketReader(T request, Channel channel); protected abstract PacketReader createPacketReader(T context, Channel channel);


/** /**
* @param bytesRead bytes read * @param bytesRead bytes read
Expand Down Expand Up @@ -280,9 +280,8 @@ public void operationComplete(ChannelFuture future) {


try (LockResource lr = new LockResource(mLock)) { try (LockResource lr = new LockResource(mLock)) {
Preconditions.checkState( Preconditions.checkState(
mPosToWriteUncommitted - mContext.getPosToWrite() mPosToWriteUncommitted - mContext.getPosToWrite() <= mContext.getRequest()
<= mContext.getRequest().getPacketSize(), .getPacketSize(), "Some packet is not acked.");
"Some packet is not acked.");
incrementMetrics(mPosToWriteUncommitted - mContext.getPosToWrite()); incrementMetrics(mPosToWriteUncommitted - mContext.getPosToWrite());
mContext.setPosToWrite(mPosToWriteUncommitted); mContext.setPosToWrite(mPosToWriteUncommitted);


Expand Down Expand Up @@ -315,7 +314,7 @@ protected abstract class PacketReader implements Runnable {
/** /**
* Creates an instance of the {@link PacketReader}. * Creates an instance of the {@link PacketReader}.
* *
* @param context request to complete * @param context context of the request to complete
* @param channel the channel * @param channel the channel
*/ */
PacketReader(T context, Channel channel) { PacketReader(T context, Channel channel) {
Expand Down Expand Up @@ -351,8 +350,8 @@ private void runInternal() {
mContext.setPacketReaderActive(false); mContext.setPacketReaderActive(false);
break; break;
} }
packetSize = (int) Math.min(mRequest.getEnd() - mContext.getPosToQueue(), packetSize = (int) Math
mRequest.getPacketSize()); .min(mRequest.getEnd() - mContext.getPosToQueue(), mRequest.getPacketSize());


// packetSize should always be > 0 here when reaches here. // packetSize should always be > 0 here when reaches here.
Preconditions.checkState(packetSize > 0); Preconditions.checkState(packetSize > 0);
Expand All @@ -371,8 +370,8 @@ private void runInternal() {
mContext.setPosToQueue(mContext.getPosToQueue() + packet.getLength()); mContext.setPosToQueue(mContext.getPosToQueue() + packet.getLength());
} }
} }
if (packet == null || packet.getLength() < packetSize if (packet == null || packet.getLength() < packetSize || start + packetSize == mRequest
|| start + packetSize == mRequest.getEnd()) { .getEnd()) {
// This can happen if the requested read length is greater than the actual length of the // This can happen if the requested read length is greater than the actual length of the
// block or file starting from the given offset. // block or file starting from the given offset.
setEof(mChannel); setEof(mChannel);
Expand Down Expand Up @@ -413,23 +412,23 @@ private void runInternal() {
} }


/** /**
* Completes the read request. When the request is closed, we should clean up any temporary state * Completes the read request. When the request is closed, we should clean up any temporary
* it may have accumulated. * state it may have accumulated.
* *
* @param request request to complete * @param context context of the request to complete
*/ */
protected abstract void completeRequest(T request) throws Exception; protected abstract void completeRequest(T context) throws Exception;


/** /**
* Returns the appropriate {@link DataBuffer} representing the data to send, depending on the * Returns the appropriate {@link DataBuffer} representing the data to send, depending on the
* configurable transfer type. * configurable transfer type.
* *
* @param request request to complete * @param context context of the request to complete
* @param channel the netty channel * @param channel the netty channel
* @param len The length, in bytes, of the data to read from the block * @param len The length, in bytes, of the data to read from the block
* @return a {@link DataBuffer} representing the data * @return a {@link DataBuffer} representing the data
*/ */
protected abstract DataBuffer getDataBuffer(T request, Channel channel, long offset, int len) protected abstract DataBuffer getDataBuffer(T context, Channel channel, long offset, int len)
throws Exception; throws Exception;


/** /**
Expand All @@ -444,8 +443,8 @@ private void replyError(AlluxioStatusException e) {
* Writes a success response. * Writes a success response.
*/ */
private void replyEof() { private void replyEof() {
Preconditions.checkState(!mContext.isDone()); Preconditions.checkState(!mContext.isDoneUnsafe());
mContext.setDone(true); mContext.setDoneUnsafe(true);
mChannel.writeAndFlush(RPCProtoMessage.createOkResponse(null)) mChannel.writeAndFlush(RPCProtoMessage.createOkResponse(null))
.addListeners(ChannelFutureListener.CLOSE_ON_FAILURE); .addListeners(ChannelFutureListener.CLOSE_ON_FAILURE);
} }
Expand All @@ -454,8 +453,8 @@ private void replyEof() {
* Writes a cancel response. * Writes a cancel response.
*/ */
private void replyCancel() { private void replyCancel() {
Preconditions.checkState(!mContext.isDone()); Preconditions.checkState(!mContext.isDoneUnsafe());
mContext.setDone(true); mContext.setDoneUnsafe(true);
mChannel.writeAndFlush(RPCProtoMessage.createCancelResponse()) mChannel.writeAndFlush(RPCProtoMessage.createCancelResponse())
.addListeners(ChannelFutureListener.CLOSE_ON_FAILURE); .addListeners(ChannelFutureListener.CLOSE_ON_FAILURE);
} }
Expand Down

0 comments on commit 7f397c5

Please sign in to comment.