Skip to content

Commit

Permalink
fix(elasticsearch): build in resilience against IO exceptions on http…
Browse files Browse the repository at this point in the history
…client (#6680)

* fix(elasticsearch): build in resilience against IO exceptions on http client
  • Loading branch information
RyanHolstien committed Dec 14, 2022
1 parent b7a2ce5 commit f85fd15
Showing 1 changed file with 83 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,28 @@

import com.linkedin.gms.factory.auth.AwsRequestSigningApacheInterceptor;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import java.io.IOException;
import javax.annotation.Nonnull;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.util.PublicSuffixMatcherLoader;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
Expand Down Expand Up @@ -70,106 +84,104 @@ public class RestHighLevelClientFactory {

@Bean(name = "elasticSearchRestHighLevelClient")
@Nonnull
protected RestHighLevelClient createInstance() {
RestClientBuilder restClientBuilder;
if (useSSL) {
restClientBuilder = loadRestHttpsClient(host, port, pathPrefix, threadCount, connectionRequestTimeout, sslContext, username,
password, opensearchUseAwsIamAuth, region);
} else {
restClientBuilder = loadRestHttpClient(host, port, pathPrefix, threadCount, connectionRequestTimeout, username,
password, opensearchUseAwsIamAuth, region);
}
public RestHighLevelClient createInstance(RestClientBuilder restClientBuilder) {

return new RestHighLevelClient(restClientBuilder);
}

@Nonnull
private static RestClientBuilder loadRestHttpClient(@Nonnull String host, int port, String pathPrefix, int threadCount,
int connectionRequestTimeout) {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
@Bean
public RestClientBuilder loadRestClient() {
final RestClientBuilder builder = createBuilder(useSSL ? "https" : "http");

if (!StringUtils.isEmpty(pathPrefix)) {
builder.setPathPrefix(pathPrefix);
}

builder.setRequestConfigCallback(
requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout));
builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
if (useSSL) {
httpAsyncClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier());
}
try {
httpAsyncClientBuilder.setConnectionManager(createConnectionManager());
} catch (IOReactorException e) {
throw new IllegalStateException("Unable to start ElasticSearch client. Please verify connection configuration.");
}
httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build());

return builder;
}
setCredentials(httpAsyncClientBuilder);

@Nonnull
private static RestClientBuilder loadRestHttpClient(@Nonnull String host, int port, String pathPrefix, int threadCount,
int connectionRequestTimeout, String username, String password, boolean opensearchUseAwsIamAuth, String region) {
RestClientBuilder builder = loadRestHttpClient(host, port, pathPrefix, threadCount, connectionRequestTimeout);

builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build());

if (username != null && password != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
if (opensearchUseAwsIamAuth) {
HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region);
httpAsyncClientBuilder.addInterceptorLast(interceptor);
}

return httpAsyncClientBuilder;
}
return httpAsyncClientBuilder;
});

return builder;
}

@Nonnull
private static RestClientBuilder loadRestHttpsClient(@Nonnull String host, int port, String pathPrefix, int threadCount,
int connectionRequestTimeout, @Nonnull SSLContext sslContext, String username, String password,
boolean opensearchUseAwsIamAuth, String region) {

final RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "https"));
private RestClientBuilder createBuilder(String scheme) {
final RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, scheme));

if (!StringUtils.isEmpty(pathPrefix)) {
builder.setPathPrefix(pathPrefix);
}

builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier())
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build());

if (username != null && password != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
} else if (opensearchUseAwsIamAuth) {
HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region);
httpAsyncClientBuilder.addInterceptorLast(interceptor);
}

return httpAsyncClientBuilder;
}
});

builder.setRequestConfigCallback(
requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout));

return builder;
}

private static HttpRequestInterceptor getAwsRequestSigningInterceptor(String region) {
/**
* Needed to override ExceptionHandler behavior for cases where IO error would have put client in unrecoverable state
* We don't utilize system properties in the client builder, so setting defaults pulled from
* {@link HttpAsyncClientBuilder#build()}.
* @return
*/
private NHttpClientConnectionManager createConnectionManager() throws IOReactorException {
SSLContext sslContext = SSLContexts.createDefault();
HostnameVerifier hostnameVerifier = new DefaultHostnameVerifier(PublicSuffixMatcherLoader.getDefault());
SchemeIOSessionStrategy sslStrategy =
new SSLIOSessionStrategy(sslContext, null, null, hostnameVerifier);

IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(threadCount).build();
DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
IOReactorExceptionHandler ioReactorExceptionHandler = new IOReactorExceptionHandler() {
@Override
public boolean handle(IOException ex) {
log.error("IO Exception caught during ElasticSearch connection.", ex);
return true;
}

@Override
public boolean handle(RuntimeException ex) {
log.error("Runtime Exception caught during ElasticSearch connection.", ex);
return true;
}
};
ioReactor.setExceptionHandler(ioReactorExceptionHandler);

return new PoolingNHttpClientConnectionManager(ioReactor,
RegistryBuilder.<SchemeIOSessionStrategy>create()
.register("http", NoopIOSessionStrategy.INSTANCE)
.register("https", sslStrategy)
.build());
}

private void setCredentials(HttpAsyncClientBuilder httpAsyncClientBuilder) {
if (username != null && password != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
if (opensearchUseAwsIamAuth) {
HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region);
httpAsyncClientBuilder.addInterceptorLast(interceptor);
}
}

private HttpRequestInterceptor getAwsRequestSigningInterceptor(String region) {

if (region == null) {
throw new NullPointerException("Region must not be null when opensearchUseAwsIamAuth is enabled");
throw new IllegalArgumentException("Region must not be null when opensearchUseAwsIamAuth is enabled");
}
Aws4Signer signer = Aws4Signer.create();
// Uses default AWS credentials
HttpRequestInterceptor interceptor = new AwsRequestSigningApacheInterceptor("es", signer,
return new AwsRequestSigningApacheInterceptor("es", signer,
DefaultCredentialsProvider.create(), region);
return interceptor;
}
}

0 comments on commit f85fd15

Please sign in to comment.