HBASE-27199 Clean up error-prone findings in hbase-asyncfs #4620

Closed
wants to merge 2 commits
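For context on the changes below: error-prone's FutureReturnValueIgnored check flags calls that return a Future whose value is never used. Most edits in this diff either suppress that warning on intentional fire-and-forget Netty writes or change helper methods to return the ChannelFuture/Promise so the caller can act on it. A minimal sketch of the two patterns (hypothetical class and method names, not code from this patch):

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;

class FutureReturnValueExample {

  // Pattern 1: the write is intentionally fire-and-forget, so the warning is suppressed.
  @SuppressWarnings("FutureReturnValueIgnored")
  void fireAndForget(Channel channel, Object msg) {
    channel.writeAndFlush(msg);
  }

  // Pattern 2: return the future so the caller decides how to handle completion.
  ChannelFuture send(Channel channel, Object msg) {
    return channel.writeAndFlush(msg);
  }
}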
@@ -90,7 +90,7 @@ public interface AsyncFSOutput extends Closeable {
void close() throws IOException;

/**
* @return byteSize success synced to underlying filesystem.
* Returns the number of bytes successfully synced to underlying filesystem.
*/
long getSyncedLength();
}
@@ -234,6 +234,7 @@ private void completed(Channel channel) {
// this usually does not happen which means it is not on the critical path so make it synchronized
// so that the implementation will not burn up our brain as there are multiple state changes and
// checks.
@SuppressWarnings("FutureReturnValueIgnored")
private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
if (state == State.CLOSED) {
return;
@@ -317,6 +318,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
@@ -405,6 +407,7 @@ public DatanodeInfo[] getPipeline() {
return locations;
}

@SuppressWarnings("FutureReturnValueIgnored")
private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
long nextPacketOffsetInBlock, boolean syncBlock) {
int dataLen = dataBuf.readableBytes();
@@ -545,6 +548,7 @@ public CompletableFuture<Long> flush(boolean syncBlock) {
return future;
}

@SuppressWarnings("FutureReturnValueIgnored")
private void endBlock() throws IOException {
Preconditions.checkState(waitingAckQueue.isEmpty(),
"should call flush first before calling close");
@@ -36,7 +36,6 @@
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@@ -341,7 +340,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
});
}

private static void requestWriteBlock(Channel channel, StorageType storageType,
private static ChannelFuture requestWriteBlock(Channel channel, StorageType storageType,
OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
OpWriteBlockProto proto =
writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
@@ -351,18 +350,19 @@ private static void requestWriteBlock(Channel channel, StorageType storageType,
buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
buffer.writeByte(Op.WRITE_BLOCK.code);
proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
channel.writeAndFlush(buffer);
return channel.writeAndFlush(buffer);
}

private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
private static Promise<Void> initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
throws IOException {
Promise<Void> saslPromise = channel.eventLoop().newPromise();
trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
saslPromise.addListener(new FutureListener<Void>() {
return saslPromise.addListener(new FutureListener<Void>() {

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
// setup response processing pipeline first, then send request.
@@ -375,6 +375,7 @@ public void operationComplete(Future<Void> future) throws Exception {
});
}

@SuppressWarnings("FutureReturnValueIgnored")
private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
@@ -451,6 +452,7 @@ private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
return new EnumSetWritable<>(EnumSet.copyOf(flags));
}

@SuppressWarnings("FutureReturnValueIgnored")
private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
@@ -488,7 +490,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
DataChecksum summer = createChecksum(client);
locatedBlock = namenode.addBlock(src, client.getClientName(), null,
toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
IdentityHashMap<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
for (int i = 0, n = futureList.size(); i < n; i++) {
@@ -610,6 +612,7 @@ static void sleepIgnoreInterrupt(int retry) {
try {
Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping", e);
}
}
}
@@ -86,6 +86,7 @@
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
@@ -335,8 +336,9 @@ public SaslNegotiateHandler(Configuration conf, String username, char[] password
this.dfsClient = dfsClient;
}

private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
sendSaslMessage(ctx, payload, null);
private ChannelFuture sendSaslMessage(ChannelHandlerContext ctx, byte[] payload)
throws IOException {
return sendSaslMessage(ctx, payload, null);
}

private List<CipherOption> getCipherOptions() throws IOException {
@@ -432,7 +434,7 @@ static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder,
}
}

private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
private ChannelFuture sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
List<CipherOption> options) throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
@@ -448,10 +450,11 @@ private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
size += CodedOutputStream.computeRawVarint32Size(size);
ByteBuf buf = ctx.alloc().buffer(size);
proto.writeDelimitedTo(new ByteBufOutputStream(buf));
ctx.write(buf);
return ctx.write(buf);
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
sendSaslMessage(ctx, new byte[0]);
@@ -546,8 +549,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (requestedQopContainsPrivacy()) {
cipherOptions = getCipherOptions();
}
sendSaslMessage(ctx, response, cipherOptions);
ChannelFuture sendFuture = sendSaslMessage(ctx, response, cipherOptions);
ctx.flush();
sendFuture.get();
Reviewer comment (Contributor): This is a no no, we should not do any blocking operations in netty handler...
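A non-blocking alternative (a sketch only, reusing the surrounding method's ctx, response and cipherOptions, and not necessarily what the patch ended up doing) would be to attach a listener to the returned ChannelFuture instead of calling get() on the event loop thread:

ChannelFuture sendFuture = sendSaslMessage(ctx, response, cipherOptions);
ctx.flush();
// React to a failed write via a listener rather than blocking the event loop.
sendFuture.addListener(f -> {
  if (!f.isSuccess()) {
    ctx.fireExceptionCaught(f.cause());
  }
});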

step++;
break;
}
@@ -635,6 +639,7 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof ByteBuf) {
@@ -647,6 +652,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void flush(ChannelHandlerContext ctx) throws Exception {
if (cBuf.isReadable()) {
byte[] b = new byte[cBuf.readableBytes()];
@@ -83,6 +83,7 @@ public ProtobufDecoder(Object prototype) {
}
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
int length = msg.readableBytes();
byte[] array;
@@ -93,9 +93,9 @@ public static void setUp() throws Exception {
}

@AfterClass
public static void tearDown() throws IOException, InterruptedException {
public static void tearDown() throws Exception {
if (EVENT_LOOP_GROUP != null) {
EVENT_LOOP_GROUP.shutdownGracefully().sync();
EVENT_LOOP_GROUP.shutdownGracefully().sync().get();
}
shutdownMiniDFSCluster();
}
@@ -254,15 +254,15 @@ public void testExcludeFailedConnectToDatanode()
}

@Test
public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
public void testWriteLargeChunk() throws Exception {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
byte[] b = new byte[50 * 1024 * 1024];
Bytes.random(b);
out.write(b);
out.flush(false);
out.flush(false).get();
assertEquals(b.length, out.flush(false).get().longValue());
out.close();
assertEquals(b.length, FS.getFileStatus(f).getLen());
@@ -20,7 +20,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -103,12 +102,12 @@ public static void setUp() throws Exception {
}

@AfterClass
public static void tearDown() throws IOException, InterruptedException {
public static void tearDown() throws Exception {
if (OUT != null) {
OUT.recoverAndClose(null);
}
if (EVENT_LOOP_GROUP != null) {
EVENT_LOOP_GROUP.shutdownGracefully().sync();
EVENT_LOOP_GROUP.shutdownGracefully().sync().get();
}
shutdownMiniDFSCluster();
}
@@ -53,9 +53,9 @@ public class TestLocalAsyncOutput {
private static StreamSlowMonitor MONITOR;

@AfterClass
public static void tearDownAfterClass() throws IOException {
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.cleanupTestDir();
GROUP.shutdownGracefully();
GROUP.shutdownGracefully().get();
MONITOR = StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), "testMonitor");
}

@@ -83,6 +83,7 @@ public void testNotOverwrite() throws IOException {
}

@Test
@SuppressWarnings("MissingFail")
public void testOverwrite() throws IOException {
Path file = new Path("/" + name.getMethodName());
FSDataOutputStream out1 = FS.create(file);
@@ -193,9 +193,9 @@ public static void setUpBeforeClass() throws Exception {
}

@AfterClass
public static void tearDownAfterClass() throws IOException, InterruptedException {
public static void tearDownAfterClass() throws Exception {
if (EVENT_LOOP_GROUP != null) {
EVENT_LOOP_GROUP.shutdownGracefully().sync();
EVENT_LOOP_GROUP.shutdownGracefully().sync().get();
}
if (KDC != null) {
KDC.stop();