Skip to content

Commit

Permalink
Async (#3)
Browse files Browse the repository at this point in the history
* 增加HttpClientFactory.buildAsync
* Async的proxy模式暂时无法实现
  • Loading branch information
dbstarll committed Mar 20, 2023
1 parent 8010d36 commit 56996e2
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package io.github.dbstarll.utils.http.client;

import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.client5.http.SystemDefaultDnsResolver;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.impl.InMemoryDnsResolver;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactoryBuilder;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;

Expand Down Expand Up @@ -133,14 +140,9 @@ public final CloseableHttpClient build(final Consumer<HttpClientBuilder>... cons

private HttpClientConnectionManager buildConnectionManager() {
final PoolingHttpClientConnectionManagerBuilder builder = PoolingHttpClientConnectionManagerBuilder.create()
.setDefaultConnectionConfig(ConnectionConfig.custom()
.setSocketTimeout(socketTimeout)
.setConnectTimeout(connectTimeout)
.build());
.setDefaultConnectionConfig(buildConnectionConfig());
if (sslContext != null) {
builder.setSSLSocketFactory(SSLConnectionSocketFactoryBuilder.create()
.setSslContext(sslContext)
.build());
builder.setSSLSocketFactory(SSLConnectionSocketFactoryBuilder.create().setSslContext(sslContext).build());
}
if (proxy != null && proxy.type() == Type.SOCKS) {
builder.setSSLSocketFactory(new ProxyConnectionSocketFactory(sslContext, proxy, resolveFromProxy));
Expand All @@ -151,7 +153,51 @@ private HttpClientConnectionManager buildConnectionManager() {
return builder.build();
}

private static class FakeDnsResolver extends SystemDefaultDnsResolver {
/**
* 构造CloseableHttpAsyncClient.
*
* @param consumers 用于对HttpAsyncClientBuilder的自定义
* @return CloseableHttpAsyncClient
*/
@SafeVarargs
public final CloseableHttpAsyncClient buildAsync(final Consumer<HttpAsyncClientBuilder>... consumers) {
final HttpAsyncClientBuilder builder = HttpAsyncClients.custom()
.setConnectionManager(buildConnectionManagerAsync())
.setIOReactorConfig(buildIOReactorConfig());
if (retryStrategy != null) {
builder.setRetryStrategy(retryStrategy);
} else if (!automaticRetries) {
builder.disableAutomaticRetries();
}
Arrays.stream(consumers).forEach(c -> c.accept(builder));
return builder.build();
}

private AsyncClientConnectionManager buildConnectionManagerAsync() {
final PoolingAsyncClientConnectionManagerBuilder builder = PoolingAsyncClientConnectionManagerBuilder.create()
.setDefaultConnectionConfig(buildConnectionConfig());
if (sslContext != null) {
builder.setTlsStrategy(ClientTlsStrategyBuilder.create().setSslContext(sslContext).build());
}
if (proxy != null && proxy.type() == Type.SOCKS && resolveFromProxy) {
builder.setDnsResolver(new FakeDnsResolver());
}
return builder.build();
}

private ConnectionConfig buildConnectionConfig() {
return ConnectionConfig.custom().setSocketTimeout(socketTimeout).setConnectTimeout(connectTimeout).build();
}

private IOReactorConfig buildIOReactorConfig() {
final IOReactorConfig.Builder builder = IOReactorConfig.custom().setSoTimeout(socketTimeout);
if (proxy != null && proxy.type() == Type.SOCKS) {
builder.setSocksProxyAddress(proxy.address());
}
return builder.build();
}

private static class FakeDnsResolver extends InMemoryDnsResolver {
@Override
public InetAddress[] resolve(final String host) throws UnknownHostException {
// Return some fake DNS record for every request, we won't be using it
Expand All @@ -163,8 +209,7 @@ private static class ProxyConnectionSocketFactory extends SSLConnectionSocketFac
private final Proxy proxy;
private final boolean resolveFromProxy;

ProxyConnectionSocketFactory(final SSLContext sslContext, final Proxy proxy,
final boolean resolveFromProxy) {
ProxyConnectionSocketFactory(final SSLContext sslContext, final Proxy proxy, final boolean resolveFromProxy) {
super(sslContext != null ? sslContext : SSLContexts.createDefault());
this.proxy = proxy;
this.resolveFromProxy = resolveFromProxy;
Expand All @@ -175,7 +220,6 @@ public Socket createSocket(final HttpContext context) {
return new Socket(proxy);
}


@Override
public Socket connectSocket(final Socket socket, final HttpHost host, final InetSocketAddress remoteAddress,
final InetSocketAddress localAddress, final Timeout connectTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
import okhttp3.mockwebserver.MockWebServer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.classic.BasicHttpClientResponseHandler;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
Expand Down Expand Up @@ -39,6 +45,7 @@
import org.junit.jupiter.api.function.ThrowingConsumer;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
import java.io.IOException;
import java.math.BigInteger;
import java.net.Authenticator;
Expand All @@ -56,6 +63,8 @@
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -75,6 +84,9 @@ class HttpClientFactoryTest {
private static final String PROXY_HOST = "proxy.y1cloud.com";
private static final int PROXY_PORT = 33031;

private String proxyUsername;
private String proxyPassword;

@BeforeEach
void setUp() {
Authenticator.setDefault(getAuthenticator());
Expand All @@ -85,12 +97,12 @@ private Authenticator getAuthenticator() {
if (StringUtils.isNotBlank(proxyAuth)) {
final int split = proxyAuth.indexOf(':');
if (split > 0) {
final String userName = proxyAuth.substring(0, split);
final char[] password = proxyAuth.substring(split + 1).toCharArray();
proxyUsername = proxyAuth.substring(0, split);
proxyPassword = proxyAuth.substring(split + 1);
return new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(userName, password);
return new PasswordAuthentication(proxyUsername, proxyPassword.toCharArray());
}
};
}
Expand Down Expand Up @@ -165,6 +177,19 @@ void http() throws Throwable {
});
}

@Test
void httpAsync() throws Throwable {
useServer(server -> {
try (CloseableHttpAsyncClient client = new HttpClientFactory().setAutomaticRetries(false)
.buildAsync(HttpAsyncClientBuilder::disableAutomaticRetries)) {
client.start();
final SimpleHttpRequest request = SimpleRequestBuilder.get(server.url("/ping.html").uri()).build();
final Future<SimpleHttpResponse> future = client.execute(request, null);
assertEquals("ok", future.get().getBodyText());
}
});
}

@Test
void https() throws Throwable {
final SecureRandom random = SecurityFactory.builder(SecureRandomAlgorithm.SHA1PRNG).build();
Expand Down Expand Up @@ -193,6 +218,36 @@ void https() throws Throwable {
});
}

@Test
void httpsAsync() throws Throwable {
final SecureRandom random = SecurityFactory.builder(SecureRandomAlgorithm.SHA1PRNG).build();
final KeyPair keyPair = genKeyPair(random);
final ContentSigner signer = signer(keyPair.getPrivate(), random);
final char[] password = "changeit".toCharArray();

final X500Name subject = new X500NameBuilder().addRDN(BCStyle.CN, "localhost").build();
final PKCS10CertificationRequest csr = csr(subject, keyPair.getPublic(), signer);
final X509Certificate crt = crt(csr, subject, signer, random);

final KeyStore keyStore = SecurityFactory.builder(KeyStoreAlgorithm.JKS).load(null, null).build();
keyStore.setKeyEntry("localhost", keyPair.getPrivate(), password, new X509Certificate[]{crt});

useServer(server -> {
final SSLContext sslContext = SSLContextBuilder.create().loadTrustMaterial(keyStore, null)
.setSecureRandom(random).build();
try (CloseableHttpAsyncClient client = new HttpClientFactory().setSslContext(sslContext).buildAsync()) {
client.start();
final SimpleHttpRequest request = SimpleRequestBuilder.get(server.url("/ping.html").uri()).build();
final Future<SimpleHttpResponse> future = client.execute(request, null);
assertEquals("ok", future.get().getBodyText());
}
}, s -> {
final SSLContext sslContext = SSLContextBuilder.create().loadKeyMaterial(keyStore, password)
.setSecureRandom(random).build();
s.useHttps(sslContext.getSocketFactory(), false);
});
}

@Test
void proxy() throws Throwable {
try (CloseableHttpClient client = new HttpClientFactory().setSocketTimeout(5000).setConnectTimeout(5000)
Expand All @@ -202,6 +257,22 @@ void proxy() throws Throwable {
}
}

@Test
void proxyAsync() throws Throwable {
try (CloseableHttpAsyncClient client = new HttpClientFactory().setSocketTimeout(5000).setConnectTimeout(5000)
.setProxy(HttpClientFactory.proxy(Type.SOCKS, PROXY_HOST, PROXY_PORT))
.buildAsync()) {
client.start();
final SimpleHttpRequest request = SimpleRequestBuilder.get("https://static.y1cloud.com/ping.html").build();
final Future<SimpleHttpResponse> future = client.execute(request, null);
// assertEquals("ok\n", future.get().getBodyText());
// Async模式下的proxy还有问题
final ExecutionException e = assertThrowsExactly(ExecutionException.class, future::get);
assertNotNull(e.getCause());
assertSame(SSLHandshakeException.class, e.getCause().getClass());
}
}

@Test
void proxyWithContext() throws Throwable {
try (CloseableHttpClient client = new HttpClientFactory().setSocketTimeout(5000).setConnectTimeout(5000)
Expand All @@ -221,6 +292,17 @@ void proxyDirect() throws Throwable {
}
}

@Test
void proxyDirectAsync() throws Throwable {
try (CloseableHttpAsyncClient client = new HttpClientFactory()
.setProxy(HttpClientFactory.proxy(Type.DIRECT, PROXY_HOST, PROXY_PORT)).buildAsync()) {
client.start();
final SimpleHttpRequest request = SimpleRequestBuilder.get("https://static.y1cloud.com/ping.html").build();
final Future<SimpleHttpResponse> future = client.execute(request, null);
assertEquals("ok\n", future.get().getBodyText());
}
}

@Test
void resolveFromProxy() throws Throwable {
try (CloseableHttpClient client = new HttpClientFactory().setSocketTimeout(5000).setConnectTimeout(5000)
Expand Down Expand Up @@ -252,6 +334,28 @@ public boolean retryRequest(HttpRequest request, IOException exception, int exec
}
}

@Test
void retryAsync() throws Throwable {
final AtomicInteger retry = new AtomicInteger();
final HttpRequestRetryStrategy retryHandler = new DefaultHttpRequestRetryStrategy(3, TimeValue.ofSeconds(1L)) {
@Override
public boolean retryRequest(HttpRequest request, IOException exception, int execCount, HttpContext context) {
retry.incrementAndGet();
return super.retryRequest(request, exception, execCount, context);
}
};
try (CloseableHttpAsyncClient client = new HttpClientFactory().setRetryStrategy(retryHandler)
.setResolveFromProxy(true).setProxy(HttpClientFactory.proxy(Type.SOCKS, PROXY_HOST, 1080))
.buildAsync()) {
client.start();
final SimpleHttpRequest request = SimpleRequestBuilder.get("https://static.y1cloud.com/ping.html").build();
final ExecutionException e = assertThrowsExactly(ExecutionException.class, () -> client.execute(request, null).get());
assertNotNull(e.getCause());
assertSame(ConnectTimeoutException.class, e.getCause().getClass());
assertEquals(1, retry.get());
}
}

private static KeyPair genKeyPair(final SecureRandom random) throws InstanceException, NoSuchAlgorithmException {
return SecurityFactory.builder(KeyPairGeneratorAlgorithm.RSA).keySize(2048, random).build().genKeyPair();
}
Expand Down
19 changes: 19 additions & 0 deletions src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<configuration>
<!--定义日志的格式-->
<property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}:%L - %msg%n"/>

<!--定义输出到控制台的appender-->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${LOG_PATTERN}</pattern>
</encoder>
</appender>

<!--定义日志级别-->
<root level="info">
<appender-ref ref="STDOUT"/>
</root>

<!--定义自定义package-->
<logger name="io.github.dbstarll.utils.http.client" level="debug"/>
</configuration>

0 comments on commit 56996e2

Please sign in to comment.