Skip to content

Commit

Permalink
nit
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Nov 30, 2017
1 parent 8a15b62 commit 38877ae
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 121 deletions.
Expand Up @@ -43,18 +43,46 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


/** /**
* A {@link Future} that can be used to track when an asynchronous HTTP request has been fully processed. * A {@link Future} that can be used to track when an asynchronous HTTP request
* has been fully processed.
* *
* @param <V> the result type * @param <V>
* the result type
*/ */
public final class NettyResponseFuture<V> implements ListenableFuture<V> { public final class NettyResponseFuture<V> implements ListenableFuture<V> {


private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class); private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);


@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "redirectCount"); private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "redirectCount");
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "currentRetry"); private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "currentRetry");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "isDone");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "isCancelled");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "inAuth");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "inProxyAuth");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "contentProcessed");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "onThrowableCalled");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater
.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater
.newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");


private final long start = unpreciseMillisTime(); private final long start = unpreciseMillisTime();
private final ChannelPoolPartitioning connectionPoolPartitioning; private final ChannelPoolPartitioning connectionPoolPartitioning;
Expand All @@ -79,26 +107,6 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
// partition key, when != null used to release lock in ChannelManager // partition key, when != null used to release lock in ChannelManager
private volatile Object partitionKeyLock; private volatile Object partitionKeyLock;


@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> isDoneField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> isCancelledField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isCancelled");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> inAuthField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "inAuth");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> inProxyAuthField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "inProxyAuth");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> contentProcessedField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "contentProcessed");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> onThrowableCalledField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class,
"onThrowableCalled");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> timeoutsHolderField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class,
TimeoutsHolder.class, "timeoutsHolder");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> partitionKeyLockField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class,
Object.class, "partitionKeyLock");

// volatile where we need CAS ops // volatile where we need CAS ops
private volatile int redirectCount = 0; private volatile int redirectCount = 0;
private volatile int currentRetry = 0; private volatile int currentRetry = 0;
Expand Down Expand Up @@ -159,7 +167,7 @@ public Object takePartitionKeyLock() {
return null; return null;
} }


return partitionKeyLockField.getAndSet(this, null); return PARTITION_KEY_LOCK_FIELD.getAndSet(this, null);
} }


// java.util.concurrent.Future // java.util.concurrent.Future
Expand All @@ -179,7 +187,7 @@ public boolean cancel(boolean force) {
releasePartitionKeyLock(); releasePartitionKeyLock();
cancelTimeouts(); cancelTimeouts();


if (isCancelledField.getAndSet(this, 1) != 0) if (IS_CANCELLED_FIELD.getAndSet(this, 1) != 0)
return false; return false;


// cancel could happen before channel was attached // cancel could happen before channel was attached
Expand All @@ -188,7 +196,7 @@ public boolean cancel(boolean force) {
Channels.silentlyCloseChannel(channel); Channels.silentlyCloseChannel(channel);
} }


