diff --git a/pom.xml b/pom.xml
index 788c6d0..787f9de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
org.sourcelab
kafka-connect-client
- 3.1.2
+ 3.1.3-SNAPSHOT
jar
@@ -386,4 +386,4 @@
-
\ No newline at end of file
+
diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java
index c889bd4..804d141 100644
--- a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java
+++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java
@@ -30,6 +30,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
@@ -85,7 +86,15 @@ public class HttpClientRestClient implements RestClient {
*/
private CloseableHttpClient httpClient;
- private HttpClientContext httpClientContext;
+ /**
+ * The AuthCache used when creating the HttpClientContext.
+ */
+ private AuthCache authCache;
+
+ /**
+ * The CredentialsProvider used when creating the HttpClientContext.
+ */
+ private CredentialsProvider credsProvider;
/**
* Constructor.
@@ -121,13 +130,10 @@ public void init(final Configuration configuration) {
requestConfigBuilder.setConnectTimeout(configuration.getRequestTimeoutInSeconds() * 1_000);
// Define our Credentials Provider
- final CredentialsProvider credsProvider = new BasicCredentialsProvider();
-
- // Define our context
- httpClientContext = HttpClientContext.create();
+ credsProvider = new BasicCredentialsProvider();
// Define our auth cache
- final AuthCache authCache = new BasicAuthCache();
+ authCache = new BasicAuthCache();
// If we have a configured proxy host
if (configuration.getProxyHost() != null) {
@@ -180,10 +186,6 @@ public void init(final Configuration configuration) {
}
}
- // Configure context.
- httpClientContext.setAuthCache(authCache);
- httpClientContext.setCredentialsProvider(credsProvider);
-
// Attach Credentials provider to client builder.
clientBuilder.setDefaultCredentialsProvider(credsProvider);
@@ -272,7 +274,7 @@ private T submitGetRequest(final String url, final Map getPa
logger.debug("Executing request {}", get.getRequestLine());
// Execute and return
- return httpClient.execute(get, responseHandler, httpClientContext);
+ return execute(get, responseHandler);
} catch (final ClientProtocolException | SocketException | URISyntaxException | SSLHandshakeException connectionException) {
// Typically this is a connection or certificate issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
@@ -305,7 +307,7 @@ private T submitPostRequest(final String url, final Object requestBody, fina
logger.debug("Executing request {} with {}", post.getRequestLine(), jsonPayloadStr);
// Execute and return
- return httpClient.execute(post, responseHandler, httpClientContext);
+ return execute(post, responseHandler);
} catch (final ClientProtocolException | SocketException | SSLHandshakeException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
@@ -337,7 +339,7 @@ private T submitPutRequest(final String url, final Object requestBody, final
logger.debug("Executing request {} with {}", put.getRequestLine(), jsonPayloadStr);
// Execute and return
- return httpClient.execute(put, responseHandler, httpClientContext);
+ return execute(put, responseHandler);
} catch (final ClientProtocolException | SocketException | SSLHandshakeException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
@@ -368,7 +370,7 @@ private T submitDeleteRequest(final String url, final Object requestBody, fi
logger.debug("Executing request {} with {}", delete.getRequestLine(), jsonPayloadStr);
// Execute and return
- return httpClient.execute(delete, responseHandler, httpClientContext);
+ return execute(delete, responseHandler);
} catch (final ClientProtocolException | SocketException | SSLHandshakeException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
@@ -378,6 +380,18 @@ private T submitDeleteRequest(final String url, final Object requestBody, fi
}
}
+ /**
+ * Creates an HttpClientContext and executes the HTTP request.
+ *
+ * @param request The request to execute
+ * @param responseHandler The response Handler to use to parse the response
+ * @param The type that ResponseHandler returns.
+ * @return Parsed response.
+ */
+ private T execute(HttpUriRequest request, ResponseHandler responseHandler) throws IOException {
+ return httpClient.execute(request, responseHandler, createHttpClientContext());
+ }
+
/**
* Internal helper method for generating URLs w/ the appropriate API host and API version.
* @param endPoint The end point you want to hit.
@@ -386,4 +400,18 @@ private T submitDeleteRequest(final String url, final Object requestBody, fi
private String constructApiUrl(final String endPoint) {
return configuration.getApiHost() + endPoint;
}
+
+ /**
+ * Creates a new HttpClientContext with the authCache and credsProvider.
+ * @return the created HttpClientContext.
+ */
+ private HttpClientContext createHttpClientContext() {
+ // Define our context
+ HttpClientContext httpClientContext = HttpClientContext.create();
+ // Configure context.
+ httpClientContext.setAuthCache(authCache);
+ httpClientContext.setCredentialsProvider(credsProvider);
+
+ return httpClientContext;
+ }
}
diff --git a/src/test/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClientTest.java b/src/test/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClientTest.java
index cf63cca..18c32b7 100644
--- a/src/test/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClientTest.java
+++ b/src/test/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClientTest.java
@@ -17,6 +17,10 @@
package org.sourcelab.kafka.connect.apiclient.rest;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -26,10 +30,16 @@
import testserver.TestHttpServer;
import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class HttpClientRestClientTest {
@@ -103,6 +113,48 @@ protected HttpClientBuilder createHttpClientBuilder() {
verify(builderMock).build();
}
+ /**
+ * Test that the every request uses a new HttpClientContext.
+ */
+ @Test
+ public void doHttp_verifyNewHttpContextOnEveryRequest() throws IOException {
+ AtomicReference firstContext = new AtomicReference<>();
+ CloseableHttpClient httpClientMock = mock(CloseableHttpClient.class);
+ when(httpClientMock.execute(any(HttpUriRequest.class), any(ResponseHandler.class), any(HttpClientContext.class)))
+ .then(invocation -> {
+ // Store the context of first request
+ HttpClientContext context = invocation.getArgument(2);
+ firstContext.set(context);
+ return null;
+ })
+ .then(invocation -> {
+ // Compare the context of second request with the first context
+ HttpClientContext context = invocation.getArgument(2);
+ assertNotSame(context, firstContext.get());
+ return null;
+ });
+
+ // Create a mock builder and a rest client that uses the mock builder
+ final HttpClientBuilder builderMock = mock(HttpClientBuilder.class);
+ HttpClientRestClient restClient = new HttpClientRestClient() {
+ @Override
+ protected HttpClientBuilder createHttpClientBuilder() {
+ return builderMock;
+ }
+ };
+ when(builderMock.build()).thenReturn(httpClientMock);
+
+ // Init the rest client
+ final Configuration configuration = new Configuration("http://localhost:" + HTTP_PORT);
+ restClient.init(configuration);
+
+ restClient.submitRequest(new DummyRequest());
+ restClient.submitRequest(new DummyRequest());
+
+ verify(httpClientMock, times(2))
+ .execute(any(HttpUriRequest.class), any(ResponseHandler.class), any(HttpClientContext.class));
+ }
+
/**
* Test against Https server.
*/