From 81f1f9395967d59bc136e059ac90659912b62be5 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 30 Jan 2019 17:47:02 +0900 Subject: [PATCH 1/3] First pass add Basic-Auth capabilities to client --- .../connect/apiclient/Configuration.java | 32 ++++++++- .../connect/apiclient/KafkaConnectClient.java | 15 +++++ .../apiclient/rest/HttpClientRestClient.java | 66 ++++++++++++++++--- .../exceptions/InvalidRequestException.java | 2 + .../UnauthorizedRequestException.java | 17 +++++ .../apiclient/KafkaConnectClientTest.java | 5 ++ 6 files changed, 128 insertions(+), 9 deletions(-) create mode 100644 src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java index 4419712..229c528 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java @@ -29,9 +29,13 @@ public final class Configuration { // Defines the URL/Hostname of Kafka-Connect private final String apiHost; - // Optional Connection settings + // Optional Connection options private int requestTimeoutInSeconds = 300; + // Optional BasicAuth options + private String basicAuthUsername = null; + private String basicAuthPassword = null; + // Optional SSL options private boolean ignoreInvalidSslCertificates = false; private File trustStoreFile = null; @@ -64,6 +68,19 @@ public Configuration(final String kafkaConnectHost) { } } + /** + * Allow setting http Basic-Authentication username and password to authenticate requests. + * + * @param username username to authenticate requests to Kafka-Connect with. + * @param password password to authenticate requests to Kafka-Connect with. + * @return Configuration instance. + */ + public Configuration useBasicAuth(final String username, final String password) { + this.basicAuthUsername = username; + this.basicAuthPassword = password; + return this; + } + /** * Allow setting optional proxy configuration. * @@ -168,6 +185,14 @@ public int getRequestTimeoutInSeconds() { return requestTimeoutInSeconds; } + public String getBasicAuthUsername() { + return basicAuthUsername; + } + + public String getBasicAuthPassword() { + return basicAuthPassword; + } + @Override public String toString() { final StringBuilder stringBuilder = new StringBuilder("Configuration{") @@ -191,6 +216,11 @@ public String toString() { stringBuilder.append(", sslTrustStorePassword='******'"); } } + if (basicAuthUsername != null) { + stringBuilder + .append(", basicAuthUsername='").append(basicAuthUsername).append('\'') + .append(", basicAuthPassword='******'"); + } stringBuilder.append('}'); return stringBuilder.toString(); diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java index 10f1b07..2f44aa1 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java @@ -17,6 +17,7 @@ package org.sourcelab.kafka.connect.apiclient; +import org.apache.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory; @@ -49,6 +50,7 @@ import org.sourcelab.kafka.connect.apiclient.rest.RestClient; import org.sourcelab.kafka.connect.apiclient.rest.RestResponse; import org.sourcelab.kafka.connect.apiclient.rest.exceptions.InvalidRequestException; +import org.sourcelab.kafka.connect.apiclient.rest.exceptions.UnauthorizedRequestException; import java.io.IOException; import java.util.Collection; @@ -289,6 +291,19 @@ private T submitRequest(final Request request) { } } + // Server reject's client's authentication. + if (responseCode == HttpStatus.SC_UNAUTHORIZED) { + // Throw contextual error msg based on if credentials are configured or not. + String errorMsg; + if (configuration.getBasicAuthUsername() == null) { + errorMsg = "Server required authentication credentials but none were provided in client configuration."; + } else { + errorMsg = "Client authentication credentials (username=" + configuration.getBasicAuthUsername() + ") was rejected by server."; + } + errorMsg = errorMsg + " Server responded with: \"" + responseStr + "\""; + throw new UnauthorizedRequestException(errorMsg, responseCode); + } + // Attempt to parse error response try { final RequestErrorResponse errorResponse = JacksonFactory.newInstance().readValue(responseStr, RequestErrorResponse.class); diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java index 1db31a6..cbfc7ac 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java @@ -21,6 +21,7 @@ import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.AuthCache; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.CredentialsProvider; import org.apache.http.client.ResponseHandler; @@ -29,8 +30,11 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.utils.URIBuilder; import org.apache.http.entity.StringEntity; +import org.apache.http.impl.auth.BasicScheme; +import org.apache.http.impl.client.BasicAuthCache; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; @@ -45,8 +49,11 @@ import org.sourcelab.kafka.connect.apiclient.rest.handlers.RestResponseHandler; import java.io.IOException; +import java.net.MalformedURLException; import java.net.SocketException; +import java.net.URI; import java.net.URISyntaxException; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; @@ -78,6 +85,8 @@ public class HttpClientRestClient implements RestClient { */ private CloseableHttpClient httpClient; + private HttpClientContext httpClientContext; + /** * Constructor. */ @@ -109,6 +118,15 @@ public void init(final Configuration configuration) { // Define our RequestConfigBuilder final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); + // Define our Credentials Provider + final CredentialsProvider credsProvider = new BasicCredentialsProvider(); + + // Define our context + httpClientContext = HttpClientContext.create(); + + // Define our auth cache + final AuthCache authCache = new BasicAuthCache(); + // If we have a configured proxy host if (configuration.getProxyHost() != null) { // Define proxy host @@ -120,21 +138,53 @@ public void init(final Configuration configuration) { // If we have proxy auth enabled if (configuration.getProxyUsername() != null) { - // Create credential provider - final CredentialsProvider credsProvider = new BasicCredentialsProvider(); + // Add proxy credentials credsProvider.setCredentials( new AuthScope(configuration.getProxyHost(), configuration.getProxyPort()), new UsernamePasswordCredentials(configuration.getProxyUsername(), configuration.getProxyPassword()) ); - // Attach Credentials provider to client builder. - clientBuilder.setDefaultCredentialsProvider(credsProvider); + // Preemptive load context with authentication. + authCache.put( + new HttpHost(configuration.getProxyHost(), configuration.getProxyPort(), configuration.getProxyScheme()), new BasicScheme() + ); } // Attach Proxy to request config builder requestConfigBuilder.setProxy(proxyHost); } + // If BasicAuth credentials are configured. + if (configuration.getBasicAuthUsername() != null) { + try { + // parse ApiHost for Hostname and port. + final URL apiUrl = new URL(configuration.getApiHost()); + + // Add Kafka-Connect credentials + credsProvider.setCredentials( + new AuthScope(apiUrl.getHost(), apiUrl.getPort()), + new UsernamePasswordCredentials( + configuration.getBasicAuthUsername(), + configuration.getBasicAuthPassword() + ) + ); + + // Preemptive load context with authentication. + authCache.put( + new HttpHost(apiUrl.getHost(), apiUrl.getPort(), apiUrl.getProtocol()), new BasicScheme() + ); + } catch (final MalformedURLException exception) { + throw new RuntimeException(exception.getMessage(), exception); + } + } + + // Configure context. + httpClientContext.setAuthCache(authCache); + httpClientContext.setCredentialsProvider(credsProvider); + + // Attach Credentials provider to client builder. + clientBuilder.setDefaultCredentialsProvider(credsProvider); + // Attach default request config clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()); @@ -211,7 +261,7 @@ private T submitGetRequest(final String url, final Map getPa logger.debug("Executing request {}", get.getRequestLine()); // Execute and return - return httpClient.execute(get, responseHandler); + return httpClient.execute(get, responseHandler, httpClientContext); } catch (final ClientProtocolException | SocketException | URISyntaxException connectionException) { // Typically this is a connection issue. throw new ConnectionException(connectionException.getMessage(), connectionException); @@ -244,7 +294,7 @@ private T submitPostRequest(final String url, final Object requestBody, fina logger.debug("Executing request {} with {}", post.getRequestLine(), jsonPayloadStr); // Execute and return - return httpClient.execute(post, responseHandler); + return httpClient.execute(post, responseHandler, httpClientContext); } catch (final ClientProtocolException | SocketException connectionException) { // Typically this is a connection issue. throw new ConnectionException(connectionException.getMessage(), connectionException); @@ -276,7 +326,7 @@ private T submitPutRequest(final String url, final Object requestBody, final logger.debug("Executing request {} with {}", put.getRequestLine(), jsonPayloadStr); // Execute and return - return httpClient.execute(put, responseHandler); + return httpClient.execute(put, responseHandler, httpClientContext); } catch (final ClientProtocolException | SocketException connectionException) { // Typically this is a connection issue. throw new ConnectionException(connectionException.getMessage(), connectionException); @@ -307,7 +357,7 @@ private T submitDeleteRequest(final String url, final Object requestBody, fi logger.debug("Executing request {} with {}", delete.getRequestLine(), jsonPayloadStr); // Execute and return - return httpClient.execute(delete, responseHandler); + return httpClient.execute(delete, responseHandler, httpClientContext); } catch (final ClientProtocolException | SocketException connectionException) { // Typically this is a connection issue. throw new ConnectionException(connectionException.getMessage(), connectionException); diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/InvalidRequestException.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/InvalidRequestException.java index 3697227..2168be8 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/InvalidRequestException.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/InvalidRequestException.java @@ -64,6 +64,8 @@ public static InvalidRequestException factory(final RequestErrorResponse errorRe Objects.requireNonNull(errorResponse, "Invalid RequestErrorResponse parameter, must not be null"); switch (errorResponse.getErrorCode()) { + case 401: + return new UnauthorizedRequestException(errorResponse.getMessage(), errorResponse.getErrorCode()); case 404: return new ResourceNotFoundException(errorResponse.getMessage()); case 409: diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java new file mode 100644 index 0000000..7d7db10 --- /dev/null +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java @@ -0,0 +1,17 @@ +package org.sourcelab.kafka.connect.apiclient.rest.exceptions; + +/** + * Thrown if the server required Authentication, but the client was either not configured to provide credentials, + * or those credentials were rejected/invalid. + */ +public class UnauthorizedRequestException extends InvalidRequestException { + + /** + * Constructor. + * @param message Error message. + * @param errorCode Error code. + */ + public UnauthorizedRequestException(final String message, final int errorCode) { + super(message, errorCode); + } +} diff --git a/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java index f94bbcb..2ef51c4 100644 --- a/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java +++ b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java @@ -61,6 +61,11 @@ public void setup() { ); } + final String basicAuthUsername = System.getenv("KAFKA_CONNECT_BASICAUTH_USERNAME"); + if (basicAuthUsername != null && !basicAuthUsername.isEmpty()) { + configuration.useBasicAuth(basicAuthUsername, System.getenv("KAFKA_CONNECT_BASICAUTH_PASSWORD")); + } + // Build api client this.kafkaConnectClient = new KafkaConnectClient(configuration); } From e6f89e255ee7285b27e88dd6a5828e00e1c1668b Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 30 Jan 2019 17:47:56 +0900 Subject: [PATCH 2/3] code style violation fixes --- .../apiclient/rest/HttpClientRestClient.java | 1 - .../UnauthorizedRequestException.java | 17 +++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java index cbfc7ac..5353023 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java @@ -51,7 +51,6 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.SocketException; -import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.nio.charset.StandardCharsets; diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java index 7d7db10..ebbb8b8 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java @@ -1,3 +1,20 @@ +/** + * Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package org.sourcelab.kafka.connect.apiclient.rest.exceptions; /** From c35b144f14963e8d9c96b97990a166ad108d697d Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Fri, 1 Feb 2019 13:33:10 +0900 Subject: [PATCH 3/3] Add additional test case --- CHANGELOG.md | 2 + pom.xml | 9 +++ .../apiclient/KafkaConnectClientUnitTest.java | 58 +++++++++++++++++++ 3 files changed, 69 insertions(+) create mode 100644 src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientUnitTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 3139f49..0e72cb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## 1.2.0 (UNRELEASED) +### New Features +- Added ability to authenticate with Kafka-Connect REST endpoints utilizing Basic-Authentication. ## 1.1.0 (01/30/2019) diff --git a/pom.xml b/pom.xml index 59de3f5..282837f 100644 --- a/pom.xml +++ b/pom.xml @@ -129,6 +129,15 @@ test + + + org.mockito + mockito-core + 2.23.4 + test + + + com.tngtech.java diff --git a/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientUnitTest.java b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientUnitTest.java new file mode 100644 index 0000000..04b503c --- /dev/null +++ b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientUnitTest.java @@ -0,0 +1,58 @@ +/** + * Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.sourcelab.kafka.connect.apiclient; + +import org.apache.http.HttpStatus; +import org.junit.Test; +import org.sourcelab.kafka.connect.apiclient.rest.RestClient; +import org.sourcelab.kafka.connect.apiclient.rest.RestResponse; +import org.sourcelab.kafka.connect.apiclient.rest.exceptions.UnauthorizedRequestException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests over KafkaConnectClient. + */ +public class KafkaConnectClientUnitTest { + + /** + * This test verifies that if the underlying RestClient returns a response with Http Status Code 401, + * then KafkaConnectClient will throw an UnauthorizedRequestException. + */ + @Test(expected = UnauthorizedRequestException.class) + public void unAuthorizedException() { + // Create configuration + final Configuration configuration = new Configuration("http://localhost:9092"); + + // Create mock RestResponse + final RestResponse restResponse = new RestResponse("Invalid credentials.", HttpStatus.SC_UNAUTHORIZED); + + // Create mock RestClient + final RestClient mockRestClient = mock(RestClient.class); + when(mockRestClient.submitRequest(any())) + .thenReturn(restResponse); + + // Create client + final KafkaConnectClient client = new KafkaConnectClient(configuration, mockRestClient); + + // Call any method that makes a request via RestClient. + client.getConnectors(); + } +}