diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3139f49..0e72cb4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## 1.2.0 (UNRELEASED)
+### New Features
+- Added ability to authenticate with Kafka-Connect REST endpoints utilizing Basic-Authentication.
## 1.1.0 (01/30/2019)
diff --git a/pom.xml b/pom.xml
index 59de3f5..282837f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,15 @@
test
+
+
+ org.mockito
+ mockito-core
+ 2.23.4
+ test
+
+
+
com.tngtech.java
diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java
index 4419712..229c528 100644
--- a/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java
+++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java
@@ -29,9 +29,13 @@ public final class Configuration {
// Defines the URL/Hostname of Kafka-Connect
private final String apiHost;
- // Optional Connection settings
+ // Optional Connection options
private int requestTimeoutInSeconds = 300;
+ // Optional BasicAuth options
+ private String basicAuthUsername = null;
+ private String basicAuthPassword = null;
+
// Optional SSL options
private boolean ignoreInvalidSslCertificates = false;
private File trustStoreFile = null;
@@ -64,6 +68,19 @@ public Configuration(final String kafkaConnectHost) {
}
}
+ /**
+ * Allow setting http Basic-Authentication username and password to authenticate requests.
+ *
+ * @param username username to authenticate requests to Kafka-Connect with.
+ * @param password password to authenticate requests to Kafka-Connect with.
+ * @return Configuration instance.
+ */
+ public Configuration useBasicAuth(final String username, final String password) {
+ this.basicAuthUsername = username;
+ this.basicAuthPassword = password;
+ return this;
+ }
+
/**
* Allow setting optional proxy configuration.
*
@@ -168,6 +185,14 @@ public int getRequestTimeoutInSeconds() {
return requestTimeoutInSeconds;
}
+ public String getBasicAuthUsername() {
+ return basicAuthUsername;
+ }
+
+ public String getBasicAuthPassword() {
+ return basicAuthPassword;
+ }
+
@Override
public String toString() {
final StringBuilder stringBuilder = new StringBuilder("Configuration{")
@@ -191,6 +216,11 @@ public String toString() {
stringBuilder.append(", sslTrustStorePassword='******'");
}
}
+ if (basicAuthUsername != null) {
+ stringBuilder
+ .append(", basicAuthUsername='").append(basicAuthUsername).append('\'')
+ .append(", basicAuthPassword='******'");
+ }
stringBuilder.append('}');
return stringBuilder.toString();
diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java
index 10f1b07..2f44aa1 100644
--- a/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java
+++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java
@@ -17,6 +17,7 @@
package org.sourcelab.kafka.connect.apiclient;
+import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory;
@@ -49,6 +50,7 @@
import org.sourcelab.kafka.connect.apiclient.rest.RestClient;
import org.sourcelab.kafka.connect.apiclient.rest.RestResponse;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.InvalidRequestException;
+import org.sourcelab.kafka.connect.apiclient.rest.exceptions.UnauthorizedRequestException;
import java.io.IOException;
import java.util.Collection;
@@ -289,6 +291,19 @@ private T submitRequest(final Request request) {
}
}
+ // Server reject's client's authentication.
+ if (responseCode == HttpStatus.SC_UNAUTHORIZED) {
+ // Throw contextual error msg based on if credentials are configured or not.
+ String errorMsg;
+ if (configuration.getBasicAuthUsername() == null) {
+ errorMsg = "Server required authentication credentials but none were provided in client configuration.";
+ } else {
+ errorMsg = "Client authentication credentials (username=" + configuration.getBasicAuthUsername() + ") was rejected by server.";
+ }
+ errorMsg = errorMsg + " Server responded with: \"" + responseStr + "\"";
+ throw new UnauthorizedRequestException(errorMsg, responseCode);
+ }
+
// Attempt to parse error response
try {
final RequestErrorResponse errorResponse = JacksonFactory.newInstance().readValue(responseStr, RequestErrorResponse.class);
diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java
index 1db31a6..5353023 100644
--- a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java
+++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java
@@ -21,6 +21,7 @@
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.AuthCache;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.ResponseHandler;
@@ -29,8 +30,11 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
@@ -45,8 +49,10 @@
import org.sourcelab.kafka.connect.apiclient.rest.handlers.RestResponseHandler;
import java.io.IOException;
+import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.URISyntaxException;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
@@ -78,6 +84,8 @@ public class HttpClientRestClient implements RestClient {
*/
private CloseableHttpClient httpClient;
+ private HttpClientContext httpClientContext;
+
/**
* Constructor.
*/
@@ -109,6 +117,15 @@ public void init(final Configuration configuration) {
// Define our RequestConfigBuilder
final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+ // Define our Credentials Provider
+ final CredentialsProvider credsProvider = new BasicCredentialsProvider();
+
+ // Define our context
+ httpClientContext = HttpClientContext.create();
+
+ // Define our auth cache
+ final AuthCache authCache = new BasicAuthCache();
+
// If we have a configured proxy host
if (configuration.getProxyHost() != null) {
// Define proxy host
@@ -120,21 +137,53 @@ public void init(final Configuration configuration) {
// If we have proxy auth enabled
if (configuration.getProxyUsername() != null) {
- // Create credential provider
- final CredentialsProvider credsProvider = new BasicCredentialsProvider();
+ // Add proxy credentials
credsProvider.setCredentials(
new AuthScope(configuration.getProxyHost(), configuration.getProxyPort()),
new UsernamePasswordCredentials(configuration.getProxyUsername(), configuration.getProxyPassword())
);
- // Attach Credentials provider to client builder.
- clientBuilder.setDefaultCredentialsProvider(credsProvider);
+ // Preemptive load context with authentication.
+ authCache.put(
+ new HttpHost(configuration.getProxyHost(), configuration.getProxyPort(), configuration.getProxyScheme()), new BasicScheme()
+ );
}
// Attach Proxy to request config builder
requestConfigBuilder.setProxy(proxyHost);
}
+ // If BasicAuth credentials are configured.
+ if (configuration.getBasicAuthUsername() != null) {
+ try {
+ // parse ApiHost for Hostname and port.
+ final URL apiUrl = new URL(configuration.getApiHost());
+
+ // Add Kafka-Connect credentials
+ credsProvider.setCredentials(
+ new AuthScope(apiUrl.getHost(), apiUrl.getPort()),
+ new UsernamePasswordCredentials(
+ configuration.getBasicAuthUsername(),
+ configuration.getBasicAuthPassword()
+ )
+ );
+
+ // Preemptive load context with authentication.
+ authCache.put(
+ new HttpHost(apiUrl.getHost(), apiUrl.getPort(), apiUrl.getProtocol()), new BasicScheme()
+ );
+ } catch (final MalformedURLException exception) {
+ throw new RuntimeException(exception.getMessage(), exception);
+ }
+ }
+
+ // Configure context.
+ httpClientContext.setAuthCache(authCache);
+ httpClientContext.setCredentialsProvider(credsProvider);
+
+ // Attach Credentials provider to client builder.
+ clientBuilder.setDefaultCredentialsProvider(credsProvider);
+
// Attach default request config
clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
@@ -211,7 +260,7 @@ private T submitGetRequest(final String url, final Map getPa
logger.debug("Executing request {}", get.getRequestLine());
// Execute and return
- return httpClient.execute(get, responseHandler);
+ return httpClient.execute(get, responseHandler, httpClientContext);
} catch (final ClientProtocolException | SocketException | URISyntaxException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
@@ -244,7 +293,7 @@ private T submitPostRequest(final String url, final Object requestBody, fina
logger.debug("Executing request {} with {}", post.getRequestLine(), jsonPayloadStr);
// Execute and return
- return httpClient.execute(post, responseHandler);
+ return httpClient.execute(post, responseHandler, httpClientContext);
} catch (final ClientProtocolException | SocketException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
@@ -276,7 +325,7 @@ private T submitPutRequest(final String url, final Object requestBody, final
logger.debug("Executing request {} with {}", put.getRequestLine(), jsonPayloadStr);
// Execute and return
- return httpClient.execute(put, responseHandler);
+ return httpClient.execute(put, responseHandler, httpClientContext);
} catch (final ClientProtocolException | SocketException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
@@ -307,7 +356,7 @@ private T submitDeleteRequest(final String url, final Object requestBody, fi
logger.debug("Executing request {} with {}", delete.getRequestLine(), jsonPayloadStr);
// Execute and return
- return httpClient.execute(delete, responseHandler);
+ return httpClient.execute(delete, responseHandler, httpClientContext);
} catch (final ClientProtocolException | SocketException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/InvalidRequestException.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/InvalidRequestException.java
index 3697227..2168be8 100644
--- a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/InvalidRequestException.java
+++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/InvalidRequestException.java
@@ -64,6 +64,8 @@ public static InvalidRequestException factory(final RequestErrorResponse errorRe
Objects.requireNonNull(errorResponse, "Invalid RequestErrorResponse parameter, must not be null");
switch (errorResponse.getErrorCode()) {
+ case 401:
+ return new UnauthorizedRequestException(errorResponse.getMessage(), errorResponse.getErrorCode());
case 404:
return new ResourceNotFoundException(errorResponse.getMessage());
case 409:
diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java
new file mode 100644
index 0000000..ebbb8b8
--- /dev/null
+++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/UnauthorizedRequestException.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
+ * persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
+ * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.sourcelab.kafka.connect.apiclient.rest.exceptions;
+
+/**
+ * Thrown if the server required Authentication, but the client was either not configured to provide credentials,
+ * or those credentials were rejected/invalid.
+ */
+public class UnauthorizedRequestException extends InvalidRequestException {
+
+ /**
+ * Constructor.
+ * @param message Error message.
+ * @param errorCode Error code.
+ */
+ public UnauthorizedRequestException(final String message, final int errorCode) {
+ super(message, errorCode);
+ }
+}
diff --git a/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java
index f94bbcb..2ef51c4 100644
--- a/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java
+++ b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java
@@ -61,6 +61,11 @@ public void setup() {
);
}
+ final String basicAuthUsername = System.getenv("KAFKA_CONNECT_BASICAUTH_USERNAME");
+ if (basicAuthUsername != null && !basicAuthUsername.isEmpty()) {
+ configuration.useBasicAuth(basicAuthUsername, System.getenv("KAFKA_CONNECT_BASICAUTH_PASSWORD"));
+ }
+
// Build api client
this.kafkaConnectClient = new KafkaConnectClient(configuration);
}
diff --git a/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientUnitTest.java b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientUnitTest.java
new file mode 100644
index 0000000..04b503c
--- /dev/null
+++ b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientUnitTest.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
+ * persons to whom the Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
+ * Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
+ * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.sourcelab.kafka.connect.apiclient;
+
+import org.apache.http.HttpStatus;
+import org.junit.Test;
+import org.sourcelab.kafka.connect.apiclient.rest.RestClient;
+import org.sourcelab.kafka.connect.apiclient.rest.RestResponse;
+import org.sourcelab.kafka.connect.apiclient.rest.exceptions.UnauthorizedRequestException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests over KafkaConnectClient.
+ */
+public class KafkaConnectClientUnitTest {
+
+ /**
+ * This test verifies that if the underlying RestClient returns a response with Http Status Code 401,
+ * then KafkaConnectClient will throw an UnauthorizedRequestException.
+ */
+ @Test(expected = UnauthorizedRequestException.class)
+ public void unAuthorizedException() {
+ // Create configuration
+ final Configuration configuration = new Configuration("http://localhost:9092");
+
+ // Create mock RestResponse
+ final RestResponse restResponse = new RestResponse("Invalid credentials.", HttpStatus.SC_UNAUTHORIZED);
+
+ // Create mock RestClient
+ final RestClient mockRestClient = mock(RestClient.class);
+ when(mockRestClient.submitRequest(any()))
+ .thenReturn(restResponse);
+
+ // Create client
+ final KafkaConnectClient client = new KafkaConnectClient(configuration, mockRestClient);
+
+ // Call any method that makes a request via RestClient.
+ client.getConnectors();
+ }
+}