/
NettyConnectListener.java
executable file
·145 lines (119 loc) · 5.55 KB
/
NettyConnectListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.netty.channel;
import static org.asynchttpclient.util.AsyncHttpProviderUtils.getBaseUrl;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.ConnectException;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.Request;
import org.asynchttpclient.handler.AsyncHandlerExtensions;
import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.future.StackTraceInspector;
import org.asynchttpclient.netty.request.NettyRequestSender;
import org.asynchttpclient.uri.Uri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Non-blocking connect listener.
 *
 * <p>Registered on the {@link ChannelFuture} of a connect attempt. When the connect succeeds, the
 * request held by the {@link NettyResponseFuture} is written to the new channel, after an SSL
 * handshake when the target URI is secured and no proxy is configured. When the connect (or the
 * handshake) fails, a retry is attempted if allowed, otherwise the future is aborted with a
 * {@link ConnectException}.
 *
 * @param <T> the result type of the response future this listener serves
 */
public final class NettyConnectListener<T> implements ChannelFutureListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyConnectListener.class);

    private final NettyRequestSender requestSender;
    private final NettyResponseFuture<T> future;
    private final ChannelManager channelManager;
    private final boolean channelPreempted;
    private final Object partitionKey;

    /**
     * Creates a listener bound to a single request/response future.
     *
     * @param future the response future whose request is written once the channel is ready
     * @param requestSender used to write the request and to retry after a failure
     * @param channelManager used to register the channel, add the SSL handler and abort preemption
     * @param channelPreempted {@code true} if a channel was preempted for this connect attempt, in
     *        which case the preemption is aborted when the attempt fails or the future is already done
     * @param partitionKey the key handed to the channel manager when registering the channel or
     *        aborting the preemption
     */
    public NettyConnectListener(NettyResponseFuture<T> future,
            NettyRequestSender requestSender,
            ChannelManager channelManager,
            boolean channelPreempted,
            Object partitionKey) {
        this.future = future;
        this.requestSender = requestSender;
        this.channelManager = channelManager;
        this.channelPreempted = channelPreempted;
        this.partitionKey = partitionKey;
    }

    /**
     * Gives back the preempted channel slot, if one was taken for this connect attempt.
     */
    private void abortChannelPreemption() {
        if (channelPreempted) {
            channelManager.abortChannelPreemption(partitionKey);
        }
    }

    /**
     * Binds the future to the freshly connected channel and writes the request on it.
     *
     * <p>If the future is already done by the time the channel is ready, the request is not
     * written: the preemption is aborted and the channel is closed.
     *
     * @param channel the connected (and, when applicable, SSL-handshaked) channel
     */
    private void writeRequest(Channel channel) {

        LOGGER.debug("Using non-cached Channel {} for {} '{}'",
                channel,
                future.getNettyRequest().getHttpRequest().getMethod(),
                future.getNettyRequest().getHttpRequest().getUri());

        Channels.setAttribute(channel, future);

        if (future.isDone()) {
            abortChannelPreemption();
            // The channel has not been registered with the channel manager nor attached to the
            // future at this point, so nothing else in this class would ever close it: close it
            // here instead of leaving the connection open.
            // NOTE(review): assumes no other component takes ownership of an unregistered
            // channel - confirm against the pipeline handlers.
            channel.close();
            return;
        }

        channelManager.registerOpenChannel(channel, partitionKey);
        future.attachChannel(channel, false);
        requestSender.writeRequest(future, channel);
    }

    /**
     * Handles a successful connect: installs the SSL handler and waits for the handshake when
     * required, then writes the request.
     *
     * @param channel the channel that has just been connected
     * @throws Exception declared for the benefit of {@link #operationComplete(ChannelFuture)}
     */
    private void onFutureSuccess(final Channel channel) throws Exception {

        Request request = future.getRequest();
        Uri uri = request.getUri();

        // in case of proxy tunneling, we'll add the SslHandler later, after the CONNECT request
        boolean handshakeNow = future.getProxyServer() == null && uri.isSecured();
        if (!handshakeNow) {
            writeRequest(channel);
            return;
        }

        SslHandler sslHandler = channelManager.addSslHandler(channel.pipeline(), uri, request.getVirtualHost());
        sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {

            @Override
            public void operationComplete(Future<Channel> handshakeFuture) throws Exception {
                if (!handshakeFuture.isSuccess()) {
                    onFutureFailure(channel, handshakeFuture.cause());
                    return;
                }

                // Notify handlers that opted in to the extension callbacks before sending anything.
                final AsyncHandler<T> asyncHandler = future.getAsyncHandler();
                if (asyncHandler instanceof AsyncHandlerExtensions) {
                    ((AsyncHandlerExtensions) asyncHandler).onSslHandshakeCompleted();
                }

                writeRequest(channel);
            }
        });
    }

    /**
     * Handles a failed connect or SSL handshake: aborts the preemption, tries to replay the
     * request, and otherwise aborts the future with a {@link ConnectException} wrapping the cause.
     *
     * @param channel the channel whose connect or handshake failed (only used for logging)
     * @param cause the failure reported by the Netty future, possibly {@code null}
     */
    private void onFutureFailure(Channel channel, Throwable cause) {

        abortChannelPreemption();

        boolean canRetry = future.canRetry();
        LOGGER.debug("Trying to recover from failing to connect channel {} with a retry value of {} ", channel, canRetry);

        // Retry only when a cause is known and either the future has left the NEW state or the
        // stack trace inspector considers the exception recoverable.
        if (canRetry
                && cause != null
                && (future.getState() != NettyResponseFuture.STATE.NEW || StackTraceInspector.recoverOnNettyDisconnectException(cause))) {

            if (requestSender.retry(future)) {
                return;
            }
        }

        LOGGER.debug("Failed to recover from connect exception: {} with channel {}", cause, channel);

        // Fall back to the base URL when the cause carries no message of its own.
        boolean printCause = cause != null && cause.getMessage() != null;
        String printedCause = printCause ? cause.getMessage() : getBaseUrl(future.getUri());
        ConnectException e = new ConnectException(printedCause);
        if (cause != null) {
            e.initCause(cause);
        }
        future.abort(e);
    }

    /**
     * Dispatches the outcome of the connect attempt to the success or failure path.
     *
     * @param f the completed connect future
     * @throws Exception if the success path throws
     */
    @Override
    public final void operationComplete(ChannelFuture f) throws Exception {
        if (f.isSuccess()) {
            onFutureSuccess(f.channel());
        } else {
            onFutureFailure(f.channel(), f.cause());
        }
    }
}