if (onThrowableCalledField.getAndSet(this, 1) == 0) { if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
try { try {
asyncHandler.onThrowable(new CancellationException()); asyncHandler.onThrowable(new CancellationException());
} catch (Throwable t) { } catch (Throwable t) {
Expand Down Expand Up @@ -221,11 +229,11 @@ private V getContent() throws ExecutionException {


// No more retry // No more retry
CURRENT_RETRY_UPDATER.set(this, maxRetry); CURRENT_RETRY_UPDATER.set(this, maxRetry);
if (contentProcessedField.getAndSet(this, 1) == 0) { if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {
try { try {
future.complete(asyncHandler.onCompleted()); future.complete(asyncHandler.onCompleted());
} catch (Throwable ex) { } catch (Throwable ex) {
if (onThrowableCalledField.getAndSet(this, 1) == 0) { if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
try { try {
try { try {
asyncHandler.onThrowable(ex); asyncHandler.onThrowable(ex);
Expand All @@ -249,7 +257,7 @@ private boolean terminateAndExit() {
cancelTimeouts(); cancelTimeouts();
this.channel = null; this.channel = null;
this.reuseChannel = false; this.reuseChannel = false;
return isDoneField.getAndSet(this, 1) != 0 || isCancelled != 0; return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
} }


public final void done() { public final void done() {
Expand All @@ -276,7 +284,7 @@ public final void abort(final Throwable t) {


future.completeExceptionally(t); future.completeExceptionally(t);


if (onThrowableCalledField.compareAndSet(this, 0, 1)) { if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {
try { try {
asyncHandler.onThrowable(t); asyncHandler.onThrowable(t);
} catch (Throwable te) { } catch (Throwable te) {
Expand Down Expand Up @@ -323,7 +331,7 @@ public void setAsyncHandler(AsyncHandler<V> asyncHandler) {
} }


public void cancelTimeouts() { public void cancelTimeouts() {
TimeoutsHolder ref = timeoutsHolderField.getAndSet(this, null); TimeoutsHolder ref = TIMEOUTS_HOLDER_FIELD.getAndSet(this, null);
if (ref != null) { if (ref != null) {
ref.cancel(); ref.cancel();
} }
Expand Down Expand Up @@ -362,11 +370,11 @@ public int incrementAndGetCurrentRedirectCount() {
} }


public void setTimeoutsHolder(TimeoutsHolder timeoutsHolder) { public void setTimeoutsHolder(TimeoutsHolder timeoutsHolder) {
timeoutsHolderField.set(this, timeoutsHolder); TIMEOUTS_HOLDER_FIELD.set(this, timeoutsHolder);
} }


public TimeoutsHolder getTimeoutsHolder() { public TimeoutsHolder getTimeoutsHolder() {
return timeoutsHolderField.get(this); return TIMEOUTS_HOLDER_FIELD.get(this);
} }


public boolean isInAuth() { public boolean isInAuth() {
Expand All @@ -378,7 +386,7 @@ public void setInAuth(boolean inAuth) {
} }


public boolean isAndSetInAuth(boolean set) { public boolean isAndSetInAuth(boolean set) {
return inAuthField.getAndSet(this, set ? 1 : 0) != 0; return IN_AUTH_FIELD.getAndSet(this, set ? 1 : 0) != 0;
} }


public boolean isInProxyAuth() { public boolean isInProxyAuth() {
Expand All @@ -390,7 +398,7 @@ public void setInProxyAuth(boolean inProxyAuth) {
} }


public boolean isAndSetInProxyAuth(boolean inProxyAuth) { public boolean isAndSetInProxyAuth(boolean inProxyAuth) {
return inProxyAuthField.getAndSet(this, inProxyAuth ? 1 : 0) != 0; return IN_PROXY_AUTH_FIELD.getAndSet(this, inProxyAuth ? 1 : 0) != 0;
} }


public ChannelState getChannelState() { public ChannelState getChannelState() {
Expand Down Expand Up @@ -473,21 +481,24 @@ public void setCurrentRequest(Request currentRequest) {
} }


/** /**
* Return true if the {@link Future} can be recovered. There is some scenario where a connection can be closed by an unexpected IOException, and in some situation we can * Return true if the {@link Future} can be recovered. There is some scenario
* recover from that exception. * where a connection can be closed by an unexpected IOException, and in some
* situation we can recover from that exception.
* *
* @return true if that {@link Future} cannot be recovered. * @return true if that {@link Future} cannot be recovered.
*/ */
public boolean isReplayPossible() { public boolean isReplayPossible() {
return !isDone() && !(Channels.isChannelValid(channel) && !getUri().getScheme().equalsIgnoreCase("https")) && inAuth == 0 && inProxyAuth == 0; return !isDone() && !(Channels.isChannelActive(channel) && !getUri().getScheme().equalsIgnoreCase("https"))
&& inAuth == 0 && inProxyAuth == 0;
} }


public long getStart() { public long getStart() {
return start; return start;
} }


public Object getPartitionKey() { public Object getPartitionKey() {
return connectionPoolPartitioning.getPartitionKey(targetRequest.getUri(), targetRequest.getVirtualHost(), proxyServer); return connectionPoolPartitioning.getPartitionKey(targetRequest.getUri(), targetRequest.getVirtualHost(),
proxyServer);
} }


public void acquirePartitionLockLazily() throws IOException { public void acquirePartitionLockLazily() throws IOException {
Expand All @@ -497,7 +508,7 @@ public void acquirePartitionLockLazily() throws IOException {


Object partitionKey = getPartitionKey(); Object partitionKey = getPartitionKey();
connectionSemaphore.acquireChannelLock(partitionKey); connectionSemaphore.acquireChannelLock(partitionKey);
Object prevKey = partitionKeyLockField.getAndSet(this, partitionKey); Object prevKey = PARTITION_KEY_LOCK_FIELD.getAndSet(this, partitionKey);
if (prevKey != null) { if (prevKey != null) {
// self-check // self-check


Expand Down Expand Up @@ -541,7 +552,7 @@ public String toString() {
",\n\turi=" + getUri() + // ",\n\turi=" + getUri() + //
",\n\tkeepAlive=" + keepAlive + // ",\n\tkeepAlive=" + keepAlive + //
",\n\tredirectCount=" + redirectCount + // ",\n\tredirectCount=" + redirectCount + //
",\n\ttimeoutsHolder=" + timeoutsHolderField.get(this) + // ",\n\ttimeoutsHolder=" + TIMEOUTS_HOLDER_FIELD.get(this) + //
",\n\tinAuth=" + inAuth + // ",\n\tinAuth=" + inAuth + //
",\n\tstatusReceived=" + statusReceived + // ",\n\tstatusReceived=" + statusReceived + //
",\n\ttouch=" + touch + // ",\n\ttouch=" + touch + //
Expand Down
Expand Up @@ -43,7 +43,7 @@ public static void setDiscard(Channel channel) {
setAttribute(channel, DiscardEvent.DISCARD); setAttribute(channel, DiscardEvent.DISCARD);
} }


public static boolean isChannelValid(Channel channel) { public static boolean isChannelActive(Channel channel) {
return channel != null && channel.isActive(); return channel != null && channel.isActive();
} }


Expand Down
Expand Up @@ -146,7 +146,6 @@ else if (request.getBodyGenerator() != null)
LOGGER.debug("Sending redirect to {}", newUri); LOGGER.debug("Sending redirect to {}", newUri);


if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(response)) { if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(response)) {

if (sameBase) { if (sameBase) {
future.setReuseChannel(true); future.setReuseChannel(true);
// we can't directly send the next request because we still have to received LastContent // we can't directly send the next request because we still have to received LastContent
Expand Down

0 comments on commit 38877ae

Please sign in to comment.