Skip to content
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 1.0.4 (UNRELEASED)
## 1.1.0 (UNRELEASED)

### New Features
- Added ability to communicate with Kafka-Connect REST endpoints using SSL. More can be found in README.

### Internal Dependency Updates
- Updated Guava from 24.0-JRE to 25.0-JRE for [CVE-2018-10237](https://github.com/google/guava/wiki/CVE-2018-10237).
Expand Down
46 changes: 42 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ This client library is released on Maven Central. Add a new dependency to your
<dependency>
<groupId>org.sourcelab</groupId>
<artifactId>kafka-connect-client</artifactId>
<version>1.0.3</version>
<version>1.1.0</version>
</dependency>
```

Example Code:

#### Example Code:
```java
/*
* Create a new configuration object.
*
* This configuration also allows you to define some optional details on your connection,
* such as using an outbound proxy (authenticated or not).
* such as using an outbound proxy (authenticated or not), SSL client settings, etc..
*/
final Configuration configuration = new Configuration("hostname.for.kafka-connect.service.com:8083");
final Configuration configuration = new Configuration("http://hostname.for.kafka-connect.service.com:8083");

/*
* Create an instance of KafkaConnectClient, passing your configuration.
Expand All @@ -51,10 +52,47 @@ final ConnectorDefinition connectorDefition = client.addConnector(NewConnectorDe
.withConfig("topics", "test-topic")
.build()
));

/*
* See KafkaConnectClient for other available operations.
*/
```

Public methods available on KafkaConnectClient can be [found here](src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java#L62)


#### Communicating with HTTPS enabled Kafka-Connect REST server:
```java
/*
* Create a new configuration object.
*/
final Configuration configuration = new Configuration("https://hostname.for.kafka-connect.service.com:8083");

/*
* If your JVM's TrustStore has already been updated to accept the certificate installed on your Kafka-Connect
* instance, then no further configuration is required. Typically this is done using the 'key-tool' command.
*
* Alternatively, you can configure the path to JKS formatted TrustStore file to validate the host's certificate
* with.
*/
configuration.useTrustStore(
new File("/path/to/truststore.jks"), "TrustStorePasswordHere or NULL"
);

/*
* Optionally instead of providing a TrustStore, you can disable all verifications of Kafka-Connect's SSL certificates.
*
* Doing this is HIGHLY DISCOURAGED!
*/
//configuration.useInsecureSslCertificates();

/*
* Create an instance of KafkaConnectClient, passing your configuration.
*/
final KafkaConnectClient client = new KafkaConnectClient(configuration);

```

## Changelog

The format is based on [Keep a Changelog](http://keepachangelog.com/)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.sourcelab</groupId>
<artifactId>kafka-connect-client</artifactId>
<version>1.0.4-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<!-- Require Maven 3.3.9 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.sourcelab.kafka.connect.apiclient;

import java.io.File;
import java.util.Objects;

/**
* Configure your Kafka Connect API client.
*
Expand All @@ -26,6 +29,14 @@ public final class Configuration {
// Defines the URL/Hostname of Kafka-Connect
private final String apiHost;

// Optional Connection settings
private int requestTimeoutInSeconds = 300;

// Optional SSL options
private boolean ignoreInvalidSslCertificates = false;
private File trustStoreFile = null;
private String trustStorePassword = null;

// Optional Proxy Configuration
private String proxyHost = null;
private int proxyPort = 0;
Expand All @@ -45,10 +56,11 @@ public Configuration(final String kafkaConnectHost) {
}

// Normalize into "http://<hostname>"
if (!kafkaConnectHost.startsWith("http://")) {
this.apiHost = "http://" + kafkaConnectHost;
} else {
if (kafkaConnectHost.startsWith("http://") || kafkaConnectHost.startsWith("https://")) {
this.apiHost = kafkaConnectHost;
} else {
// Assume http protocol
this.apiHost = "http://" + kafkaConnectHost;
}
}

Expand Down Expand Up @@ -80,6 +92,42 @@ public Configuration useProxyAuthentication(final String proxyUsername, final St
return this;
}

/**
* Skip all validation of SSL Certificates. This is insecure and highly discouraged!
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps log a warning that this is not a good configuration.

*
* @return Configuration instance.
*/
public Configuration useInsecureSslCertificates() {
this.ignoreInvalidSslCertificates = true;
return this;
}

/**
* You can supply a path to a JKS trust store to be used to validate SSL certificates with.
*
* Alternatively you can can explicitly add your certificate to the JVM's truststore using a command like:
* keytool -importcert -keystore truststore.jks -file servercert.pem
*
* @param trustStorePath file path to truststore.
* @param password (optional) Password for truststore.
* @return Configuration instance.
*/
public Configuration useTrustStore(final File trustStorePath, final String password) {
this.trustStoreFile = Objects.requireNonNull(trustStorePath);
this.trustStorePassword = password;
return this;
}

/**
* Set the request timeout value, in seconds.
* @param requestTimeoutInSeconds How long before a request times out, in seconds.
* @return Configuration instance.
*/
public Configuration useRequestTimeoutInSeconds(final int requestTimeoutInSeconds) {
this.requestTimeoutInSeconds = requestTimeoutInSeconds;
return this;
}

public String getProxyHost() {
return proxyHost;
}
Expand All @@ -104,10 +152,27 @@ public String getApiHost() {
return apiHost;
}

public boolean getIgnoreInvalidSslCertificates() {
return ignoreInvalidSslCertificates;
}

public File getTrustStoreFile() {
return trustStoreFile;
}

public String getTrustStorePassword() {
return trustStorePassword;
}

public int getRequestTimeoutInSeconds() {
return requestTimeoutInSeconds;
}

@Override
public String toString() {
final StringBuilder stringBuilder = new StringBuilder("Configuration{")
.append("apiHost='").append(apiHost).append('\'');
.append("apiHost='").append(apiHost).append('\'')
.append(", requestTimeout='").append(requestTimeoutInSeconds).append('\'');
if (proxyHost != null) {
stringBuilder
.append(", proxy='").append(proxyScheme).append("://");
Expand All @@ -119,6 +184,13 @@ public String toString() {

stringBuilder.append(proxyHost).append(":").append(proxyPort).append('\'');
}
stringBuilder.append(", ignoreInvalidSslCertificates='").append(ignoreInvalidSslCertificates).append('\'');
if (trustStoreFile != null) {
stringBuilder.append(", sslTrustStoreFile='").append(trustStoreFile).append('\'');
if (trustStorePassword != null) {
stringBuilder.append(", sslTrustStorePassword='******'");
}
}
stringBuilder.append('}');

return stringBuilder.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,11 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicHeader;
import org.apache.http.ssl.SSLContexts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.connect.apiclient.Configuration;
Expand All @@ -47,7 +44,6 @@
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ResultParsingException;
import org.sourcelab.kafka.connect.apiclient.rest.handlers.RestResponseHandler;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.SocketException;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -98,23 +94,17 @@ public void init(final Configuration configuration) {
// Save reference to configuration
this.configuration = configuration;

// Create default SSLContext
final SSLContext sslcontext = SSLContexts.createDefault();

// Allow TLSv1 protocol only
final LayeredConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(
sslcontext,
new String[] { "TLSv1" },
null,
SSLConnectionSocketFactory.getDefaultHostnameVerifier()
);
// Create https context builder utility.
final HttpsContextBuilder httpsContextBuilder = new HttpsContextBuilder(configuration);

// Setup client builder
final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
clientBuilder
// 3 min timeout?
.setConnectionTimeToLive(300, TimeUnit.SECONDS)
.setSSLSocketFactory(sslSocketFactory);
// Define timeout
.setConnectionTimeToLive(configuration.getRequestTimeoutInSeconds(), TimeUnit.SECONDS)

// Define SSL Socket Factory instance.
.setSSLSocketFactory(httpsContextBuilder.createSslSocketFactory());

// Define our RequestConfigBuilder
final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
Expand Down
Loading