From 184867ed63286dc6f92d1caa5f62195028ece933 Mon Sep 17 00:00:00 2001 From: Bin Fan Date: Sun, 30 Jul 2017 17:38:31 -0700 Subject: [PATCH] [ALLUXIO-2977] Remove unnecessary AtomicBoolean usage --- .../alluxio/worker/netty/AbstractReadHandler.java | 13 ++++++------- .../alluxio/worker/netty/BlockWriteHandler.java | 14 +++++++++----- .../alluxio/worker/netty/UfsFileWriteHandler.java | 10 ++++++---- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/core/server/worker/src/main/java/alluxio/worker/netty/AbstractReadHandler.java b/core/server/worker/src/main/java/alluxio/worker/netty/AbstractReadHandler.java index f939976e37f2..519ab7162a42 100644 --- a/core/server/worker/src/main/java/alluxio/worker/netty/AbstractReadHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/netty/AbstractReadHandler.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -124,7 +123,7 @@ abstract class AbstractReadHandler private Error mError; /** This is set when the SUCCESS or CANCEL response is sent. This is only for sanity check. */ - private AtomicBoolean mDone = new AtomicBoolean(false); + private volatile boolean mDone; /** * A wrapper on an error used to pass error information from the netty I/O thread to the packet @@ -299,7 +298,7 @@ private void reset() { mCancel = false; mError = null; mRequest.set(null); - mDone.set(false); + mDone = false; } } @@ -509,8 +508,8 @@ private void replyError(AlluxioStatusException e) { * Writes a success response. */ private void replyEof() { - Preconditions.checkState(!mDone.get()); - mDone.set(true); + Preconditions.checkState(!mDone); + mDone = true; mChannel.writeAndFlush(RPCProtoMessage.createOkResponse(null)) .addListeners(ChannelFutureListener.CLOSE_ON_FAILURE); } @@ -519,8 +518,8 @@ private void replyEof() { * Writes a cancel response. */ private void replyCancel() { - Preconditions.checkState(!mDone.get()); - mDone.set(true); + Preconditions.checkState(!mDone); + mDone = true; mChannel.writeAndFlush(RPCProtoMessage.createCancelResponse()) .addListeners(ChannelFutureListener.CLOSE_ON_FAILURE); } diff --git a/core/server/worker/src/main/java/alluxio/worker/netty/BlockWriteHandler.java b/core/server/worker/src/main/java/alluxio/worker/netty/BlockWriteHandler.java index 9838c98b7d1a..90914725a26f 100644 --- a/core/server/worker/src/main/java/alluxio/worker/netty/BlockWriteHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/netty/BlockWriteHandler.java @@ -167,8 +167,9 @@ protected BlockWriteRequest createRequest(RPCProtoMessage msg) throws Exception @Override protected void completeRequest(Channel channel) throws Exception { BlockWriteRequest request = getRequest(); - Preconditions.checkState(request != null); - + if (request == null) { + return; + } if (request.getContext().getBlockWriter() != null) { request.getContext().getBlockWriter().close(); } @@ -178,8 +179,9 @@ protected void completeRequest(Channel channel) throws Exception { @Override protected void cancelRequest() throws Exception { BlockWriteRequest request = getRequest(); - Preconditions.checkState(request != null); - + if (request == null) { + return; + } if (request.getContext().getBlockWriter() != null) { request.getContext().getBlockWriter().close(); } @@ -189,7 +191,9 @@ protected void cancelRequest() throws Exception { @Override protected void cleanupRequest() throws Exception { BlockWriteRequest request = getRequest(); - Preconditions.checkState(request != null); + if (request == null) { + return; + } mWorker.cleanupSession(request.getSessionId()); } diff --git a/core/server/worker/src/main/java/alluxio/worker/netty/UfsFileWriteHandler.java b/core/server/worker/src/main/java/alluxio/worker/netty/UfsFileWriteHandler.java index fe6c8937b3f9..5163bd7593dc 100644 --- a/core/server/worker/src/main/java/alluxio/worker/netty/UfsFileWriteHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/netty/UfsFileWriteHandler.java @@ -180,8 +180,9 @@ protected UfsFileWriteRequest createRequest(RPCProtoMessage msg) throws Exceptio @Override protected void completeRequest(Channel channel) throws Exception { UfsFileWriteRequest request = getRequest(); - Preconditions.checkState(request != null); - + if (request == null) { + return; + } if (request.getContext().getOutputStream() == null) { createUfsFile(channel); } @@ -194,8 +195,9 @@ protected void completeRequest(Channel channel) throws Exception { @Override protected void cancelRequest() throws Exception { UfsFileWriteRequest request = getRequest(); - Preconditions.checkState(request != null); - + if (request == null) { + return; + } // TODO(calvin): Consider adding cancel to the ufs stream api. if (request.getContext().getOutputStream() != null && request.getContext().getUnderFileSystem() != null) {