Skip to content

Commit

Permalink
RPCProtoMessage. Use that in the server
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Dec 20, 2016
1 parent 1b48dfd commit fb18239
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 109 deletions.
4 changes: 4 additions & 0 deletions bin/alluxio
Expand Up @@ -310,6 +310,10 @@ function main {
for src_file in $(find ${BIN}/../core/server/src/proto -name '*.proto' -type f); do for src_file in $(find ${BIN}/../core/server/src/proto -name '*.proto' -type f); do
protoc --java_out=${BIN}/../core/server/src/main/java --proto_path=`dirname ${src_file}` ${src_file} protoc --java_out=${BIN}/../core/server/src/main/java --proto_path=`dirname ${src_file}` ${src_file}
done done
rm -rf ${BIN}/../core/common/src/main/java/alluxio/proto
for src_file in $(find ${BIN}/../core/common/src/proto -name '*.proto' -type f); do
protoc --java_out=${BIN}/../core/common/src/main/java --proto_path=`dirname ${src_file}` ${src_file}
done
;; ;;
"clearCache") "clearCache")
sync; echo 3 > /proc/sys/vm/drop_caches ; sync; echo 3 > /proc/sys/vm/drop_caches ;
Expand Down
1 change: 1 addition & 0 deletions build/findbugs/findbugs-exclude.xml
Expand Up @@ -5,6 +5,7 @@
<Package name="org.apache.thrift"/> <Package name="org.apache.thrift"/>
<Package name="alluxio.thrift"/> <Package name="alluxio.thrift"/>
<Package name="alluxio.proto.journal.*"/> <Package name="alluxio.proto.journal.*"/>
<Package name="alluxio.proto.dataserver.*"/>
</Or> </Or>
</Match> </Match>


Expand Down
Expand Up @@ -22,7 +22,6 @@


import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;


import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;


Expand Down Expand Up @@ -183,13 +182,6 @@ private void closePacketReader() {
* @param packet the packet * @param packet the packet
*/ */
private void destroyPacket(ByteBuf packet) { private void destroyPacket(ByteBuf packet) {
// TODO(peis): Investigate whether we can get rid of this.
if (packet.nioBufferCount() > 0) {
ByteBuffer buffer = packet.nioBuffer();
if (buffer.isDirect()) {
BufferUtils.cleanDirectBuffer(buffer);
}
}
ReferenceCountUtil.release(packet); ReferenceCountUtil.release(packet);
} }


Expand Down
Expand Up @@ -41,6 +41,12 @@ public enum Type implements EncodedMessage {
RPC_FILE_READ_RESPONSE(6), RPC_FILE_READ_RESPONSE(6),
RPC_FILE_WRITE_REQUEST(7), RPC_FILE_WRITE_REQUEST(7),
RPC_FILE_WRITE_RESPONSE(8), RPC_FILE_WRITE_RESPONSE(8),

RPC_READ_REQUEST(9),
RPC_WRITE_REQUEST(10),
RPC_RESPONSE(11),

RPC_UNKNOWN(1000),
; ;


private final int mId; private final int mId;
Expand Down
Expand Up @@ -73,6 +73,5 @@ protected void encode(ChannelHandlerContext ctx, RPCMessage in, List<Object> out
"The payload must be a ByteBuf or a FileRegion."); "The payload must be a ByteBuf or a FileRegion.");
out.add(output); out.add(output);
} }

} }
} }
Expand Up @@ -12,29 +12,32 @@
package alluxio.worker.netty; package alluxio.worker.netty;


import alluxio.Constants; import alluxio.Constants;
import alluxio.network.protocol.RPCBlockReadRequest; import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.databuffer.DataBuffer; import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataByteBuffer;
import alluxio.network.protocol.databuffer.DataFileChannel; import alluxio.network.protocol.databuffer.DataFileChannel;
import alluxio.network.protocol.databuffer.DataNettyBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.worker.block.BlockWorker; import alluxio.worker.block.BlockWorker;
import alluxio.worker.block.io.BlockReader; import alluxio.worker.block.io.BlockReader;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;


import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;


