diff --git a/CHANGELOG.md b/CHANGELOG.md index b263114..b1078f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). -## 1.0.4 (UNRELEASED) +## 1.1.0 (UNRELEASED) + +### New Features +- Added ability to communicate with Kafka-Connect REST endpoints using SSL. More can be found in README. ### Internal Dependency Updates - Updated Guava from 24.0-JRE to 25.0-JRE for [CVE-2018-10237](https://github.com/google/guava/wiki/CVE-2018-10237). diff --git a/README.md b/README.md index 23bd4d5..0641329 100644 --- a/README.md +++ b/README.md @@ -15,19 +15,20 @@ This client library is released on Maven Central. Add a new dependency to your org.sourcelab kafka-connect-client - 1.0.3 + 1.1.0 ``` -Example Code: + +#### Example Code: ```java /* * Create a new configuration object. * * This configuration also allows you to define some optional details on your connection, - * such as using an outbound proxy (authenticated or not). + * such as using an outbound proxy (authenticated or not), SSL client settings, etc.. */ -final Configuration configuration = new Configuration("hostname.for.kafka-connect.service.com:8083"); +final Configuration configuration = new Configuration("http://hostname.for.kafka-connect.service.com:8083"); /* * Create an instance of KafkaConnectClient, passing your configuration. @@ -51,10 +52,47 @@ final ConnectorDefinition connectorDefition = client.addConnector(NewConnectorDe .withConfig("topics", "test-topic") .build() )); + +/* + * See KafkaConnectClient for other available operations. + */ ``` Public methods available on KafkaConnectClient can be [found here](src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java#L62) + +#### Communicating with HTTPS enabled Kafka-Connect REST server: +```java +/* + * Create a new configuration object. + */ +final Configuration configuration = new Configuration("https://hostname.for.kafka-connect.service.com:8083"); + +/* + * If your JVM's TrustStore has already been updated to accept the certificate installed on your Kafka-Connect + * instance, then no further configuration is required. Typically this is done using the 'key-tool' command. + * + * Alternatively, you can configure the path to JKS formatted TrustStore file to validate the host's certificate + * with. + */ +configuration.useTrustStore( + new File("/path/to/truststore.jks"), "TrustStorePasswordHere or NULL" +); + +/* + * Optionally instead of providing a TrustStore, you can disable all verifications of Kafka-Connect's SSL certificates. + * + * Doing this is HIGHLY DISCOURAGED! + */ +//configuration.useInsecureSslCertificates(); + +/* + * Create an instance of KafkaConnectClient, passing your configuration. + */ +final KafkaConnectClient client = new KafkaConnectClient(configuration); + +``` + ## Changelog The format is based on [Keep a Changelog](http://keepachangelog.com/) diff --git a/pom.xml b/pom.xml index 0779d9d..7a498fb 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ org.sourcelab kafka-connect-client - 1.0.4-SNAPSHOT + 1.1.0-SNAPSHOT jar diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java index bbfa6cf..4419712 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java @@ -17,6 +17,9 @@ package org.sourcelab.kafka.connect.apiclient; +import java.io.File; +import java.util.Objects; + /** * Configure your Kafka Connect API client. * @@ -26,6 +29,14 @@ public final class Configuration { // Defines the URL/Hostname of Kafka-Connect private final String apiHost; + // Optional Connection settings + private int requestTimeoutInSeconds = 300; + + // Optional SSL options + private boolean ignoreInvalidSslCertificates = false; + private File trustStoreFile = null; + private String trustStorePassword = null; + // Optional Proxy Configuration private String proxyHost = null; private int proxyPort = 0; @@ -45,10 +56,11 @@ public Configuration(final String kafkaConnectHost) { } // Normalize into "http://" - if (!kafkaConnectHost.startsWith("http://")) { - this.apiHost = "http://" + kafkaConnectHost; - } else { + if (kafkaConnectHost.startsWith("http://") || kafkaConnectHost.startsWith("https://")) { this.apiHost = kafkaConnectHost; + } else { + // Assume http protocol + this.apiHost = "http://" + kafkaConnectHost; } } @@ -80,6 +92,42 @@ public Configuration useProxyAuthentication(final String proxyUsername, final St return this; } + /** + * Skip all validation of SSL Certificates. This is insecure and highly discouraged! + * + * @return Configuration instance. + */ + public Configuration useInsecureSslCertificates() { + this.ignoreInvalidSslCertificates = true; + return this; + } + + /** + * You can supply a path to a JKS trust store to be used to validate SSL certificates with. + * + * Alternatively you can can explicitly add your certificate to the JVM's truststore using a command like: + * keytool -importcert -keystore truststore.jks -file servercert.pem + * + * @param trustStorePath file path to truststore. + * @param password (optional) Password for truststore. + * @return Configuration instance. + */ + public Configuration useTrustStore(final File trustStorePath, final String password) { + this.trustStoreFile = Objects.requireNonNull(trustStorePath); + this.trustStorePassword = password; + return this; + } + + /** + * Set the request timeout value, in seconds. + * @param requestTimeoutInSeconds How long before a request times out, in seconds. + * @return Configuration instance. + */ + public Configuration useRequestTimeoutInSeconds(final int requestTimeoutInSeconds) { + this.requestTimeoutInSeconds = requestTimeoutInSeconds; + return this; + } + public String getProxyHost() { return proxyHost; } @@ -104,10 +152,27 @@ public String getApiHost() { return apiHost; } + public boolean getIgnoreInvalidSslCertificates() { + return ignoreInvalidSslCertificates; + } + + public File getTrustStoreFile() { + return trustStoreFile; + } + + public String getTrustStorePassword() { + return trustStorePassword; + } + + public int getRequestTimeoutInSeconds() { + return requestTimeoutInSeconds; + } + @Override public String toString() { final StringBuilder stringBuilder = new StringBuilder("Configuration{") - .append("apiHost='").append(apiHost).append('\''); + .append("apiHost='").append(apiHost).append('\'') + .append(", requestTimeout='").append(requestTimeoutInSeconds).append('\''); if (proxyHost != null) { stringBuilder .append(", proxy='").append(proxyScheme).append("://"); @@ -119,6 +184,13 @@ public String toString() { stringBuilder.append(proxyHost).append(":").append(proxyPort).append('\''); } + stringBuilder.append(", ignoreInvalidSslCertificates='").append(ignoreInvalidSslCertificates).append('\''); + if (trustStoreFile != null) { + stringBuilder.append(", sslTrustStoreFile='").append(trustStoreFile).append('\''); + if (trustStorePassword != null) { + stringBuilder.append(", sslTrustStorePassword='******'"); + } + } stringBuilder.append('}'); return stringBuilder.toString(); diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java index 28f8a40..1db31a6 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java @@ -30,14 +30,11 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; import org.apache.http.client.utils.URIBuilder; -import org.apache.http.conn.socket.LayeredConnectionSocketFactory; -import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.message.BasicHeader; -import org.apache.http.ssl.SSLContexts; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sourcelab.kafka.connect.apiclient.Configuration; @@ -47,7 +44,6 @@ import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ResultParsingException; import org.sourcelab.kafka.connect.apiclient.rest.handlers.RestResponseHandler; -import javax.net.ssl.SSLContext; import java.io.IOException; import java.net.SocketException; import java.net.URISyntaxException; @@ -98,23 +94,17 @@ public void init(final Configuration configuration) { // Save reference to configuration this.configuration = configuration; - // Create default SSLContext - final SSLContext sslcontext = SSLContexts.createDefault(); - - // Allow TLSv1 protocol only - final LayeredConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory( - sslcontext, - new String[] { "TLSv1" }, - null, - SSLConnectionSocketFactory.getDefaultHostnameVerifier() - ); + // Create https context builder utility. + final HttpsContextBuilder httpsContextBuilder = new HttpsContextBuilder(configuration); // Setup client builder final HttpClientBuilder clientBuilder = HttpClientBuilder.create(); clientBuilder - // 3 min timeout? - .setConnectionTimeToLive(300, TimeUnit.SECONDS) - .setSSLSocketFactory(sslSocketFactory); + // Define timeout + .setConnectionTimeToLive(configuration.getRequestTimeoutInSeconds(), TimeUnit.SECONDS) + + // Define SSL Socket Factory instance. + .setSSLSocketFactory(httpsContextBuilder.createSslSocketFactory()); // Define our RequestConfigBuilder final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpsContextBuilder.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpsContextBuilder.java new file mode 100644 index 0000000..6af7564 --- /dev/null +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpsContextBuilder.java @@ -0,0 +1,153 @@ +/** + * Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.sourcelab.kafka.connect.apiclient.rest; + +import org.apache.http.conn.socket.LayeredConnectionSocketFactory; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.ssl.SSLContexts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sourcelab.kafka.connect.apiclient.Configuration; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.util.Objects; + +/** + * Utility for properly configuring the SSL Context based on client configuration settings. + */ +class HttpsContextBuilder { + private static final Logger logger = LoggerFactory.getLogger(HttpsContextBuilder.class); + + /** + * Accept TLS1.2, 1.1, and 1.0 protocols. + */ + private static final String[] sslProtocols = new String[] { "TLSv1.2", "TLSv1.1", "TLSv1" }; + + /** + * Client configuration. + */ + private final Configuration configuration; + + /** + * Constructor. + * @param configuration client configuration instance. + */ + HttpsContextBuilder(final Configuration configuration) { + this.configuration = Objects.requireNonNull(configuration); + } + + /** + * Get HostnameVerifier instance based on client configuration. + * @return HostnameVerifier instance. + */ + HostnameVerifier getHostnameVerifier() { + // If we're configured to ignore invalid certificates + if (configuration.getIgnoreInvalidSslCertificates()) { + // Use the Noop verifier. + return NoopHostnameVerifier.INSTANCE; + } + // Otherwise use the default verifier. + return SSLConnectionSocketFactory.getDefaultHostnameVerifier(); + } + + /** + * Get properly configured SSLContext instance based on client configuration. + * @return SSLContext instance. + */ + SSLContext getSslContext() { + // Create default SSLContext + final SSLContext sslcontext = SSLContexts.createDefault(); + + try { + // If client configuration is set to ignore invalid certificates + if (configuration.getIgnoreInvalidSslCertificates()) { + // Initialize ssl context with a TrustManager instance that just accepts everything blindly. + // HIGHLY INSECURE / NOT RECOMMENDED! + sslcontext.init(new KeyManager[0], new TrustManager[]{new NoopTrustManager()}, new SecureRandom()); + + // If client configuration has a trust store defined. + } else if (configuration.getTrustStoreFile() != null) { + + final TrustManagerFactory trustManagerFactory = TrustManagerFactory + .getInstance(TrustManagerFactory.getDefaultAlgorithm()); + + + // New JKS Keystore. + final KeyStore keyStore = KeyStore.getInstance("JKS"); + + // Attempt to read the trust store from disk. + try (final FileInputStream trustStoreFileInput = new FileInputStream(configuration.getTrustStoreFile())) { + + // If no trust store password is set. + if (configuration.getTrustStorePassword() == null) { + keyStore.load(trustStoreFileInput, null); + } else { + keyStore.load(trustStoreFileInput, configuration.getTrustStorePassword().toCharArray()); + } + trustManagerFactory.init(keyStore); + } + + // Initialize ssl context with our custom loaded trust store. + sslcontext.init(new KeyManager[0], trustManagerFactory.getTrustManagers(), new SecureRandom()); + } + } catch (final KeyStoreException | IOException | NoSuchAlgorithmException | CertificateException | KeyManagementException e) { + throw new RuntimeException(e.getMessage(), e); + } + + return sslcontext; + } + + /** + * Get allowed SSL Protocols. + * @return allowed SslProtocols. + */ + private String[] getSslProtocols() { + return sslProtocols; + } + + /** + * Properly configured SslSocketFactory based on client configuration. + * @return SslSocketFactory instance. + */ + LayeredConnectionSocketFactory createSslSocketFactory() { + // Emit an warning letting everyone know we're using an insecure configuration. + if (configuration.getIgnoreInvalidSslCertificates()) { + logger.warn("Using insecure configuration, skipping server-side certificate validation checks."); + } + + return new SSLConnectionSocketFactory( + getSslContext(), + getSslProtocols(), + null, + getHostnameVerifier() + ); + } +} diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/NoopTrustManager.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/NoopTrustManager.java new file mode 100644 index 0000000..cb3e49d --- /dev/null +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/rest/NoopTrustManager.java @@ -0,0 +1,39 @@ +/** + * Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.sourcelab.kafka.connect.apiclient.rest; + +import javax.net.ssl.X509TrustManager; +import java.security.cert.X509Certificate; + +/** + * Implementation of TrustManager that blindly trusts all certificates with no validation or verification. + */ +class NoopTrustManager implements X509TrustManager { + @Override + public void checkClientTrusted(final X509Certificate[] x509Certificates, final String input) { + } + + @Override + public void checkServerTrusted(final X509Certificate[] x509Certificates, final String input) { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } +} diff --git a/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java index 0713b66..f94bbcb 100644 --- a/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java +++ b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java @@ -26,6 +26,7 @@ import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigDefinition; import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition; +import java.io.File; import java.util.HashMap; import java.util.Map; @@ -47,10 +48,21 @@ public void setup() { // Pull apiHost from environment String apiHost = System.getenv("KAFKA_CONNECT_HOST"); if (apiHost == null || apiHost.isEmpty()) { - apiHost = "localhost:8083"; + apiHost = "http://localhost:8083"; } + + final Configuration configuration = new Configuration(apiHost); + + // Pull trust store configuration from environment + final String sslTrustStoreFile = System.getenv("KAFKA_CONNECT_TRUSTSTORE"); + if (sslTrustStoreFile != null && !sslTrustStoreFile.isEmpty()) { + configuration.useTrustStore( + new File(sslTrustStoreFile), System.getenv("KAFKA_CONNECT_TRUSTSTORE_PASSWORD") + ); + } + // Build api client - this.kafkaConnectClient = new KafkaConnectClient(new Configuration(apiHost)); + this.kafkaConnectClient = new KafkaConnectClient(configuration); } /** diff --git a/src/test/java/org/sourcelab/kafka/connect/apiclient/rest/HttpsContextBuilderTest.java b/src/test/java/org/sourcelab/kafka/connect/apiclient/rest/HttpsContextBuilderTest.java new file mode 100644 index 0000000..aeeef50 --- /dev/null +++ b/src/test/java/org/sourcelab/kafka/connect/apiclient/rest/HttpsContextBuilderTest.java @@ -0,0 +1,65 @@ +/** + * Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.sourcelab.kafka.connect.apiclient.rest; + +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.junit.Test; +import org.sourcelab.kafka.connect.apiclient.Configuration; + +import javax.net.ssl.HostnameVerifier; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class HttpsContextBuilderTest { + + /** + * Constructor should require non-null arguments. + */ + @Test(expected = NullPointerException.class) + public void testConstructorNullArguments() { + new HttpsContextBuilder(null); + } + + /** + * When configured to validate SSL certificates, should not get NoopHostnameVerifier back. + */ + @Test + public void getHostnameVerifier_validateSslCertificates() { + final HttpsContextBuilder builder = new HttpsContextBuilder(new Configuration("http://localhost")); + + final HostnameVerifier verifier = builder.getHostnameVerifier(); + assertNotNull(verifier); + assertFalse("Should not be an instance of NoopHostnameVerifier", verifier instanceof NoopHostnameVerifier); + } + + /** + * When configured to skip validating SSL certificates, should get NoopHostnameVerifier back. + */ + @Test + public void getHostnameVerifier_acceptInvalidSslCertificates() { + final HttpsContextBuilder builder = new HttpsContextBuilder( + new Configuration("http://localhost").useInsecureSslCertificates() + ); + + final HostnameVerifier verifier = builder.getHostnameVerifier(); + assertNotNull(verifier); + assertTrue("Should be an instance of NoopHostnameVerifier", verifier instanceof NoopHostnameVerifier); + } +}