Skip to content

Commit

Permalink
add client config callbck for ES rest client
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <riccardo.modanese@eurotech.com>
  • Loading branch information
riccardomodanese authored and Coduz committed Jun 27, 2022
1 parent 9f13be6 commit 8526768
Showing 1 changed file with 60 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import org.eclipse.kapua.commons.metric.MetricServiceFactory;
import org.eclipse.kapua.commons.metric.MetricsService;
import org.eclipse.kapua.commons.util.log.ConfigurationPrinter;
import org.eclipse.kapua.service.elasticsearch.client.ElasticsearchClientProvider;
import org.eclipse.kapua.service.elasticsearch.client.ModelContext;
Expand Down Expand Up @@ -71,10 +77,14 @@
*/
public class RestElasticsearchClientProvider implements ElasticsearchClientProvider<RestElasticsearchClient> {

private static final Logger LOG = LoggerFactory.getLogger(RestElasticsearchClientProvider.class);
static final Logger LOG = LoggerFactory.getLogger(RestElasticsearchClientProvider.class);

private static final String PROVIDER_CANNOT_CLOSE_CLIENT_MSG = "Cannot close ElasticSearch REST client. Client is already closed or not initialized";

// metrics
private Counter exceptionCount;
private Counter runtimeExceptionCount;

private RestElasticsearchClient restElasticsearchClient;
private RestClient internalElasticsearchRestClient;

Expand All @@ -85,6 +95,12 @@ public class RestElasticsearchClientProvider implements ElasticsearchClientProvi
private static Counter clientReconnectCallCounter;
private ScheduledExecutorService reconnectExecutorTask;

public RestElasticsearchClientProvider() {
MetricsService metricService = MetricServiceFactory.getInstance();
exceptionCount = metricService.getCounter("datastore", "rest_client", "error", "count");
runtimeExceptionCount = metricService.getCounter("datastore", "rest_client", "runtime_error", "count");
}

@Override
public RestElasticsearchClientProvider init() throws ClientProviderInitException {
synchronized (RestElasticsearchClientProvider.class) {
Expand Down Expand Up @@ -281,6 +297,7 @@ private RestClient initClient() throws ClientInitializationException {
throw new ClientInitializationException(e, "Error while parsing node addresses!");
}

boolean setCallback = false;
// Init internal Elasticseatch client
RestClientBuilder restClientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0]));
if (sslEnabled) {
Expand All @@ -290,7 +307,8 @@ private RestClient initClient() throws ClientInitializationException {
initTrustStore(sslBuilder);

SSLContext sslContext = sslBuilder.build();
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setSSLContext(sslContext));
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> customizeHttpClient(httpClientBuilder.setSSLContext(sslContext)));
setCallback = true;
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new ClientInitializationException(e, "Failed to build SSLContext");
}
Expand All @@ -301,8 +319,15 @@ private RestClient initClient() throws ClientInitializationException {
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> customizeHttpClient(httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
setCallback = true;
}

//issue #
if (!setCallback) {
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> customizeHttpClient(httpClientBuilder));
}
// restClientBuilder.setFailureListener(new RestElasticsearchFailureListener());

RestClient restClient = restClientBuilder.build();

Expand All @@ -318,6 +343,37 @@ private RestClient initClient() throws ClientInitializationException {
return restClient;
}

private HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
try {
DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {

@Override
public boolean handle(IOException e) {
exceptionCount.inc();
LOG.warn("IOReactor encountered a checked exception: {}", e.getMessage(), e);
//return true to note this exception as handled, it will not be re-thrown
return true;
}

@Override
public boolean handle(RuntimeException e) {
runtimeExceptionCount.inc();
LOG.warn("IOReactor encountered a runtime exception: {}", e.getMessage(), e);
//return true to note this exception as handled, it will not be re-thrown
return true;
}
});

httpClientBuilder.setConnectionManager(new PoolingNHttpClientConnectionManager(ioReactor));
}
catch (IOReactorException e) {
throw new RuntimeException(e);
}

return httpClientBuilder;
}

@Override
public ElasticsearchClientProvider<RestElasticsearchClient> withClientConfiguration(ElasticsearchClientConfiguration elasticsearchClientConfiguration) {
this.elasticsearchClientConfiguration = elasticsearchClientConfiguration;
Expand Down Expand Up @@ -460,4 +516,4 @@ private void initTrustStore(SSLContextBuilder sslBuilder) throws ClientInitializ
throw new ClientInitializationException(ltme, "Failed to init TrustStore");
}
}
}
}

0 comments on commit 8526768

Please sign in to comment.