/** /**
* This class handles {@link RPCBlockReadRequest}s. * This handler handles block read request. Check more information in {@link DataServerReadHandler}.
*/ */
@NotThreadSafe @NotThreadSafe
final public class DataServerBlockReadHandler extends DataServerReadHandler { public final class DataServerBlockReadHandler extends DataServerReadHandler {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);


/** The Block Worker which handles blocks stored in the Alluxio storage of the worker. */ /** The Block Worker which handles blocks stored in the Alluxio storage of the worker. */
Expand All @@ -54,10 +57,10 @@ private final class BlockReadRequestInternal extends ReadRequestInternal {
* @param request the block read request * @param request the block read request
* @throws Exception if it fails to create the object * @throws Exception if it fails to create the object
*/ */
public BlockReadRequestInternal(RPCBlockReadRequest request) throws Exception { public BlockReadRequestInternal(Protocol.ReadRequest request) throws Exception {
mBlockReader = mWorker mBlockReader = mWorker
.readBlockRemote(request.getSessionId(), request.getBlockId(), request.getLockId()); .readBlockRemote(request.getSessionId(), request.getId(), request.getLockId());
mId = request.getBlockId(); mId = request.getId();
mWorker.accessBlock(request.getSessionId(), mId); mWorker.accessBlock(request.getSessionId(), mId);


mStart = request.getOffset(); mStart = request.getOffset();
Expand Down Expand Up @@ -91,21 +94,38 @@ public DataServerBlockReadHandler(ExecutorService executorService, BlockWorker b
} }


@Override @Override
protected void initializeRequest(RPCBlockReadRequest request) throws Exception { protected boolean acceptMessage(Object object) {
if (!super.acceptMessage(object)) {
return false;
}
Protocol.ReadRequest request = (Protocol.ReadRequest) ((RPCProtoMessage) object).getMessage();
return request.getType() == Protocol.RequestType.ALLUXIO_BLOCK;
}

@Override
protected void initializeRequest(Protocol.ReadRequest request) throws Exception {
mRequest = new BlockReadRequestInternal(request); mRequest = new BlockReadRequestInternal(request);
} }


@Override @Override
protected DataBuffer getDataBuffer(long offset, int len) throws IOException { protected DataBuffer getDataBuffer(Channel channel, long offset, int len) throws IOException {
BlockReader blockReader = ((BlockReadRequestInternal) mRequest).mBlockReader; BlockReader blockReader = ((BlockReadRequestInternal) mRequest).mBlockReader;
Preconditions.checkArgument(blockReader.getChannel() instanceof FileChannel,
"Only FileChannel is supported!");
switch (mTransferType) { switch (mTransferType) {
case MAPPED: case MAPPED:
ByteBuffer data = blockReader.read(offset, len); ByteBuf buf = channel.alloc().buffer(len, len);
return new DataByteBuffer(data, len); try {
while (buf.writableBytes() > 0
&& buf.writeBytes((FileChannel) blockReader.getChannel(), buf.writableBytes()) != -1)
;
} catch (Throwable e) {
ReferenceCountUtil.release(buf);
throw e;
}
return new DataNettyBuffer(buf, buf.readableBytes());
case TRANSFER: // intend to fall through as TRANSFER is the default type. case TRANSFER: // intend to fall through as TRANSFER is the default type.
default: default:
Preconditions.checkArgument(blockReader.getChannel() instanceof FileChannel,
"Only FileChannel is supported!");
return new DataFileChannel((FileChannel) blockReader.getChannel(), offset, len); return new DataFileChannel((FileChannel) blockReader.getChannel(), offset, len);
} }
} }
Expand Down
Expand Up @@ -13,7 +13,8 @@


import alluxio.StorageTierAssoc; import alluxio.StorageTierAssoc;
import alluxio.WorkerStorageTierAssoc; import alluxio.WorkerStorageTierAssoc;
import alluxio.network.protocol.RPCBlockWriteRequest; import alluxio.network.protocol.RPCProtoMessage;
import alluxio.proto.dataserver.Protocol;
import alluxio.worker.block.BlockWorker; import alluxio.worker.block.BlockWorker;
import alluxio.worker.block.io.BlockWriter; import alluxio.worker.block.io.BlockWriter;


Expand All @@ -27,20 +28,11 @@
import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;


/** /**
* This class handles {@link RPCBlockWriteRequest}s. * This handler handles block write request. Check more information in
* * {@link DataServerWriteHandler}.
* Protocol: Check {@link alluxio.client.block.stream.NettyBlockWriter} for more information.
* 1. The netty channel handler streams packets from the channel and buffers them. The netty
* reader is paused if the buffer is full by turning off the auto read, and is resumed when
* the buffer is not full.
* 2. The {@link PacketWriter} polls packets from the buffer and writes to the block worker. The
* writer becomes inactive if there is nothing on the buffer to free up the executor. It is
* resumed when the buffer becomes non-empty.
* 3. When an error occurs, the channel is closed. All the buffered packets are released when the
* channel is deregistered.
*/ */
@NotThreadSafe @NotThreadSafe
public abstract class DataServerBlockWriteHandler extends DataServerWriteHandler { public final class DataServerBlockWriteHandler extends DataServerWriteHandler {
/** The Block Worker which handles blocks stored in the Alluxio storage of the worker. */ /** The Block Worker which handles blocks stored in the Alluxio storage of the worker. */
private final BlockWorker mWorker; private final BlockWorker mWorker;
/** An object storing the mapping of tier aliases to ordinals. */ /** An object storing the mapping of tier aliases to ordinals. */
Expand All @@ -49,10 +41,10 @@ public abstract class DataServerBlockWriteHandler extends DataServerWriteHandler
private class BlockWriteRequestInternal extends WriteRequestInternal { private class BlockWriteRequestInternal extends WriteRequestInternal {
public BlockWriter mBlockWriter; public BlockWriter mBlockWriter;


public BlockWriteRequestInternal(RPCBlockWriteRequest request) throws Exception { public BlockWriteRequestInternal(Protocol.WriteRequest request) throws Exception {
mBlockWriter = mWorker.getTempBlockWriterRemote(request.getSessionId(), request.getBlockId()); mBlockWriter = mWorker.getTempBlockWriterRemote(request.getSessionId(), request.getId());
mSessionId = request.getSessionId(); mSessionId = request.getSessionId();
mId = request.getBlockId(); mId = request.getId();
} }


@Override @Override
Expand All @@ -77,10 +69,11 @@ public DataServerBlockWriteHandler(ExecutorService executorService, BlockWorker
* @param msg the block write request * @param msg the block write request
* @throws Exception if it fails to initialize * @throws Exception if it fails to initialize
*/ */
protected void initializeRequest(RPCBlockWriteRequest msg) throws Exception { protected void initializeRequest(RPCProtoMessage msg) throws Exception {
super.initializeRequest(msg); super.initializeRequest(msg);
if (mRequest == null) { if (mRequest == null) {
mRequest = new BlockWriteRequestInternal(msg); Protocol.WriteRequest request = (Protocol.WriteRequest) (msg.getMessage());
mRequest = new BlockWriteRequestInternal(request);
} }
} }


Expand Down
Expand Up @@ -12,12 +12,15 @@
package alluxio.worker.netty; package alluxio.worker.netty;


import alluxio.Constants; import alluxio.Constants;
import alluxio.network.protocol.RPCBlockReadRequest; import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.databuffer.DataBuffer; import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataNettyBuffer; import alluxio.network.protocol.databuffer.DataNettyBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.worker.file.FileSystemWorker; import alluxio.worker.file.FileSystemWorker;


import io.netty.buffer.Unpooled; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -28,10 +31,10 @@
import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;


/** /**
* This class handles {@link RPCBlockReadRequest}s. * This handler handles file read request. Check more information in {@link DataServerReadHandler}.
*/ */
@NotThreadSafe @NotThreadSafe
final public class DataServerFileReadHandler extends DataServerReadHandler { public final class DataServerFileReadHandler extends DataServerReadHandler {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);


/** The Block Worker which handles blocks stored in the Alluxio storage of the worker. */ /** The Block Worker which handles blocks stored in the Alluxio storage of the worker. */
Expand All @@ -50,8 +53,8 @@ private final class FileReadRequestInternal extends ReadRequestInternal {
* @param request the block read request * @param request the block read request
* @throws Exception if it fails to create the object * @throws Exception if it fails to create the object
*/ */
public FileReadRequestInternal(RPCBlockReadRequest request) throws Exception { public FileReadRequestInternal(Protocol.ReadRequest request) throws Exception {
mInputStream = mWorker.getUfsInputStream(request.getBlockId(), request.getOffset()); mInputStream = mWorker.getUfsInputStream(request.getId(), request.getOffset());


mStart = request.getOffset(); mStart = request.getOffset();
mEnd = mStart + request.getLength(); mEnd = mStart + request.getLength();
Expand All @@ -73,26 +76,37 @@ public DataServerFileReadHandler(ExecutorService executorService, FileSystemWork
} }


@Override @Override
protected void initializeRequest(RPCBlockReadRequest request) throws Exception { protected boolean acceptMessage(Object object) {
if (!super.acceptMessage(object)) {
return false;
}
Protocol.ReadRequest request = (Protocol.ReadRequest) ((RPCProtoMessage) object).getMessage();
return request.getType() == Protocol.RequestType.UFS_FILE;
}

@Override
protected void initializeRequest(Protocol.ReadRequest request) throws Exception {
mRequest = new FileReadRequestInternal(request); mRequest = new FileReadRequestInternal(request);
} }


@Override @Override
protected DataBuffer getDataBuffer(long offset, int len) throws IOException { protected DataBuffer getDataBuffer(Channel channel, long offset, int len) throws IOException {
byte[] data = new byte[len]; ByteBuf buf = channel.alloc().buffer(len, len);
InputStream in = ((FileReadRequestInternal) mRequest).mInputStream; InputStream in = ((FileReadRequestInternal) mRequest).mInputStream;


int bytesRead = 0; try {
if (in != null) { // if we have not reached the end of the file if (in != null) { // if we have not reached the end of the file
while (bytesRead < len) { while (buf.writableBytes() > 0 && buf.writeBytes(in, buf.writableBytes()) != -1);
int read = in.read(data, bytesRead, len - bytesRead);
if (read == -1) {
break;
}
bytesRead += read;
} }
} catch (Throwable e) {
ReferenceCountUtil.release(buf);
throw e;
}

if (buf.readableBytes() == 0) {
ReferenceCountUtil.release(buf);
return null;
} }
return bytesRead != 0 ? return new DataNettyBuffer(buf, buf.readableBytes());
new DataNettyBuffer(Unpooled.wrappedBuffer(data, 0, bytesRead), bytesRead) : null;
} }
} }
Expand Up @@ -11,7 +11,8 @@


package alluxio.worker.netty; package alluxio.worker.netty;


import alluxio.network.protocol.RPCBlockWriteRequest; import alluxio.network.protocol.RPCProtoMessage;
import alluxio.proto.dataserver.Protocol;
import alluxio.worker.file.FileSystemWorker; import alluxio.worker.file.FileSystemWorker;


import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
Expand All @@ -24,29 +25,20 @@
import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;


/** /**
* This class handles {@link RPCBlockWriteRequest}s. * This handler handles file write request. Check more information in
* * {@link DataServerWriteHandler}.
* Protocol: Check {@link alluxio.client.block.stream.NettyBlockWriter} for more information.
* 1. The netty channel handler streams packets from the channel and buffers them. The netty
* reader is paused if the buffer is full by turning off the auto read, and is resumed when
* the buffer is not full.
* 2. The {@link PacketWriter} polls packets from the buffer and writes to the block worker. The
* writer becomes inactive if there is nothing on the buffer to free up the executor. It is
* resumed when the buffer becomes non-empty.
* 3. When an error occurs, the channel is closed. All the buffered packets are released when the
* channel is deregistered.
*/ */
@NotThreadSafe @NotThreadSafe
public abstract class DataServerFileWriteHandler extends DataServerWriteHandler { public final class DataServerFileWriteHandler extends DataServerWriteHandler {
/** Filesystem worker which handles file level operations for the worker. */ /** Filesystem worker which handles file level operations for the worker. */
private final FileSystemWorker mWorker; private final FileSystemWorker mWorker;


private class FileWriteRequestInternal extends WriteRequestInternal { private class FileWriteRequestInternal extends WriteRequestInternal {
public OutputStream mOutputStream; public OutputStream mOutputStream;


public FileWriteRequestInternal(RPCBlockWriteRequest request) throws Exception { public FileWriteRequestInternal(Protocol.WriteRequest request) throws Exception {
mOutputStream = mWorker.getUfsOutputStream(request.getBlockId()); mOutputStream = mWorker.getUfsOutputStream(request.getId());
mId = request.getBlockId(); mId = request.getId();
} }


@Override @Override
Expand All @@ -69,10 +61,11 @@ public DataServerFileWriteHandler(ExecutorService executorService, FileSystemWor
* @param msg the block write request * @param msg the block write request
* @throws Exception if it fails to initialize * @throws Exception if it fails to initialize
*/ */
protected void initializeRequest(RPCBlockWriteRequest msg) throws Exception { @Override
protected void initializeRequest(RPCProtoMessage msg) throws Exception {
super.initializeRequest(msg); super.initializeRequest(msg);
if (mRequest == null) { if (mRequest == null) {
mRequest = new FileWriteRequestInternal(msg); mRequest = new FileWriteRequestInternal((Protocol.WriteRequest) msg.getMessage());
} }
} }


Expand Down

0 comments on commit fb18239

Please sign in to comment.