Skip to content

Commit

Permalink
Rename NettyResponseFuture.STATE into ChannelState
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Oct 26, 2015
1 parent d1ad887 commit f87c49c
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 17 deletions.
Expand Up @@ -36,6 +36,7 @@
import org.asynchttpclient.Request; import org.asynchttpclient.Request;
import org.asynchttpclient.channel.pool.ConnectionPoolPartitioning; import org.asynchttpclient.channel.pool.ConnectionPoolPartitioning;
import org.asynchttpclient.future.AbstractListenableFuture; import org.asynchttpclient.future.AbstractListenableFuture;
import org.asynchttpclient.netty.channel.ChannelState;
import org.asynchttpclient.netty.channel.Channels; import org.asynchttpclient.netty.channel.Channels;
import org.asynchttpclient.netty.request.NettyRequest; import org.asynchttpclient.netty.request.NettyRequest;
import org.asynchttpclient.netty.timeout.TimeoutsHolder; import org.asynchttpclient.netty.timeout.TimeoutsHolder;
Expand All @@ -53,10 +54,6 @@ public final class NettyResponseFuture<V> extends AbstractListenableFuture<V> {


private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class); private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);


public enum STATE {
NEW, POOLED, RECONNECTED, CLOSED,
}

private final long start = millisTime(); private final long start = millisTime();
private final ConnectionPoolPartitioning connectionPoolPartitioning; private final ConnectionPoolPartitioning connectionPoolPartitioning;
private final ProxyServer proxyServer; private final ProxyServer proxyServer;
Expand All @@ -72,7 +69,7 @@ public enum STATE {
private final AtomicBoolean inProxyAuth = new AtomicBoolean(false); private final AtomicBoolean inProxyAuth = new AtomicBoolean(false);
private final AtomicBoolean statusReceived = new AtomicBoolean(false); private final AtomicBoolean statusReceived = new AtomicBoolean(false);
private final AtomicLong touch = new AtomicLong(millisTime()); private final AtomicLong touch = new AtomicLong(millisTime());
private final AtomicReference<STATE> state = new AtomicReference<>(STATE.NEW); private final AtomicReference<ChannelState> channelState = new AtomicReference<>(ChannelState.NEW);
private final AtomicBoolean contentProcessed = new AtomicBoolean(false); private final AtomicBoolean contentProcessed = new AtomicBoolean(false);
private final AtomicInteger currentRetry = new AtomicInteger(0); private final AtomicInteger currentRetry = new AtomicInteger(0);
private final AtomicBoolean onThrowableCalled = new AtomicBoolean(false); private final AtomicBoolean onThrowableCalled = new AtomicBoolean(false);
Expand Down Expand Up @@ -348,12 +345,12 @@ public AtomicBoolean getInProxyAuth() {
return inProxyAuth; return inProxyAuth;
} }


public STATE getState() { public ChannelState getChannelState() {
return state.get(); return channelState.get();
} }


public void setState(STATE state) { public void setChannelState(ChannelState channelState) {
this.state.set(state); this.channelState.set(channelState);
} }


