Skip to content
Permalink
Browse files
Custom certificate alias not being used when using HC5 AsyncHTTPCondu…
…it (#923)
  • Loading branch information
reta committed Mar 19, 2022
1 parent f7266bf commit be345311ffa68bc5138d7bc56d074534d9c5dd35
Showing 30 changed files with 3,099 additions and 66 deletions.
@@ -85,7 +85,6 @@
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.config.Registry;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.nio.ssl.BasicClientTlsStrategy;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http.protocol.HttpContext;
@@ -115,9 +114,11 @@ public AsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t, AsyncHT
this.factory = factory;
}

public synchronized CloseableHttpAsyncClient getHttpAsyncClient() throws IOException {
public synchronized CloseableHttpAsyncClient getHttpAsyncClient(final TlsStrategy tlsStrategy)
throws IOException {

if (client == null) {
client = factory.createClient(this);
client = factory.createClient(this, tlsStrategy);
}
if (client == null) {
throw new IOException("HttpAsyncClient is null");
@@ -523,9 +524,9 @@ public Credentials getCredentials(final AuthScope authscope, HttpContext context

ctx.setCredentialsProvider(credsProvider);

TlsStrategy tlsStrategy = null;
if ("https".equals(url.getScheme())) {
try {
RegistryBuilder<TlsStrategy> regBuilder = RegistryBuilder.<TlsStrategy>create();

// check tlsClientParameters from message header
TLSClientParameters tlsClientParameters = outMessage.get(TLSClientParameters.class);
@@ -538,31 +539,28 @@ public Credentials getCredentials(final AuthScope authscope, HttpContext context
final SSLContext sslcontext = getSSLContext(tlsClientParameters);
final HostnameVerifier verifier = org.apache.cxf.transport.https.SSLUtils
.getHostnameVerifier(tlsClientParameters);
regBuilder
.register("https",
new BasicClientTlsStrategy(
sslcontext,
new SSLSessionInitializer() {
@Override
public void initialize(NamedEndpoint endpoint, SSLEngine engine) {
initializeSSLEngine(sslcontext, engine);
}
},
new SSLSessionVerifier() {
@Override
public TlsDetails verify(NamedEndpoint endpoint, SSLEngine engine)
throws SSLException {
final SSLSession sslsession = engine.getSession();

if (!verifier.verify(endpoint.getHostName(), sslsession)) {
throw new SSLException("Could not verify host " + endpoint.getHostName());
}

setSSLSession(sslsession);
return new TlsDetails(sslsession, engine.getApplicationProtocol());
}

tlsStrategy = new BasicClientTlsStrategy(sslcontext,
new SSLSessionInitializer() {
@Override
public void initialize(NamedEndpoint endpoint, SSLEngine engine) {
initializeSSLEngine(sslcontext, engine);
}
},
new SSLSessionVerifier() {
@Override
public TlsDetails verify(NamedEndpoint endpoint, SSLEngine engine)
throws SSLException {
final SSLSession sslsession = engine.getSession();

if (!verifier.verify(endpoint.getHostName(), sslsession)) {
throw new SSLException("Could not verify host " + endpoint.getHostName());
}
)

setSSLSession(sslsession);
return new TlsDetails(sslsession, engine.getApplicationProtocol());
}
}
);
} catch (final GeneralSecurityException e) {
LOG.warning(e.getMessage());
@@ -580,7 +578,9 @@ public TlsDetails verify(NamedEndpoint endpoint, SSLEngine engine)
}

connectionFuture = new BasicFuture<>(callback);
final HttpAsyncClient c = getHttpAsyncClient();
// The HttpClientContext is not available in the AsyncClientConnectionOperator, so we have
// to provide our own TLS strategy on construction.
final HttpAsyncClient c = getHttpAsyncClient(tlsStrategy);
final Credentials creds = (Credentials)outMessage.getContextualProperty(Credentials.class.getName());
if (creds != null) {
credsProvider.setCredentials(new AnyAuthScope(), creds);
@@ -960,6 +960,14 @@ public void initializeSSLEngine(SSLContext sslcontext, SSLEngine sslengine) {
sslengine.setEnabledProtocols(p);
}
}

@Override
public void close() {
super.close();
if (factory != null) {
factory.close(this.getClient());
}
}

private String[] findProtocols(String p, String[] options) {
List<String> list = new ArrayList<>();
@@ -22,11 +22,14 @@
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

import org.apache.cxf.Bus;
import org.apache.cxf.buslifecycle.BusLifeCycleListener;
import org.apache.cxf.buslifecycle.BusLifeCycleManager;
import org.apache.cxf.common.injection.NoJSR250Annotations;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.SystemPropertyAction;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.HTTPConduit;
@@ -47,7 +50,7 @@
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.config.Registry;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http.protocol.HttpContext;
@@ -63,7 +66,6 @@
*/
@NoJSR250Annotations
public class AsyncHTTPConduitFactory implements HTTPConduitFactory {

//TCP related properties
public static final String TCP_NODELAY = "org.apache.cxf.transport.http.async.TCP_NODELAY";
public static final String SO_KEEPALIVE = "org.apache.cxf.transport.http.async.SO_KEEPALIVE";
@@ -84,6 +86,7 @@ public class AsyncHTTPConduitFactory implements HTTPConduitFactory {
//CXF specific
public static final String USE_POLICY = "org.apache.cxf.transport.http.async.usePolicy";

private static final Logger LOG = LogUtils.getL7dLogger(AsyncHTTPConduitFactory.class);

public enum UseAsyncPolicy {
ALWAYS, ASYNC_ONLY, NEVER;
@@ -109,9 +112,31 @@ public static UseAsyncPolicy getPolicy(Object st) {
return ASYNC_ONLY;
}
};

/**
* See please https://issues.apache.org/jira/browse/CXF-8678 and
* https://issues.apache.org/jira/browse/HTTPCLIENT-2209, the context propagation
* is necessary to remove per-HTTPClientPolicy caching.
*/
private static class AsyncClient {
private final PoolingAsyncClientConnectionManager connectionManager;
private final CloseableHttpAsyncClient client;

AsyncClient(PoolingAsyncClientConnectionManager connectionManager, CloseableHttpAsyncClient client) {
this.connectionManager = connectionManager;
this.client = client;
}

public CloseableHttpAsyncClient getClient() {
return client;
}

public PoolingAsyncClientConnectionManager getConnectionManager() {
return connectionManager;
}
}

private volatile PoolingAsyncClientConnectionManager connectionManager;
private volatile CloseableHttpAsyncClient client;
private volatile Map<HTTPClientPolicy, AsyncClient> clients = new ConcurrentHashMap<>();

private boolean isShutdown;
private UseAsyncPolicy policy;
@@ -147,19 +172,15 @@ public UseAsyncPolicy getUseAsyncPolicy() {
}

public void update(Map<String, Object> props) {
if (setProperties(props) && client != null) {
if (setProperties(props) && !clients.isEmpty()) {
restartReactor();
}
}

private void restartReactor() {
CloseableHttpAsyncClient client2 = client;
resetVars();
shutdown(client2);
}
private synchronized void resetVars() {
client = null;
connectionManager = null;
final Map<HTTPClientPolicy, AsyncClient> clients2 = clients;
clients = new ConcurrentHashMap<>();
shutdown(clients2);
}

private boolean setProperties(Map<String, Object> s) {
@@ -178,9 +199,12 @@ private boolean setProperties(Map<String, Object> s) {
connectionMaxIdle = getInt(s.get(CONNECTION_MAX_IDLE), connectionMaxIdle);
maxPerRoute = getInt(s.get(MAX_PER_HOST_CONNECTIONS), maxPerRoute);

if (connectionManager != null) {
connectionManager.setMaxTotal(maxConnections);
connectionManager.setDefaultMaxPerRoute(maxPerRoute);
if (!clients.isEmpty()) {
for (Map.Entry<HTTPClientPolicy, AsyncClient> entry: clients.entrySet()) {
final PoolingAsyncClientConnectionManager connectionManager = entry.getValue().getConnectionManager();
connectionManager.setMaxTotal(maxConnections);
connectionManager.setDefaultMaxPerRoute(maxPerRoute);
}
}

//properties that need a restart of the reactor
@@ -254,23 +278,28 @@ public HTTPConduit createConduit(Bus bus, EndpointInfo localInfo,
}

public void shutdown() {
if (client != null) {
shutdown(client);
connectionManager = null;
client = null;
}
shutdown(clients);
clients.clear();
isShutdown = true;
}

private static void shutdown(Map<HTTPClientPolicy, AsyncClient> clients) {
if (!clients.isEmpty()) {
for (Map.Entry<HTTPClientPolicy, AsyncClient> entry: clients.entrySet()) {
shutdown(entry.getValue().getClient());
entry.getValue().getConnectionManager().close();
}
}
}

private static void shutdown(CloseableHttpAsyncClient client) {
try {
client.close();
} catch (IOException e1) {
e1.printStackTrace();
} catch (IOException ex) {
LOG.warning(ex.getMessage());
}
}


private void addListener(Bus b) {
BusLifeCycleManager manager = b.getExtension(BusLifeCycleManager.class);
if (manager != null) {
@@ -286,11 +315,17 @@ public void postShutdown() {
}
}

public synchronized void setupNIOClient(HTTPClientPolicy clientPolicy) {
public synchronized void setupNIOClient(HTTPClientPolicy clientPolicy, final TlsStrategy tlsStrategy) {
final AsyncClient client = clients.get(clientPolicy);

if (client != null) {
return;
}


clients.computeIfAbsent(clientPolicy, key -> createNIOClient(key, tlsStrategy));
}

private AsyncClient createNIOClient(HTTPClientPolicy clientPolicy, final TlsStrategy tlsStrategy) {
final IOReactorConfig config = IOReactorConfig.custom()
.setIoThreadCount(ioThreadCount)
.setSelectInterval(TimeValue.ofMilliseconds(selectInterval))
@@ -300,12 +335,13 @@ public synchronized void setupNIOClient(HTTPClientPolicy clientPolicy) {
.setTcpNoDelay(tcpNoDelay)
.build();

final Registry<TlsStrategy> tlsStrategy = RegistryBuilder.<TlsStrategy>create()
.register("https", DefaultClientTlsStrategy.getSystemDefault())

final Lookup<TlsStrategy> tlsLookupStrategy = RegistryBuilder.<TlsStrategy>create()
.register("https", (tlsStrategy != null) ? tlsStrategy : DefaultClientTlsStrategy.getSystemDefault())
.build();

connectionManager = new PoolingAsyncClientConnectionManager(
tlsStrategy,
final PoolingAsyncClientConnectionManager connectionManager = new PoolingAsyncClientConnectionManager(
tlsLookupStrategy,
PoolConcurrencyPolicy.STRICT,
PoolReusePolicy.LIFO,
TimeValue.ofMilliseconds(connectionTTL),
@@ -338,25 +374,29 @@ public void addCookie(Cookie cookie) {

adaptClientBuilder(httpAsyncClientBuilder);

client = httpAsyncClientBuilder
.setIOReactorConfig(config)
.build();
final CloseableHttpAsyncClient client = httpAsyncClientBuilder
.setIOReactorConfig(config)
.build();

// Start the client thread
client.start();
//Always start the idle checker thread to validate pending requests and
//use the ConnectionMaxIdle to close the idle connection
new CloseIdleConnectionThread(connectionManager, client).start();

return new AsyncClient(connectionManager, client);
}

//provide a hook to customize the builder
protected void adaptClientBuilder(HttpAsyncClientBuilder httpAsyncClientBuilder) {
}

public CloseableHttpAsyncClient createClient(final AsyncHTTPConduit c) throws IOException {
if (client == null) {
setupNIOClient(c.getClient());
}
return client;
public CloseableHttpAsyncClient createClient(final AsyncHTTPConduit c, final TlsStrategy tlsStrategy)
throws IOException {

return clients
.computeIfAbsent(c.getClient(), key -> createNIOClient(key, tlsStrategy))
.getClient();
}

int getMaxConnections() {
@@ -395,4 +435,12 @@ public void run() {
}
}
}

public void close(HTTPClientPolicy clientPolicy) {
final AsyncClient client = clients.remove(clientPolicy);
if (client != null) {
shutdown(client.getClient());
client.getConnectionManager().close();
}
}
}
@@ -159,5 +159,15 @@
<artifactId>cglib-nodep</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

0 comments on commit be34531

Please sign in to comment.