Skip to content

Commit

Permalink
HBASE-28101 Addendum do not throw EOFException out directly (#5431)
Browse files Browse the repository at this point in the history
Signed-off-by: Nihal Jain <nihaljain@apache.org>
(cherry picked from commit 4b76a95)
  • Loading branch information
Apache9 committed Sep 24, 2023
1 parent 3ddadcf commit 8f17aa0
Showing 1 changed file with 41 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,37 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}

private void finishCall(ResponseHeader responseHeader, ByteBufInputStream in, Call call)
throws IOException {
Message value;
if (call.responseDefaultType != null) {
Message.Builder builder = call.responseDefaultType.newBuilderForType();
if (!builder.mergeDelimitedFrom(in)) {
// The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF
// before reading any bytes out, so here we need to manually finish create the EOFException
// and finish the call
call.setException(new EOFException("EOF while reading response with type: "
+ call.responseDefaultType.getClass().getName()));
return;
}
value = builder.build();
} else {
value = null;
}
CellScanner cellBlockScanner;
if (responseHeader.hasCellBlockMeta()) {
int size = responseHeader.getCellBlockMeta().getLength();
// Maybe we could read directly from the ByteBuf.
// The problem here is that we do not know when to release it.
byte[] cellBlock = new byte[size];
in.readFully(cellBlock);
cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
} else {
cellBlockScanner = null;
}
call.setResponse(value, cellBlockScanner);
}

private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
int totalSize = buf.readInt();
ByteBufInputStream in = new ByteBufInputStream(buf);
Expand Down Expand Up @@ -166,31 +197,17 @@ private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOExcep
call.setException(remoteExc);
return;
}
Message value;
if (call.responseDefaultType != null) {
Message.Builder builder = call.responseDefaultType.newBuilderForType();
if (!builder.mergeDelimitedFrom(in)) {
// The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF
// before reading any bytes out, so here we need to manually throw the EOFException out
throw new EOFException(
"EOF while reading response with type: " + call.responseDefaultType.getClass().getName());
}
value = builder.build();
} else {
value = null;
}
CellScanner cellBlockScanner;
if (responseHeader.hasCellBlockMeta()) {
int size = responseHeader.getCellBlockMeta().getLength();
// Maybe we could read directly from the ByteBuf.
// The problem here is that we do not know when to release it.
byte[] cellBlock = new byte[size];
buf.readBytes(cellBlock);
cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
} else {
cellBlockScanner = null;
try {
finishCall(responseHeader, in, call);
} catch (IOException e) {
// As the call has been removed from id2Call map, if we hit an exception here, the
// exceptionCaught method can not help us finish the call, so here we need to catch the
// exception and finish it
// And in netty, the decoding the frame based, when reaching here we have already read a full
// frame, so hitting exception here does not mean the stream decoding is broken, thus we do
// not need to throw the exception out and close the connection.
call.setException(e);
}
call.setResponse(value, cellBlockScanner);
}

@Override
Expand Down

0 comments on commit 8f17aa0

Please sign in to comment.