public boolean getAndSetStatusReceived(boolean sr) { public boolean getAndSetStatusReceived(boolean sr) {
Expand Down
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.netty.channel;

public enum ChannelState {
NEW, POOLED, RECONNECTED, CLOSED,
}
Expand Up @@ -119,7 +119,7 @@ private void onFutureFailure(Channel channel, Throwable cause) {
LOGGER.debug("Trying to recover from failing to connect channel {} with a retry value of {} ", channel, canRetry); LOGGER.debug("Trying to recover from failing to connect channel {} with a retry value of {} ", channel, canRetry);
if (canRetry// if (canRetry//
&& cause != null// && cause != null//
&& (future.getState() != NettyResponseFuture.STATE.NEW || StackTraceInspector.recoverOnNettyDisconnectException(cause))) { && (future.getChannelState() != ChannelState.NEW || StackTraceInspector.recoverOnNettyDisconnectException(cause))) {


if (requestSender.retry(future)) { if (requestSender.retry(future)) {
return; return;
Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.asynchttpclient.netty.NettyResponseHeaders; import org.asynchttpclient.netty.NettyResponseHeaders;
import org.asynchttpclient.netty.NettyResponseStatus; import org.asynchttpclient.netty.NettyResponseStatus;
import org.asynchttpclient.netty.channel.ChannelManager; import org.asynchttpclient.netty.channel.ChannelManager;
import org.asynchttpclient.netty.channel.ChannelState;
import org.asynchttpclient.netty.channel.Channels; import org.asynchttpclient.netty.channel.Channels;
import org.asynchttpclient.netty.request.NettyRequestSender; import org.asynchttpclient.netty.request.NettyRequestSender;
import org.asynchttpclient.ntlm.NtlmEngine; import org.asynchttpclient.ntlm.NtlmEngine;
Expand Down Expand Up @@ -202,7 +203,7 @@ private boolean exitAfterHandling401(//
} }


// FIXME what's this??? // FIXME what's this???
future.setState(NettyResponseFuture.STATE.NEW); future.setChannelState(ChannelState.NEW);
HttpHeaders requestHeaders = new DefaultHttpHeaders().add(request.getHeaders()); HttpHeaders requestHeaders = new DefaultHttpHeaders().add(request.getHeaders());


switch (realm.getScheme()) { switch (realm.getScheme()) {
Expand Down Expand Up @@ -333,7 +334,7 @@ private boolean exitAfterHandling407(//
} }


// FIXME what's this??? // FIXME what's this???
future.setState(NettyResponseFuture.STATE.NEW); future.setChannelState(ChannelState.NEW);
HttpHeaders requestHeaders = new DefaultHttpHeaders().add(request.getHeaders()); HttpHeaders requestHeaders = new DefaultHttpHeaders().add(request.getHeaders());


switch (proxyRealm.getScheme()) { switch (proxyRealm.getScheme()) {
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.asynchttpclient.netty.Callback; import org.asynchttpclient.netty.Callback;
import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.channel.ChannelManager; import org.asynchttpclient.netty.channel.ChannelManager;
import org.asynchttpclient.netty.channel.ChannelState;
import org.asynchttpclient.netty.channel.Channels; import org.asynchttpclient.netty.channel.Channels;
import org.asynchttpclient.netty.channel.NettyConnectListener; import org.asynchttpclient.netty.channel.NettyConnectListener;
import org.asynchttpclient.netty.timeout.ReadTimeoutTimerTask; import org.asynchttpclient.netty.timeout.ReadTimeoutTimerTask;
Expand Down Expand Up @@ -207,7 +208,7 @@ private <T> ListenableFuture<T> sendRequestWithCachedChannel(Request request, Pr
if (asyncHandler instanceof AsyncHandlerExtensions) if (asyncHandler instanceof AsyncHandlerExtensions)
AsyncHandlerExtensions.class.cast(asyncHandler).onConnectionPooled(channel); AsyncHandlerExtensions.class.cast(asyncHandler).onConnectionPooled(channel);


future.setState(NettyResponseFuture.STATE.POOLED); future.setChannelState(ChannelState.POOLED);
future.attachChannel(channel, false); future.attachChannel(channel, false);


LOGGER.debug("Using cached Channel {} for {} '{}'", channel, future.getNettyRequest().getHttpRequest().getMethod(), future.getNettyRequest().getHttpRequest().getUri()); LOGGER.debug("Using cached Channel {} for {} '{}'", channel, future.getNettyRequest().getHttpRequest().getMethod(), future.getNettyRequest().getHttpRequest().getUri());
Expand Down Expand Up @@ -359,7 +360,7 @@ public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) {
channelManager.closeChannel(channel); channelManager.closeChannel(channel);


if (!future.isDone()) { if (!future.isDone()) {
future.setState(NettyResponseFuture.STATE.CLOSED); future.setChannelState(ChannelState.CLOSED);
LOGGER.debug("Aborting Future {}\n", future); LOGGER.debug("Aborting Future {}\n", future);
LOGGER.debug(t.getMessage(), t); LOGGER.debug(t.getMessage(), t);
future.abort(t); future.abort(t);
Expand All @@ -380,7 +381,7 @@ public boolean retry(NettyResponseFuture<?> future) {
return false; return false;


if (future.canBeReplayed()) { if (future.canBeReplayed()) {
future.setState(NettyResponseFuture.STATE.RECONNECTED); future.setChannelState(ChannelState.RECONNECTED);
future.getAndSetStatusReceived(false); future.getAndSetStatusReceived(false);


LOGGER.debug("Trying to recover request {}\n", future.getNettyRequest().getHttpRequest()); LOGGER.debug("Trying to recover request {}\n", future.getNettyRequest().getHttpRequest());
Expand Down Expand Up @@ -467,7 +468,7 @@ public void replayRequest(final NettyResponseFuture<?> future, FilterContext fc,


Request newRequest = fc.getRequest(); Request newRequest = fc.getRequest();
future.setAsyncHandler(fc.getAsyncHandler()); future.setAsyncHandler(fc.getAsyncHandler());
future.setState(NettyResponseFuture.STATE.NEW); future.setChannelState(ChannelState.NEW);
future.touch(); future.touch();


LOGGER.debug("\n\nReplaying Request {}\n for Future {}\n", newRequest, future); LOGGER.debug("\n\nReplaying Request {}\n for Future {}\n", newRequest, future);
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.asynchttpclient.AsyncHandler; import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.handler.ProgressAsyncHandler; import org.asynchttpclient.handler.ProgressAsyncHandler;
import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.channel.ChannelState;
import org.asynchttpclient.netty.channel.Channels; import org.asynchttpclient.netty.channel.Channels;
import org.asynchttpclient.netty.future.StackTraceInspector; import org.asynchttpclient.netty.future.StackTraceInspector;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down Expand Up @@ -49,7 +50,7 @@ public ProgressListener(AsyncHandler<?> asyncHandler,//


private boolean abortOnThrowable(Throwable cause, Channel channel) { private boolean abortOnThrowable(Throwable cause, Channel channel) {


if (cause != null && future.getState() != NettyResponseFuture.STATE.NEW) { if (cause != null && future.getChannelState() != ChannelState.NEW) {
if (cause instanceof IllegalStateException || cause instanceof ClosedChannelException || StackTraceInspector.recoverOnReadOrWriteException(cause)) { if (cause instanceof IllegalStateException || cause instanceof ClosedChannelException || StackTraceInspector.recoverOnReadOrWriteException(cause)) {
LOGGER.debug(cause.getMessage(), cause); LOGGER.debug(cause.getMessage(), cause);
Channels.silentlyCloseChannel(channel); Channels.silentlyCloseChannel(channel);
Expand Down

0 comments on commit f87c49c

Please sign in to comment.