diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index b88b32bdb814..afb1987a591b 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -90,7 +90,7 @@ public interface AsyncFSOutput extends Closeable {
   void close() throws IOException;
 
   /**
-   * @return byteSize success synced to underlying filesystem.
+   * Returns the number of bytes successfully synced to underlying filesystem.
    */
   long getSyncedLength();
 }
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index a43bed317345..e194fd30c7b1 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -234,6 +234,7 @@ private void completed(Channel channel) {
   // this usually does not happen which means it is not on the critical path so make it synchronized
   // so that the implementation will not burn up our brain as there are multiple state changes and
   // checks.
+  @SuppressWarnings("FutureReturnValueIgnored")
   private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
     if (state == State.CLOSED) {
       return;
     }
@@ -317,6 +318,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
     }
 
     @Override
+    @SuppressWarnings("FutureReturnValueIgnored")
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
       if (evt instanceof IdleStateEvent) {
         IdleStateEvent e = (IdleStateEvent) evt;
@@ -405,6 +407,7 @@ public DatanodeInfo[] getPipeline() {
     return locations;
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
     long nextPacketOffsetInBlock, boolean syncBlock) {
     int dataLen = dataBuf.readableBytes();
@@ -545,6 +548,7 @@ public CompletableFuture<Long> flush(boolean syncBlock) {
     return future;
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   private void endBlock() throws IOException {
     Preconditions.checkState(waitingAckQueue.isEmpty(),
       "should call flush first before calling close");
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 2517f2d2c01a..cce3cdd60d45 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -36,7 +36,6 @@
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
@@ -341,7 +340,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
     });
   }
 
-  private static void requestWriteBlock(Channel channel, StorageType storageType,
+  private static ChannelFuture requestWriteBlock(Channel channel, StorageType storageType,
     OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
     OpWriteBlockProto proto =
       writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
@@ -351,18 +350,19 @@ private static void requestWriteBlock(Channel channel, StorageType storageType,
     buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
     buffer.writeByte(Op.WRITE_BLOCK.code);
     proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
-    channel.writeAndFlush(buffer);
+    return channel.writeAndFlush(buffer);
   }
 
-  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+  private static Promise<Void> initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
     StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
     DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
     throws IOException {
     Promise<Void> saslPromise = channel.eventLoop().newPromise();
     trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
-    saslPromise.addListener(new FutureListener<Void>() {
+    return saslPromise.addListener(new FutureListener<Void>() {
 
       @Override
+      @SuppressWarnings("FutureReturnValueIgnored")
      public void operationComplete(Future<Void> future) throws Exception {
         if (future.isSuccess()) {
           // setup response processing pipeline first, then send request.
@@ -375,6 +375,7 @@ public void operationComplete(Future<Void> future) throws Exception {
     });
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
     String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
     BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
@@ -451,6 +452,7 @@ private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
     return new EnumSetWritable<>(EnumSet.copyOf(flags));
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
     boolean overwrite, boolean createParent, short replication, long blockSize,
     EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
@@ -488,7 +490,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
       DataChecksum summer = createChecksum(client);
       locatedBlock = namenode.addBlock(src, client.getClientName(), null,
         toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
-      Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
+      IdentityHashMap<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
       futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
         PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
       for (int i = 0, n = futureList.size(); i < n; i++) {
@@ -610,6 +612,7 @@ static void sleepIgnoreInterrupt(int retry) {
     try {
       Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
     } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping", e);
     }
   }
 }
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index ee02d42d2d3b..2f35356cffb7 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -86,6 +86,7 @@
 import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
@@ -335,8 +336,9 @@ public SaslNegotiateHandler(Configuration conf, String username, char[] password
       this.dfsClient = dfsClient;
     }
 
-    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
-      sendSaslMessage(ctx, payload, null);
+    private ChannelFuture sendSaslMessage(ChannelHandlerContext ctx, byte[] payload)
+      throws IOException {
+      return sendSaslMessage(ctx, payload, null);
     }
 
     private List<CipherOption> getCipherOptions() throws IOException {
@@ -432,7 +434,7 @@ static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder,
       }
     }
 
-    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
+    private ChannelFuture sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
       List<CipherOption> options) throws IOException {
       DataTransferEncryptorMessageProto.Builder builder =
         DataTransferEncryptorMessageProto.newBuilder();
@@ -448,10 +450,11 @@ private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
       size += CodedOutputStream.computeRawVarint32Size(size);
       ByteBuf buf = ctx.alloc().buffer(size);
       proto.writeDelimitedTo(new ByteBufOutputStream(buf));
-      ctx.write(buf);
+      return ctx.write(buf);
     }
 
     @Override
+    @SuppressWarnings("FutureReturnValueIgnored")
     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
       ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
       sendSaslMessage(ctx, new byte[0]);
@@ -546,8 +549,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
           if (requestedQopContainsPrivacy()) {
             cipherOptions = getCipherOptions();
           }
-          sendSaslMessage(ctx, response, cipherOptions);
+          ChannelFuture sendFuture = sendSaslMessage(ctx, response, cipherOptions);
           ctx.flush();
+          sendFuture.get();
           step++;
           break;
         }
@@ -635,6 +639,7 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
     }
 
     @Override
+    @SuppressWarnings("FutureReturnValueIgnored")
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
       throws Exception {
       if (msg instanceof ByteBuf) {
@@ -647,6 +652,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
     }
 
     @Override
+    @SuppressWarnings("FutureReturnValueIgnored")
     public void flush(ChannelHandlerContext ctx) throws Exception {
       if (cBuf.isReadable()) {
         byte[] b = new byte[cBuf.readableBytes()];
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java
index a0b5cc00841b..0c4e15e63b1c 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java
@@ -83,6 +83,7 @@ public ProtobufDecoder(Object prototype) {
     }
   }
 
+  @Override
   protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
     int length = msg.readableBytes();
     byte[] array;
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 26cbbe034a58..115df8a0fcbf 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -93,9 +93,9 @@ public static void setUp() throws Exception {
   }
 
   @AfterClass
-  public static void tearDown() throws IOException, InterruptedException {
+  public static void tearDown() throws Exception {
     if (EVENT_LOOP_GROUP != null) {
-      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+      EVENT_LOOP_GROUP.shutdownGracefully().sync().get();
     }
     shutdownMiniDFSCluster();
   }
@@ -254,7 +254,7 @@ public void testExcludeFailedConnectToDatanode()
   }
 
   @Test
-  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
+  public void testWriteLargeChunk() throws Exception {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
@@ -262,7 +262,7 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec
     byte[] b = new byte[50 * 1024 * 1024];
     Bytes.random(b);
     out.write(b);
-    out.flush(false);
+    out.flush(false).get();
     assertEquals(b.length, out.flush(false).get().longValue());
     out.close();
     assertEquals(b.length, FS.getFileStatus(f).getLen());
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
index 53fb37a8e0bc..82b17b77a2a3 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
@@ -20,7 +20,6 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -103,12 +102,12 @@ public static void setUp() throws Exception {
   }
 
   @AfterClass
-  public static void tearDown() throws IOException, InterruptedException {
+  public static void tearDown() throws Exception {
     if (OUT != null) {
       OUT.recoverAndClose(null);
     }
     if (EVENT_LOOP_GROUP != null) {
-      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+      EVENT_LOOP_GROUP.shutdownGracefully().sync().get();
     }
     shutdownMiniDFSCluster();
   }
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index cb936a4e7c65..d1ce128b118d 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -53,9 +53,9 @@ public class TestLocalAsyncOutput {
   private static StreamSlowMonitor MONITOR;
 
   @AfterClass
-  public static void tearDownAfterClass() throws IOException {
+  public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.cleanupTestDir();
-    GROUP.shutdownGracefully();
+    GROUP.shutdownGracefully().get();
     MONITOR = StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), "testMonitor");
   }
 
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java
index 7a3a6de10f09..fcdb8dfbb3ad 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java
@@ -83,6 +83,7 @@ public void testNotOverwrite() throws IOException {
   }
 
   @Test
+  @SuppressWarnings("MissingFail")
   public void testOverwrite() throws IOException {
     Path file = new Path("/" + name.getMethodName());
     FSDataOutputStream out1 = FS.create(file);
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index cb5fb4006d3e..c63fde2cbfd1 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -193,9 +193,9 @@ public static void setUpBeforeClass() throws Exception {
   }
 
   @AfterClass
-  public static void tearDownAfterClass() throws IOException, InterruptedException {
+  public static void tearDownAfterClass() throws Exception {
     if (EVENT_LOOP_GROUP != null) {
-      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+      EVENT_LOOP_GROUP.shutdownGracefully().sync().get();
     }
     if (KDC != null) {
       KDC.stop();