Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.sourcelab</groupId>
<artifactId>kafka-connect-client</artifactId>
<version>3.1.2</version>
<version>3.1.3-SNAPSHOT</version>
<packaging>jar</packaging>

<!-- Require Maven 3.3.9 -->
Expand Down Expand Up @@ -386,4 +386,4 @@
</build>
</profile>
</profiles>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
Expand Down Expand Up @@ -85,7 +86,15 @@ public class HttpClientRestClient implements RestClient {
*/
private CloseableHttpClient httpClient;

private HttpClientContext httpClientContext;
/**
* The AuthCache used when creating the HttpClientContext.
*/
private AuthCache authCache;

/**
* The CredentialsProvider used when creating the HttpClientContext.
*/
private CredentialsProvider credsProvider;

/**
* Constructor.
Expand Down Expand Up @@ -121,13 +130,10 @@ public void init(final Configuration configuration) {
requestConfigBuilder.setConnectTimeout(configuration.getRequestTimeoutInSeconds() * 1_000);

// Define our Credentials Provider
final CredentialsProvider credsProvider = new BasicCredentialsProvider();

// Define our context
httpClientContext = HttpClientContext.create();
credsProvider = new BasicCredentialsProvider();

// Define our auth cache
final AuthCache authCache = new BasicAuthCache();
authCache = new BasicAuthCache();

// If we have a configured proxy host
if (configuration.getProxyHost() != null) {
Expand Down Expand Up @@ -180,10 +186,6 @@ public void init(final Configuration configuration) {
}
}

// Configure context.
httpClientContext.setAuthCache(authCache);
httpClientContext.setCredentialsProvider(credsProvider);

// Attach Credentials provider to client builder.
clientBuilder.setDefaultCredentialsProvider(credsProvider);

Expand Down Expand Up @@ -272,7 +274,7 @@ private <T> T submitGetRequest(final String url, final Map<String, String> getPa
logger.debug("Executing request {}", get.getRequestLine());

// Execute and return
return httpClient.execute(get, responseHandler, httpClientContext);
return execute(get, responseHandler);
} catch (final ClientProtocolException | SocketException | URISyntaxException | SSLHandshakeException connectionException) {
// Typically this is a connection or certificate issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
Expand Down Expand Up @@ -305,7 +307,7 @@ private <T> T submitPostRequest(final String url, final Object requestBody, fina
logger.debug("Executing request {} with {}", post.getRequestLine(), jsonPayloadStr);

// Execute and return
return httpClient.execute(post, responseHandler, httpClientContext);
return execute(post, responseHandler);
} catch (final ClientProtocolException | SocketException | SSLHandshakeException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
Expand Down Expand Up @@ -337,7 +339,7 @@ private <T> T submitPutRequest(final String url, final Object requestBody, final
logger.debug("Executing request {} with {}", put.getRequestLine(), jsonPayloadStr);

// Execute and return
return httpClient.execute(put, responseHandler, httpClientContext);
return execute(put, responseHandler);
} catch (final ClientProtocolException | SocketException | SSLHandshakeException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
Expand Down Expand Up @@ -368,7 +370,7 @@ private <T> T submitDeleteRequest(final String url, final Object requestBody, fi
logger.debug("Executing request {} with {}", delete.getRequestLine(), jsonPayloadStr);

// Execute and return
return httpClient.execute(delete, responseHandler, httpClientContext);
return execute(delete, responseHandler);
} catch (final ClientProtocolException | SocketException | SSLHandshakeException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
Expand All @@ -378,6 +380,18 @@ private <T> T submitDeleteRequest(final String url, final Object requestBody, fi
}
}

/**
* Creates an HttpClientContext and executes the HTTP request.
*
* @param request The request to execute
* @param responseHandler The response Handler to use to parse the response
* @param <T> The type that ResponseHandler returns.
* @return Parsed response.
*/
private <T> T execute(HttpUriRequest request, ResponseHandler<T> responseHandler) throws IOException {
return httpClient.execute(request, responseHandler, createHttpClientContext());
}

/**
* Internal helper method for generating URLs w/ the appropriate API host and API version.
* @param endPoint The end point you want to hit.
Expand All @@ -386,4 +400,18 @@ private <T> T submitDeleteRequest(final String url, final Object requestBody, fi
private String constructApiUrl(final String endPoint) {
return configuration.getApiHost() + endPoint;
}

/**
* Creates a new HttpClientContext with the authCache and credsProvider.
* @return the created HttpClientContext.
*/
private HttpClientContext createHttpClientContext() {
// Define our context
HttpClientContext httpClientContext = HttpClientContext.create();
// Configure context.
httpClientContext.setAuthCache(authCache);
httpClientContext.setCredentialsProvider(credsProvider);

return httpClientContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.sourcelab.kafka.connect.apiclient.rest;

import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -26,10 +30,16 @@
import testserver.TestHttpServer;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class HttpClientRestClientTest {

Expand Down Expand Up @@ -103,6 +113,48 @@ protected HttpClientBuilder createHttpClientBuilder() {
verify(builderMock).build();
}

/**
* Test that the every request uses a new HttpClientContext.
*/
@Test
public void doHttp_verifyNewHttpContextOnEveryRequest() throws IOException {
AtomicReference<HttpClientContext> firstContext = new AtomicReference<>();
CloseableHttpClient httpClientMock = mock(CloseableHttpClient.class);
when(httpClientMock.execute(any(HttpUriRequest.class), any(ResponseHandler.class), any(HttpClientContext.class)))
.then(invocation -> {
// Store the context of first request
HttpClientContext context = invocation.getArgument(2);
firstContext.set(context);
return null;
})
.then(invocation -> {
// Compare the context of second request with the first context
HttpClientContext context = invocation.getArgument(2);
assertNotSame(context, firstContext.get());
return null;
});

// Create a mock builder and a rest client that uses the mock builder
final HttpClientBuilder builderMock = mock(HttpClientBuilder.class);
HttpClientRestClient restClient = new HttpClientRestClient() {
@Override
protected HttpClientBuilder createHttpClientBuilder() {
return builderMock;
}
};
when(builderMock.build()).thenReturn(httpClientMock);

// Init the rest client
final Configuration configuration = new Configuration("http://localhost:" + HTTP_PORT);
restClient.init(configuration);

restClient.submitRequest(new DummyRequest());
restClient.submitRequest(new DummyRequest());

verify(httpClientMock, times(2))
.execute(any(HttpUriRequest.class), any(ResponseHandler.class), any(HttpClientContext.class));
}

/**
* Test against Https server.
*/
Expand Down