Skip to content

Commit

Permalink
revert BlockDataServerHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Dec 20, 2016
1 parent 5f5a5e0 commit 1b48dfd
Showing 1 changed file with 8 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* This class handles {@link RPCBlockReadRequest}s and {@link RPCBlockWriteRequest}s.
*/
@NotThreadSafe
final public class BlockDataServerHandler {
final class BlockDataServerHandler {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

/** The Block Worker which handles blocks stored in the Alluxio storage of the worker. */
Expand All @@ -58,7 +58,7 @@ final public class BlockDataServerHandler {
/** An object storing the mapping of tier aliases to ordinals. */
private final StorageTierAssoc mStorageTierAssoc;

public BlockDataServerHandler(BlockWorker worker) {
BlockDataServerHandler(BlockWorker worker) {
mWorker = worker;
mStorageTierAssoc = new WorkerStorageTierAssoc();
mTransferType = Configuration
Expand All @@ -74,9 +74,9 @@ public BlockDataServerHandler(BlockWorker worker) {
* @param req The initiating {@link RPCBlockReadRequest}
* @throws IOException if an I/O error occurs when reading the data requested
*/
public void handleBlockReadRequest(final ChannelHandlerContext ctx, RPCBlockReadRequest req)
void handleBlockReadRequest(final ChannelHandlerContext ctx, final RPCBlockReadRequest req)
throws IOException {
long blockId = req.getBlockId();
final long blockId = req.getBlockId();
final long offset = req.getOffset();
final long len = req.getLength();
final long lockId = req.getLockId();
Expand All @@ -103,47 +103,11 @@ public void handleBlockReadRequest(final ChannelHandlerContext ctx, RPCBlockRead
LOG.error("Exception reading block {}", blockId, e);
RPCBlockReadResponse resp;
if (e instanceof BlockDoesNotExistException) {
resp = RPCBlockReadResponse.createErrorResponse(req, RPCResponse.Status.FILE_DNE);
} else {
resp = RPCBlockReadResponse.createErrorResponse(req, RPCResponse.Status.UFS_READ_FAILED);
}
ChannelFuture future = ctx.writeAndFlush(resp);
future.addListener(ChannelFutureListener.CLOSE);
if (reader != null) {
reader.close();
}
}
}

private void handleBlockReadRequestInternal(ChannelHandlerContext ctx, RPCBlockReadRequest req, BlockReader reader) {
long blockId = req.getBlockId();
long offset = req.getOffset();
long len = req.getLength();
long lockId = req.getLockId();
long sessionId = req.getSessionId();

DataBuffer buffer;
try {
req.validate();
final long fileLength = reader.getLength();
validateBounds(req, fileLength);
final long readLength = returnLength(offset, len, fileLength);
buffer = getDataBuffer(req, reader, readLength);
Metrics.BYTES_READ_REMOTE.inc(buffer.getLength());
RPCBlockReadResponse resp =
new RPCBlockReadResponse(blockId, offset, readLength, buffer, RPCResponse.Status.SUCCESS);
ChannelFuture future = ctx.writeAndFlush(resp);
future.addListener(new ClosableResourceChannelListener(reader));
future.addListener(new ReleasableResourceChannelListener(buffer));
mWorker.accessBlock(sessionId, blockId);
LOG.debug("Preparation for responding to remote block request for: {} done.", blockId);
} catch (Exception e) {
LOG.error("Exception reading block {}", blockId, e);
RPCBlockReadResponse resp;
if (e instanceof BlockDoesNotExistException) {
resp = RPCBlockReadResponse.createErrorResponse(req, RPCResponse.Status.FILE_DNE);
resp =
RPCBlockReadResponse.createErrorResponse(req.getBlockId(), RPCResponse.Status.FILE_DNE);
} else {
resp = RPCBlockReadResponse.createErrorResponse(req, RPCResponse.Status.UFS_READ_FAILED);
resp = RPCBlockReadResponse
.createErrorResponse(req.getBlockId(), RPCResponse.Status.UFS_READ_FAILED);
}
ChannelFuture future = ctx.writeAndFlush(resp);
future.addListener(ChannelFutureListener.CLOSE);
Expand Down

0 comments on commit 1b48dfd

Please sign in to comment.