diff --git a/bin/alluxio b/bin/alluxio index 19c40287fce6..5eb19795a627 100755 --- a/bin/alluxio +++ b/bin/alluxio @@ -310,6 +310,10 @@ function main { for src_file in $(find ${BIN}/../core/server/src/proto -name '*.proto' -type f); do protoc --java_out=${BIN}/../core/server/src/main/java --proto_path=`dirname ${src_file}` ${src_file} done + rm -rf ${BIN}/../core/common/src/main/java/alluxio/proto + for src_file in $(find ${BIN}/../core/common/src/proto -name '*.proto' -type f); do + protoc --java_out=${BIN}/../core/common/src/main/java --proto_path=`dirname ${src_file}` ${src_file} + done ;; "clearCache") sync; echo 3 > /proc/sys/vm/drop_caches ; diff --git a/build/findbugs/findbugs-exclude.xml b/build/findbugs/findbugs-exclude.xml index baf001180a35..d857684dc6c6 100644 --- a/build/findbugs/findbugs-exclude.xml +++ b/build/findbugs/findbugs-exclude.xml @@ -5,6 +5,7 @@ + diff --git a/core/client/src/main/java/alluxio/client/block/stream/PacketInStream.java b/core/client/src/main/java/alluxio/client/block/stream/PacketInStream.java index 16bda72c8ddc..20d1842c578c 100644 --- a/core/client/src/main/java/alluxio/client/block/stream/PacketInStream.java +++ b/core/client/src/main/java/alluxio/client/block/stream/PacketInStream.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import javax.annotation.concurrent.NotThreadSafe; @@ -183,13 +182,6 @@ private void closePacketReader() { * @param packet the packet */ private void destroyPacket(ByteBuf packet) { - // TODO(peis): Investigate whether we can get rid of this. - if (packet.nioBufferCount() > 0) { - ByteBuffer buffer = packet.nioBuffer(); - if (buffer.isDirect()) { - BufferUtils.cleanDirectBuffer(buffer); - } - } ReferenceCountUtil.release(packet); } diff --git a/core/common/src/main/java/alluxio/network/protocol/RPCMessage.java b/core/common/src/main/java/alluxio/network/protocol/RPCMessage.java index b0553285fefe..b676cb9539fd 100644 --- a/core/common/src/main/java/alluxio/network/protocol/RPCMessage.java +++ b/core/common/src/main/java/alluxio/network/protocol/RPCMessage.java @@ -41,6 +41,12 @@ public enum Type implements EncodedMessage { RPC_FILE_READ_RESPONSE(6), RPC_FILE_WRITE_REQUEST(7), RPC_FILE_WRITE_RESPONSE(8), + + RPC_READ_REQUEST(9), + RPC_WRITE_REQUEST(10), + RPC_RESPONSE(11), + + RPC_UNKNOWN(1000), ; private final int mId; diff --git a/core/common/src/main/java/alluxio/network/protocol/RPCMessageEncoder.java b/core/common/src/main/java/alluxio/network/protocol/RPCMessageEncoder.java index d9591c872b28..50b2fedf0d6d 100644 --- a/core/common/src/main/java/alluxio/network/protocol/RPCMessageEncoder.java +++ b/core/common/src/main/java/alluxio/network/protocol/RPCMessageEncoder.java @@ -73,6 +73,5 @@ protected void encode(ChannelHandlerContext ctx, RPCMessage in, List out "The payload must be a ByteBuf or a FileRegion."); out.add(output); } - } } diff --git a/core/server/src/main/java/alluxio/worker/netty/DataServerBlockReadHandler.java b/core/server/src/main/java/alluxio/worker/netty/DataServerBlockReadHandler.java index 240dda8627dd..dc9d80380084 100644 --- a/core/server/src/main/java/alluxio/worker/netty/DataServerBlockReadHandler.java +++ b/core/server/src/main/java/alluxio/worker/netty/DataServerBlockReadHandler.java @@ -12,29 +12,32 @@ package alluxio.worker.netty; import alluxio.Constants; -import alluxio.network.protocol.RPCBlockReadRequest; +import alluxio.network.protocol.RPCProtoMessage; import alluxio.network.protocol.databuffer.DataBuffer; -import alluxio.network.protocol.databuffer.DataByteBuffer; import alluxio.network.protocol.databuffer.DataFileChannel; +import alluxio.network.protocol.databuffer.DataNettyBuffer; +import alluxio.proto.dataserver.Protocol; import alluxio.worker.block.BlockWorker; import alluxio.worker.block.io.BlockReader; import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.concurrent.ExecutorService; import javax.annotation.concurrent.NotThreadSafe; /** - * This class handles {@link RPCBlockReadRequest}s. + * This handler handles block read request. Check more information in {@link DataServerReadHandler}. */ @NotThreadSafe -final public class DataServerBlockReadHandler extends DataServerReadHandler { +public final class DataServerBlockReadHandler extends DataServerReadHandler { private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); /** The Block Worker which handles blocks stored in the Alluxio storage of the worker. */ @@ -54,10 +57,10 @@ private final class BlockReadRequestInternal extends ReadRequestInternal { * @param request the block read request * @throws Exception if it fails to create the object */ - public BlockReadRequestInternal(RPCBlockReadRequest request) throws Exception { + public BlockReadRequestInternal(Protocol.ReadRequest request) throws Exception { mBlockReader = mWorker - .readBlockRemote(request.getSessionId(), request.getBlockId(), request.getLockId()); - mId = request.getBlockId(); + .readBlockRemote(request.getSessionId(), request.getId(), request.getLockId()); + mId = request.getId(); mWorker.accessBlock(request.getSessionId(), mId); mStart = request.getOffset(); @@ -91,21 +94,38 @@ public DataServerBlockReadHandler(ExecutorService executorService, BlockWorker b } @Override - protected void initializeRequest(RPCBlockReadRequest request) throws Exception { + protected boolean acceptMessage(Object object) { + if (!super.acceptMessage(object)) { + return false; + } + Protocol.ReadRequest request = (Protocol.ReadRequest) ((RPCProtoMessage) object).getMessage(); + return request.getType() == Protocol.RequestType.ALLUXIO_BLOCK; + } + + @Override + protected void initializeRequest(Protocol.ReadRequest request) throws Exception { mRequest = new BlockReadRequestInternal(request); } @Override - protected DataBuffer getDataBuffer(long offset, int len) throws IOException { + protected DataBuffer getDataBuffer(Channel channel, long offset, int len) throws IOException { BlockReader blockReader = ((BlockReadRequestInternal) mRequest).mBlockReader; + Preconditions.checkArgument(blockReader.getChannel() instanceof FileChannel, + "Only FileChannel is supported!"); switch (mTransferType) { case MAPPED: - ByteBuffer data = blockReader.read(offset, len); - return new DataByteBuffer(data, len); + ByteBuf buf = channel.alloc().buffer(len, len); + try { + while (buf.writableBytes() > 0 + && buf.writeBytes((FileChannel) blockReader.getChannel(), buf.writableBytes()) != -1) + ; + } catch (Throwable e) { + ReferenceCountUtil.release(buf); + throw e; + } + return new DataNettyBuffer(buf, buf.readableBytes()); case TRANSFER: // intend to fall through as TRANSFER is the default type. default: - Preconditions.checkArgument(blockReader.getChannel() instanceof FileChannel, - "Only FileChannel is supported!"); return new DataFileChannel((FileChannel) blockReader.getChannel(), offset, len); } } diff --git a/core/server/src/main/java/alluxio/worker/netty/DataServerBlockWriteHandler.java b/core/server/src/main/java/alluxio/worker/netty/DataServerBlockWriteHandler.java index 406a285340fe..fbb874db0c03 100644 --- a/core/server/src/main/java/alluxio/worker/netty/DataServerBlockWriteHandler.java +++ b/core/server/src/main/java/alluxio/worker/netty/DataServerBlockWriteHandler.java @@ -13,7 +13,8 @@ import alluxio.StorageTierAssoc; import alluxio.WorkerStorageTierAssoc; -import alluxio.network.protocol.RPCBlockWriteRequest; +import alluxio.network.protocol.RPCProtoMessage; +import alluxio.proto.dataserver.Protocol; import alluxio.worker.block.BlockWorker; import alluxio.worker.block.io.BlockWriter; @@ -27,20 +28,11 @@ import javax.annotation.concurrent.NotThreadSafe; /** - * This class handles {@link RPCBlockWriteRequest}s. - * - * Protocol: Check {@link alluxio.client.block.stream.NettyBlockWriter} for more information. - * 1. The netty channel handler streams packets from the channel and buffers them. The netty - * reader is paused if the buffer is full by turning off the auto read, and is resumed when - * the buffer is not full. - * 2. The {@link PacketWriter} polls packets from the buffer and writes to the block worker. The - * writer becomes inactive if there is nothing on the buffer to free up the executor. It is - * resumed when the buffer becomes non-empty. - * 3. When an error occurs, the channel is closed. All the buffered packets are released when the - * channel is deregistered. + * This handler handles block write request. Check more information in + * {@link DataServerWriteHandler}. */ @NotThreadSafe -public abstract class DataServerBlockWriteHandler extends DataServerWriteHandler { +public final class DataServerBlockWriteHandler extends DataServerWriteHandler { /** The Block Worker which handles blocks stored in the Alluxio storage of the worker. */ private final BlockWorker mWorker; /** An object storing the mapping of tier aliases to ordinals. */ @@ -49,10 +41,10 @@ public abstract class DataServerBlockWriteHandler extends DataServerWriteHandler private class BlockWriteRequestInternal extends WriteRequestInternal { public BlockWriter mBlockWriter; - public BlockWriteRequestInternal(RPCBlockWriteRequest request) throws Exception { - mBlockWriter = mWorker.getTempBlockWriterRemote(request.getSessionId(), request.getBlockId()); + public BlockWriteRequestInternal(Protocol.WriteRequest request) throws Exception { + mBlockWriter = mWorker.getTempBlockWriterRemote(request.getSessionId(), request.getId()); mSessionId = request.getSessionId(); - mId = request.getBlockId(); + mId = request.getId(); } @Override @@ -77,10 +69,11 @@ public DataServerBlockWriteHandler(ExecutorService executorService, BlockWorker * @param msg the block write request * @throws Exception if it fails to initialize */ - protected void initializeRequest(RPCBlockWriteRequest msg) throws Exception { + protected void initializeRequest(RPCProtoMessage msg) throws Exception { super.initializeRequest(msg); if (mRequest == null) { - mRequest = new BlockWriteRequestInternal(msg); + Protocol.WriteRequest request = (Protocol.WriteRequest) (msg.getMessage()); + mRequest = new BlockWriteRequestInternal(request); } } diff --git a/core/server/src/main/java/alluxio/worker/netty/DataServerFileReadHandler.java b/core/server/src/main/java/alluxio/worker/netty/DataServerFileReadHandler.java index 5c3d53ecbb14..50a1613cc343 100644 --- a/core/server/src/main/java/alluxio/worker/netty/DataServerFileReadHandler.java +++ b/core/server/src/main/java/alluxio/worker/netty/DataServerFileReadHandler.java @@ -12,12 +12,15 @@ package alluxio.worker.netty; import alluxio.Constants; -import alluxio.network.protocol.RPCBlockReadRequest; +import alluxio.network.protocol.RPCProtoMessage; import alluxio.network.protocol.databuffer.DataBuffer; import alluxio.network.protocol.databuffer.DataNettyBuffer; +import alluxio.proto.dataserver.Protocol; import alluxio.worker.file.FileSystemWorker; -import io.netty.buffer.Unpooled; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,10 +31,10 @@ import javax.annotation.concurrent.NotThreadSafe; /** - * This class handles {@link RPCBlockReadRequest}s. + * This handler handles file read request. Check more information in {@link DataServerReadHandler}. */ @NotThreadSafe -final public class DataServerFileReadHandler extends DataServerReadHandler { +public final class DataServerFileReadHandler extends DataServerReadHandler { private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); /** The Block Worker which handles blocks stored in the Alluxio storage of the worker. */ @@ -50,8 +53,8 @@ private final class FileReadRequestInternal extends ReadRequestInternal { * @param request the block read request * @throws Exception if it fails to create the object */ - public FileReadRequestInternal(RPCBlockReadRequest request) throws Exception { - mInputStream = mWorker.getUfsInputStream(request.getBlockId(), request.getOffset()); + public FileReadRequestInternal(Protocol.ReadRequest request) throws Exception { + mInputStream = mWorker.getUfsInputStream(request.getId(), request.getOffset()); mStart = request.getOffset(); mEnd = mStart + request.getLength(); @@ -73,26 +76,37 @@ public DataServerFileReadHandler(ExecutorService executorService, FileSystemWork } @Override - protected void initializeRequest(RPCBlockReadRequest request) throws Exception { + protected boolean acceptMessage(Object object) { + if (!super.acceptMessage(object)) { + return false; + } + Protocol.ReadRequest request = (Protocol.ReadRequest) ((RPCProtoMessage) object).getMessage(); + return request.getType() == Protocol.RequestType.UFS_FILE; + } + + @Override + protected void initializeRequest(Protocol.ReadRequest request) throws Exception { mRequest = new FileReadRequestInternal(request); } @Override - protected DataBuffer getDataBuffer(long offset, int len) throws IOException { - byte[] data = new byte[len]; + protected DataBuffer getDataBuffer(Channel channel, long offset, int len) throws IOException { + ByteBuf buf = channel.alloc().buffer(len, len); InputStream in = ((FileReadRequestInternal) mRequest).mInputStream; - int bytesRead = 0; - if (in != null) { // if we have not reached the end of the file - while (bytesRead < len) { - int read = in.read(data, bytesRead, len - bytesRead); - if (read == -1) { - break; - } - bytesRead += read; + try { + if (in != null) { // if we have not reached the end of the file + while (buf.writableBytes() > 0 && buf.writeBytes(in, buf.writableBytes()) != -1); } + } catch (Throwable e) { + ReferenceCountUtil.release(buf); + throw e; + } + + if (buf.readableBytes() == 0) { + ReferenceCountUtil.release(buf); + return null; } - return bytesRead != 0 ? - new DataNettyBuffer(Unpooled.wrappedBuffer(data, 0, bytesRead), bytesRead) : null; + return new DataNettyBuffer(buf, buf.readableBytes()); } } diff --git a/core/server/src/main/java/alluxio/worker/netty/DataServerFileWriteHandler.java b/core/server/src/main/java/alluxio/worker/netty/DataServerFileWriteHandler.java index 432f184f0024..d055ce560b5c 100644 --- a/core/server/src/main/java/alluxio/worker/netty/DataServerFileWriteHandler.java +++ b/core/server/src/main/java/alluxio/worker/netty/DataServerFileWriteHandler.java @@ -11,7 +11,8 @@ package alluxio.worker.netty; -import alluxio.network.protocol.RPCBlockWriteRequest; +import alluxio.network.protocol.RPCProtoMessage; +import alluxio.proto.dataserver.Protocol; import alluxio.worker.file.FileSystemWorker; import io.netty.buffer.ByteBuf; @@ -24,29 +25,20 @@ import javax.annotation.concurrent.NotThreadSafe; /** - * This class handles {@link RPCBlockWriteRequest}s. - * - * Protocol: Check {@link alluxio.client.block.stream.NettyBlockWriter} for more information. - * 1. The netty channel handler streams packets from the channel and buffers them. The netty - * reader is paused if the buffer is full by turning off the auto read, and is resumed when - * the buffer is not full. - * 2. The {@link PacketWriter} polls packets from the buffer and writes to the block worker. The - * writer becomes inactive if there is nothing on the buffer to free up the executor. It is - * resumed when the buffer becomes non-empty. - * 3. When an error occurs, the channel is closed. All the buffered packets are released when the - * channel is deregistered. + * This handler handles file write request. Check more information in + * {@link DataServerWriteHandler}. */ @NotThreadSafe -public abstract class DataServerFileWriteHandler extends DataServerWriteHandler { +public final class DataServerFileWriteHandler extends DataServerWriteHandler { /** Filesystem worker which handles file level operations for the worker. */ private final FileSystemWorker mWorker; private class FileWriteRequestInternal extends WriteRequestInternal { public OutputStream mOutputStream; - public FileWriteRequestInternal(RPCBlockWriteRequest request) throws Exception { - mOutputStream = mWorker.getUfsOutputStream(request.getBlockId()); - mId = request.getBlockId(); + public FileWriteRequestInternal(Protocol.WriteRequest request) throws Exception { + mOutputStream = mWorker.getUfsOutputStream(request.getId()); + mId = request.getId(); } @Override @@ -69,10 +61,11 @@ public DataServerFileWriteHandler(ExecutorService executorService, FileSystemWor * @param msg the block write request * @throws Exception if it fails to initialize */ - protected void initializeRequest(RPCBlockWriteRequest msg) throws Exception { + @Override + protected void initializeRequest(RPCProtoMessage msg) throws Exception { super.initializeRequest(msg); if (mRequest == null) { - mRequest = new FileWriteRequestInternal(msg); + mRequest = new FileWriteRequestInternal((Protocol.WriteRequest) msg.getMessage()); } } diff --git a/core/server/src/main/java/alluxio/worker/netty/DataServerReadHandler.java b/core/server/src/main/java/alluxio/worker/netty/DataServerReadHandler.java index a4f70b287d19..2f92f0d418f0 100644 --- a/core/server/src/main/java/alluxio/worker/netty/DataServerReadHandler.java +++ b/core/server/src/main/java/alluxio/worker/netty/DataServerReadHandler.java @@ -15,10 +15,12 @@ import alluxio.Constants; import alluxio.PropertyKey; import alluxio.metrics.MetricsSystem; -import alluxio.network.protocol.RPCBlockReadRequest; import alluxio.network.protocol.RPCBlockReadResponse; +import alluxio.network.protocol.RPCMessage; +import alluxio.network.protocol.RPCProtoMessage; import alluxio.network.protocol.RPCResponse; import alluxio.network.protocol.databuffer.DataBuffer; +import alluxio.proto.dataserver.Protocol; import com.codahale.metrics.Counter; import com.google.common.base.Preconditions; @@ -26,7 +28,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +41,7 @@ import javax.annotation.concurrent.NotThreadSafe; /** - * This class handles {@link RPCBlockReadRequest}s. + * This class handles {@link alluxio.proto.dataserver.Protocol.ReadRequest}s. * * Protocol: Check {@link alluxio.client.block.stream.NettyPacketReader} for more information. * 1. Once a read request is received, the handler creates a {@link PacketReader} which reads @@ -49,8 +51,7 @@ * 3. The channel is closed if there is any exception during the packet read/write. */ @NotThreadSafe -public abstract class DataServerReadHandler - extends SimpleChannelInboundHandler { +public abstract class DataServerReadHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final long PACKET_SIZE = @@ -103,16 +104,23 @@ public DataServerReadHandler(ExecutorService executorService) { @Override public void channelUnregistered(ChannelHandlerContext ctx) { reset(); + ctx.fireChannelUnregistered(); } @Override - public void channelRead0(ChannelHandlerContext ctx, RPCBlockReadRequest msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object object) throws Exception { + if (!acceptMessage(object)) { + ctx.fireChannelRead(object); + return; + } + Protocol.ReadRequest msg = (Protocol.ReadRequest) ((RPCProtoMessage) object).getMessage(); + if (!validateReadRequest(msg)) { replyError(ctx.channel()); return; } - if (msg.isCancelRequest()) { + if (msg.getCancel()) { mRequest.mCancelled = true; return; } @@ -206,13 +214,13 @@ private void replySuccess(Channel channel) { * @param request the block read request * @return true if the block read request is valid */ - private boolean validateReadRequest(RPCBlockReadRequest request) { + private boolean validateReadRequest(Protocol.ReadRequest request) { if (mRequest == null) { return true; } - if (request.isCancelRequest()) { - if (request.getBlockId() != mRequest.mId) { + if (request.getCancel()) { + if (request.getId() != mRequest.mId) { return false; } return true; @@ -238,23 +246,39 @@ private void reset() { } } + /** + * Checks whether this object should be processed by this handler. + * + * @param object the object + * @return true if this object should be processed + */ + protected boolean acceptMessage(Object object) { + if (!(object instanceof RPCProtoMessage)) { + return false; + } + RPCProtoMessage message = (RPCProtoMessage) object; + return message.getType() == RPCMessage.Type.RPC_READ_REQUEST; + } + /** * Initializes the handler for the given block read request. * * @param request the block read request * @throws Exception if it fails to initialize */ - protected abstract void initializeRequest(RPCBlockReadRequest request) throws Exception; + protected abstract void initializeRequest(Protocol.ReadRequest request) throws Exception; /** * Returns the appropriate {@link DataBuffer} representing the data to send, depending on the * configurable transfer type. * + * @param channel the netty channel * @param len The length, in bytes, of the data to read from the block * @return a {@link DataBuffer} representing the data * @throws IOException if an I/O error occurs when reading the data */ - protected abstract DataBuffer getDataBuffer(long offset, int len) throws IOException; + protected abstract DataBuffer getDataBuffer(Channel channel, long offset, int len) + throws IOException; /** * The channel handler listener that runs after a packet write is flushed. @@ -334,7 +358,7 @@ public void run() { DataBuffer packet; try { - packet = getDataBuffer(start, packet_size); + packet = getDataBuffer(mChannel, start, packet_size); } catch (Exception e) { mChannel.pipeline().fireExceptionCaught(e); break; diff --git a/core/server/src/main/java/alluxio/worker/netty/DataServerWriteHandler.java b/core/server/src/main/java/alluxio/worker/netty/DataServerWriteHandler.java index dcced6be4fd2..485d5ff27ab5 100644 --- a/core/server/src/main/java/alluxio/worker/netty/DataServerWriteHandler.java +++ b/core/server/src/main/java/alluxio/worker/netty/DataServerWriteHandler.java @@ -14,14 +14,16 @@ import alluxio.Configuration; import alluxio.Constants; import alluxio.PropertyKey; -import alluxio.network.protocol.RPCBlockWriteRequest; import alluxio.network.protocol.RPCBlockWriteResponse; +import alluxio.network.protocol.RPCMessage; +import alluxio.network.protocol.RPCProtoMessage; import alluxio.network.protocol.RPCResponse; +import alluxio.proto.dataserver.Protocol; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,9 +39,9 @@ import javax.annotation.concurrent.NotThreadSafe; /** - * This class handles {@link RPCBlockWriteRequest}s. + * This class handles {@link alluxio.proto.dataserver.Protocol.WriteRequest}s. * - * Protocol: Check {@link alluxio.client.block.stream.NettyBlockWriter} for more information. + * Protocol: Check {@link alluxio.client.block.stream.NettyPacketWriter} for more information. * 1. The netty channel handler streams packets from the channel and buffers them. The netty * reader is paused if the buffer is full by turning off the auto read, and is resumed when * the buffer is not full. @@ -50,8 +52,7 @@ * channel is deregistered. */ @NotThreadSafe -public abstract class DataServerWriteHandler - extends SimpleChannelInboundHandler { +public abstract class DataServerWriteHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final int MAX_PACKETS_IN_FLIGHT = @@ -87,7 +88,7 @@ protected abstract class WriteRequestInternal implements Closeable { protected volatile long mPosToWrite = 0; /** - * Creates an instance of {@link BlockWriteDataServerHandler}. + * Creates an instance of {@link DataServerWriteHandler}. * * @param executorService the executor service to run {@link PacketWriter}s. */ @@ -96,7 +97,13 @@ public DataServerWriteHandler(ExecutorService executorService) { } @Override - public void channelRead0(ChannelHandlerContext ctx, RPCBlockWriteRequest msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object object) throws Exception { + if (!acceptMessage(object)) { + ctx.fireChannelRead(object); + return; + } + + RPCProtoMessage msg = (RPCProtoMessage) object; initializeRequest(msg); // Validate msg and return error if invalid. Init variables if necessary. @@ -107,7 +114,7 @@ public void channelRead0(ChannelHandlerContext ctx, RPCBlockWriteRequest msg) th mLock.lock(); try { - ByteBuf buf = (ByteBuf) msg.getPayloadDataBuffer().getNettyOutput(); + ByteBuf buf = (ByteBuf) (msg.getPayloadDataBuffer().getNettyOutput()); mPackets.offer(buf); mPosToQueue += buf.readableBytes(); if (!mPacketWriterActive) { @@ -150,16 +157,12 @@ private boolean tooManyPacketsInFlight() { * @param msg the block write request * @return true if the request valid */ - private boolean validateRequest(RPCBlockWriteRequest msg) { - if (msg.getBlockId() != mRequest.mId || msg.getLength() < 0) { + private boolean validateRequest(RPCProtoMessage msg) { + Protocol.WriteRequest request = (Protocol.WriteRequest) msg.getMessage(); + if (request.getId() != mRequest.mId || !msg.hasPayload()) { return false; } - if (msg.getOffset() != mPosToQueue) { - return false; - } - - // The last packet (signified by msg.getLength()) should not contain any data. - if (msg.getLength() == 0 && msg.getPayloadDataBuffer().getLength() > 0) { + if (request.getOffset() != mPosToQueue) { return false; } return true; @@ -267,13 +270,27 @@ public void run() { } } + /** + * Checks whether this object should be processed by this handler. + * + * @param object the object + * @return true if this object should be processed + */ + protected boolean acceptMessage(Object object) { + if (!(object instanceof RPCProtoMessage)) { + return false; + } + RPCProtoMessage message = (RPCProtoMessage) object; + return message.getType() == RPCMessage.Type.RPC_WRITE_REQUEST; + } + /** * Initializes the handler if necessary. * * @param msg the block write request * @throws Exception if it fails to initialize */ - protected void initializeRequest(RPCBlockWriteRequest msg) throws Exception { + protected void initializeRequest(RPCProtoMessage msg) throws Exception { if (mRequest == null) { mPosToQueue = 0; mPosToWrite = 0;