Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## 1.2.0 (UNRELEASED)

### New Features
- Added ability to authenticate with Kafka-Connect REST endpoints utilizing Basic-Authentication.

## 1.1.0 (01/30/2019)

Expand Down
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@
<scope>test</scope>
</dependency>

<!-- Mocking in Tests -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.23.4</version>
<scope>test</scope>
</dependency>


<!-- Data providers on tests -->
<dependency>
<groupId>com.tngtech.java</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ public final class Configuration {
// Defines the URL/Hostname of Kafka-Connect
private final String apiHost;

// Optional Connection settings
// Optional Connection options
private int requestTimeoutInSeconds = 300;

// Optional BasicAuth options
private String basicAuthUsername = null;
private String basicAuthPassword = null;

// Optional SSL options
private boolean ignoreInvalidSslCertificates = false;
private File trustStoreFile = null;
Expand Down Expand Up @@ -64,6 +68,19 @@ public Configuration(final String kafkaConnectHost) {
}
}

/**
* Allow setting http Basic-Authentication username and password to authenticate requests.
*
* @param username username to authenticate requests to Kafka-Connect with.
* @param password password to authenticate requests to Kafka-Connect with.
* @return Configuration instance.
*/
public Configuration useBasicAuth(final String username, final String password) {
this.basicAuthUsername = username;
this.basicAuthPassword = password;
return this;
}

/**
* Allow setting optional proxy configuration.
*
Expand Down Expand Up @@ -168,6 +185,14 @@ public int getRequestTimeoutInSeconds() {
return requestTimeoutInSeconds;
}

public String getBasicAuthUsername() {
return basicAuthUsername;
}

public String getBasicAuthPassword() {
return basicAuthPassword;
}

@Override
public String toString() {
final StringBuilder stringBuilder = new StringBuilder("Configuration{")
Expand All @@ -191,6 +216,11 @@ public String toString() {
stringBuilder.append(", sslTrustStorePassword='******'");
}
}
if (basicAuthUsername != null) {
stringBuilder
.append(", basicAuthUsername='").append(basicAuthUsername).append('\'')
.append(", basicAuthPassword='******'");
}
stringBuilder.append('}');

return stringBuilder.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.sourcelab.kafka.connect.apiclient;

import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.sourcelab.kafka.connect.apiclient.rest.RestClient;
import org.sourcelab.kafka.connect.apiclient.rest.RestResponse;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.InvalidRequestException;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.UnauthorizedRequestException;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -289,6 +291,19 @@ private <T> T submitRequest(final Request<T> request) {
}
}

// Server reject's client's authentication.
if (responseCode == HttpStatus.SC_UNAUTHORIZED) {
// Throw contextual error msg based on if credentials are configured or not.
String errorMsg;
if (configuration.getBasicAuthUsername() == null) {
errorMsg = "Server required authentication credentials but none were provided in client configuration.";
} else {
errorMsg = "Client authentication credentials (username=" + configuration.getBasicAuthUsername() + ") was rejected by server.";
}
errorMsg = errorMsg + " Server responded with: \"" + responseStr + "\"";
throw new UnauthorizedRequestException(errorMsg, responseCode);
}

// Attempt to parse error response
try {
final RequestErrorResponse errorResponse = JacksonFactory.newInstance().readValue(responseStr, RequestErrorResponse.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.ResponseHandler;
Expand All @@ -29,8 +30,11 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
Expand All @@ -45,8 +49,10 @@
import org.sourcelab.kafka.connect.apiclient.rest.handlers.RestResponseHandler;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -78,6 +84,8 @@ public class HttpClientRestClient implements RestClient {
*/
private CloseableHttpClient httpClient;

private HttpClientContext httpClientContext;

/**
* Constructor.
*/
Expand Down Expand Up @@ -109,6 +117,15 @@ public void init(final Configuration configuration) {
// Define our RequestConfigBuilder
final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();

// Define our Credentials Provider
final CredentialsProvider credsProvider = new BasicCredentialsProvider();

// Define our context
httpClientContext = HttpClientContext.create();

// Define our auth cache
final AuthCache authCache = new BasicAuthCache();

// If we have a configured proxy host
if (configuration.getProxyHost() != null) {
// Define proxy host
Expand All @@ -120,21 +137,53 @@ public void init(final Configuration configuration) {

// If we have proxy auth enabled
if (configuration.getProxyUsername() != null) {
// Create credential provider
final CredentialsProvider credsProvider = new BasicCredentialsProvider();
// Add proxy credentials
credsProvider.setCredentials(
new AuthScope(configuration.getProxyHost(), configuration.getProxyPort()),
new UsernamePasswordCredentials(configuration.getProxyUsername(), configuration.getProxyPassword())
);

// Attach Credentials provider to client builder.
clientBuilder.setDefaultCredentialsProvider(credsProvider);
// Preemptive load context with authentication.
authCache.put(
new HttpHost(configuration.getProxyHost(), configuration.getProxyPort(), configuration.getProxyScheme()), new BasicScheme()
);
}

// Attach Proxy to request config builder
requestConfigBuilder.setProxy(proxyHost);
}

// If BasicAuth credentials are configured.
if (configuration.getBasicAuthUsername() != null) {
try {
// parse ApiHost for Hostname and port.
final URL apiUrl = new URL(configuration.getApiHost());

// Add Kafka-Connect credentials
credsProvider.setCredentials(
new AuthScope(apiUrl.getHost(), apiUrl.getPort()),
new UsernamePasswordCredentials(
configuration.getBasicAuthUsername(),
configuration.getBasicAuthPassword()
)
);

// Preemptive load context with authentication.
authCache.put(
new HttpHost(apiUrl.getHost(), apiUrl.getPort(), apiUrl.getProtocol()), new BasicScheme()
);
} catch (final MalformedURLException exception) {
throw new RuntimeException(exception.getMessage(), exception);
}
}

// Configure context.
httpClientContext.setAuthCache(authCache);
httpClientContext.setCredentialsProvider(credsProvider);

// Attach Credentials provider to client builder.
clientBuilder.setDefaultCredentialsProvider(credsProvider);

// Attach default request config
clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());

Expand Down Expand Up @@ -211,7 +260,7 @@ private <T> T submitGetRequest(final String url, final Map<String, String> getPa
logger.debug("Executing request {}", get.getRequestLine());

// Execute and return
return httpClient.execute(get, responseHandler);
return httpClient.execute(get, responseHandler, httpClientContext);
} catch (final ClientProtocolException | SocketException | URISyntaxException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
Expand Down Expand Up @@ -244,7 +293,7 @@ private <T> T submitPostRequest(final String url, final Object requestBody, fina
logger.debug("Executing request {} with {}", post.getRequestLine(), jsonPayloadStr);

// Execute and return
return httpClient.execute(post, responseHandler);
return httpClient.execute(post, responseHandler, httpClientContext);
} catch (final ClientProtocolException | SocketException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
Expand Down Expand Up @@ -276,7 +325,7 @@ private <T> T submitPutRequest(final String url, final Object requestBody, final
logger.debug("Executing request {} with {}", put.getRequestLine(), jsonPayloadStr);

// Execute and return
return httpClient.execute(put, responseHandler);
return httpClient.execute(put, responseHandler, httpClientContext);
} catch (final ClientProtocolException | SocketException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
Expand Down Expand Up @@ -307,7 +356,7 @@ private <T> T submitDeleteRequest(final String url, final Object requestBody, fi
logger.debug("Executing request {} with {}", delete.getRequestLine(), jsonPayloadStr);

// Execute and return
return httpClient.execute(delete, responseHandler);
return httpClient.execute(delete, responseHandler, httpClientContext);
} catch (final ClientProtocolException | SocketException connectionException) {
// Typically this is a connection issue.
throw new ConnectionException(connectionException.getMessage(), connectionException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public static InvalidRequestException factory(final RequestErrorResponse errorRe
Objects.requireNonNull(errorResponse, "Invalid RequestErrorResponse parameter, must not be null");

switch (errorResponse.getErrorCode()) {
case 401:
return new UnauthorizedRequestException(errorResponse.getMessage(), errorResponse.getErrorCode());
case 404:
return new ResourceNotFoundException(errorResponse.getMessage());
case 409:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
* persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package org.sourcelab.kafka.connect.apiclient.rest.exceptions;

/**
* Thrown if the server required Authentication, but the client was either not configured to provide credentials,
* or those credentials were rejected/invalid.
*/
public class UnauthorizedRequestException extends InvalidRequestException {

/**
* Constructor.
* @param message Error message.
* @param errorCode Error code.
*/
public UnauthorizedRequestException(final String message, final int errorCode) {
super(message, errorCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public void setup() {
);
}

final String basicAuthUsername = System.getenv("KAFKA_CONNECT_BASICAUTH_USERNAME");
if (basicAuthUsername != null && !basicAuthUsername.isEmpty()) {
configuration.useBasicAuth(basicAuthUsername, System.getenv("KAFKA_CONNECT_BASICAUTH_PASSWORD"));
}

// Build api client
this.kafkaConnectClient = new KafkaConnectClient(configuration);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
* persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package org.sourcelab.kafka.connect.apiclient;

import org.apache.http.HttpStatus;
import org.junit.Test;
import org.sourcelab.kafka.connect.apiclient.rest.RestClient;
import org.sourcelab.kafka.connect.apiclient.rest.RestResponse;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.UnauthorizedRequestException;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Unit tests over KafkaConnectClient.
*/
public class KafkaConnectClientUnitTest {

/**
* This test verifies that if the underlying RestClient returns a response with Http Status Code 401,
* then KafkaConnectClient will throw an UnauthorizedRequestException.
*/
@Test(expected = UnauthorizedRequestException.class)
public void unAuthorizedException() {
// Create configuration
final Configuration configuration = new Configuration("http://localhost:9092");

// Create mock RestResponse
final RestResponse restResponse = new RestResponse("Invalid credentials.", HttpStatus.SC_UNAUTHORIZED);

// Create mock RestClient
final RestClient mockRestClient = mock(RestClient.class);
when(mockRestClient.submitRequest(any()))
.thenReturn(restResponse);

// Create client
final KafkaConnectClient client = new KafkaConnectClient(configuration, mockRestClient);

// Call any method that makes a request via RestClient.
client.getConnectors();
}
}