Skip to content

Commit

Permalink
Adding SSL for Schema Registry, SASL for Broker and upgrade to conflu…
Browse files Browse the repository at this point in the history
…ent 6.0
  • Loading branch information
christian.edelsbrunn authored and christian.edelsbrunn committed Dec 15, 2020
1 parent 7501275 commit 73fd40c
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 21 deletions.
55 changes: 55 additions & 0 deletions README.md
Expand Up @@ -25,3 +25,58 @@ Allows for defining and configurating topics once and apply them to different cl
### Message Books
Allows for playing a set of Messages over different topics into a cluster, see the [Wiki]("https://github.com/patschuh/KafkaEsque/wiki/Message-Books") for details
***

### Authentication
At the moment the UI only supports cluster configs without any authentication
Within the cluster.json file it is possible to configure Authentication for Kafka and Confluent Schema Registry:
Note: the secrets have to be given in the json file in plain text. This might be a security issue. Feel free to apply a PR if you want to improve this.

sslEnabled controls the SSL Authentication method

###### Example for SSL with mTLS Authentication to the broker:
```
{
"identifier": "my-mtls-secured-cluster",
"bootstrapServers": "broker:<portofmtlslistener>",
"sslEnabled": true,
"keyStoreLocation": "mykeystore.jks",
"keyStorePassword": "mykeystorepw",
"trustStoreLocation": "mytruststore.jks",
"trustStorePassword": "mykeystorepw"
}
```
###### Example for SASL_SSL Authentication

saslSecurityProtocol,saslMechanism and saslJaasConfig can be provided
This can also be combined with given trust and keystore configuration

```
{
"identifier": "my-mtls-secured-cluster",
"bootstrapServers": "broker:<portofmtlslistener>",
"saslSecurityProtocol": "SASL_SSL",
"saslMechanism" : "PLAIN",
"saslJaasConfig" : "org.apache.kafka.common.security.plain.PlainLoginModule required serviceName=kafka username=\"MYUSER\" password=\"53CR37\";"
}
```

###### Example with Schema Registry with HTTPS and Basic Auth

The http**s** and 'sslEnabled' is important if you want to use truststore and/or keystore otherwise those attributes are ignored and now sslContext is provided to Schema Registry client

you can use only Basic Auth if you SR is only protected with basic auth, you can use only keystore+truststore if your SR is protected with mTLS or you can use both settings in parallel.

```
{
....
"schemaRegistry": "https://myschemaregistry:8081",
"schemaRegistryBasicAuthUserInfo": "<BasicAuthUser>:<BasicAuthPW>",
...
"sslEnabled": true,
"keyStoreLocation": "mykeystore.jks",
"keyStorePassword": "mykeystorepw",
"trustStoreLocation": "mytruststore.jks",
"trustStorePassword": "mykeystorepw"
}
```

15 changes: 10 additions & 5 deletions build.gradle
Expand Up @@ -28,31 +28,36 @@ jfx {

repositories {
mavenLocal()

maven {
url = 'https://repo1.maven.org/maven2'
}

maven {
url = 'https://packages.confluent.io/maven/'
}

maven {
url = 'https://repository.mulesoft.org/nexus/content/repositories/public/'
}
}

dependencies {
compile 'org.apache.kafka:kafka-clients:2.0.0'
compile group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version: '1.9.13'
compile 'org.apache.kafka:kafka-clients:2.5.1'
compile 'com.fasterxml.jackson.core:jackson-databind:2.9.9'
compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.9.9'
compile 'org.slf4j:slf4j-simple:1.7.25'
compile 'org.apache.commons:commons-lang3:3.7'
compile 'org.kordamp.ikonli:ikonli-javafx:2.4.0'
compile 'org.kordamp.ikonli:ikonli-fontawesome-pack:2.4.0'
compile 'com.opencsv:opencsv:4.0'
compile 'io.confluent:kafka-schema-registry:5.0.0'
compile 'io.confluent:kafka-avro-serializer:5.0.0'
compile 'io.confluent:kafka-schema-registry:6.0.0'
compile 'io.confluent:kafka-avro-serializer:6.0.0'
compile 'com.google.inject:guice:4.2.2'
compile 'org.fxmisc.richtext:richtextfx:0.10.5'
compile 'net.thisptr:jackson-jq:1.0.0-preview.20191208'

testCompile 'junit:junit:4.12'
testCompile 'org.springframework.kafka:spring-kafka-test:2.2.0.RELEASE'
testCompile 'com.google.code.tempus-fugit:tempus-fugit:1.1'
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/at/esque/kafka/Controller.java
Expand Up @@ -230,7 +230,7 @@ public void setup(Stage controlledStage) {
specificOffsetTextField.setVisible(newValue == FetchTypes.SPECIFIC_OFFSET));

clusterComboBox.getSelectionModel().selectedItemProperty().addListener((observable, oldValue, newValue) -> {
adminClient = new KafkaesqueAdminClient(newValue.getBootStrapServers(), configHandler.getSslProperties(selectedCluster()));
adminClient = new KafkaesqueAdminClient(newValue.getBootStrapServers(), configHandler.getSslProperties(selectedCluster()), configHandler.getSaslProperties(selectedCluster()));
refreshTopicList(newValue);
});

Expand Down Expand Up @@ -536,7 +536,7 @@ public void schemaRegistryClick(ActionEvent event) {
FXMLLoader fxmlLoader = new FXMLLoader(getClass().getResource("/fxml/schemaRegistryBrowser.fxml"));
Parent root1 = fxmlLoader.load();
SchemaRegistryBrowserController controller = fxmlLoader.getController();
controller.setup(selectedConfig.getSchemaRegistry());
controller.setup(selectedConfig,configHandler);
Stage stage = new Stage();
stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png")));
stage.initModality(Modality.APPLICATION_MODAL);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/at/esque/kafka/CrossClusterController.java
Expand Up @@ -143,7 +143,7 @@ private void setupClusterControls(ClusterConfig clusterConfig, KafkaesqueAdminCl
if (adminClient != null) {
adminClient.close();
}
adminClient = new KafkaesqueAdminClient(clusterConfig.getBootStrapServers(), configHandler.getSslProperties(clusterConfig));
adminClient = new KafkaesqueAdminClient(clusterConfig.getBootStrapServers(), configHandler.getSslProperties(clusterConfig),configHandler.getSaslProperties(clusterConfig));
KafkaesqueAdminClient finalAdminClient = adminClient;
runInDaemonThread(() -> {
ObservableList<String> topics = FXCollections.observableArrayList(finalAdminClient.getTopics());
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java
Expand Up @@ -2,9 +2,11 @@

import at.esque.kafka.alerts.ConfirmationAlert;
import at.esque.kafka.alerts.ErrorAlert;
import at.esque.kafka.cluster.ClusterConfig;
import at.esque.kafka.controls.FilterableListView;
import at.esque.kafka.controls.JsonTreeView;
import at.esque.kafka.controls.KafkaEsqueCodeArea;
import at.esque.kafka.handlers.ConfigHandler;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import javafx.collections.FXCollections;
import javafx.event.ActionEvent;
Expand Down Expand Up @@ -37,8 +39,16 @@ public class SchemaRegistryBrowserController {
private JsonTreeView jsonTreeView;


public void setup(String schemaregistryUrl) {
schemaRegistryRestService = new RestService(schemaregistryUrl);
public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) {
schemaRegistryRestService = new RestService(selectedConfig.getSchemaRegistry());

if(selectedConfig.isSchemaRegistryHttps())
{
schemaRegistryRestService.setSslSocketFactory(selectedConfig.buildSSlSocketFactory());
}

schemaRegistryRestService.configure(configHandler.getSchemaRegistryAuthProperties(selectedConfig));

jsonTreeView.jsonStringProperty().bind(schemaTextArea.textProperty());
try {
versionComboBox.getSelectionModel().selectedItemProperty().addListener(((observable1, oldValue1, newValue1) -> {
Expand Down
91 changes: 91 additions & 0 deletions src/main/java/at/esque/kafka/cluster/ClusterConfig.java
Expand Up @@ -6,15 +6,28 @@
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.property.StringProperty;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.*;
import java.security.cert.CertificateException;

public class ClusterConfig {
private StringProperty identifier = new SimpleStringProperty();
private StringProperty bootStrapServers = new SimpleStringProperty();
private StringProperty schemaRegistry = new SimpleStringProperty();
private StringProperty schemaRegistryBasicAuthUserInfo = new SimpleStringProperty();
private BooleanProperty sslEnabled = new SimpleBooleanProperty();
private StringProperty keyStoreLocation = new SimpleStringProperty();
private StringProperty keyStorePassword = new SimpleStringProperty();
private StringProperty trustStoreLocation = new SimpleStringProperty();
private StringProperty trustStorePassword = new SimpleStringProperty();
private StringProperty saslSecurityProtocol = new SimpleStringProperty();
private StringProperty saslMechanism = new SimpleStringProperty();
private StringProperty saslJaasConfig = new SimpleStringProperty();

@JsonProperty("identifier")
public String getIdentifier() {
Expand Down Expand Up @@ -120,8 +133,86 @@ public void setTrustStorePassword(String trustStorePassword) {
this.trustStorePassword.set(trustStorePassword);
}


@JsonProperty("saslSecurityProtocol")
public String getSaslSecurityProtocol() { return saslSecurityProtocol.get(); }

public StringProperty saslSecurityProtocolProperty() {
return saslSecurityProtocol;
}

public void setSaslSecurityProtocol(String saslSecurityProtocol) { this.saslSecurityProtocol.set(saslSecurityProtocol); }

@JsonProperty("saslMechanism")
public String getSaslMechanism() {
return saslMechanism.get();
}

public StringProperty saslMechanismProperty() {
return saslMechanism;
}

public void setSaslMechanism(String saslMechanism) {
this.saslMechanism.set(saslMechanism);
}

@JsonProperty("saslJaasConfig")
public String getSaslJaasConfig() {
return saslJaasConfig.get();
}

public StringProperty saslJaasConfigProperty() {
return saslJaasConfig;
}

public void setSaslJaasConfig(String saslJaasConfig) { this.saslJaasConfig.set(saslJaasConfig); }

@JsonProperty("schemaRegistryBasicAuthUserInfo")
public String getSchemaRegistryBasicAuthUserInfo() {
return schemaRegistryBasicAuthUserInfo.get();
}

public StringProperty schemaRegistryBasicAuthUserInfoProperty() {
return schemaRegistryBasicAuthUserInfo;
}

public void setSchemaRegistryBasicAuthUserInfo(String schemaRegistryBasicAuthUserInfo) { this.schemaRegistryBasicAuthUserInfo.set(schemaRegistryBasicAuthUserInfo); }

public boolean isSchemaRegistryHttps()
{
return this.getSchemaRegistry().toLowerCase().startsWith("https:");
}

@Override
public String toString(){
return String.format("%s (%s)", getIdentifier(), getBootStrapServers());
}



public SSLSocketFactory buildSSlSocketFactory() {
try {
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(new FileInputStream(this.getKeyStoreLocation()), this.getKeyStorePassword().toCharArray());

KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType());
ts.load(new FileInputStream(this.getTrustStoreLocation()), this.getTrustStorePassword().toCharArray());

KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, this.getKeyStorePassword().toCharArray());

TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(ts);

SSLContext sc = SSLContext.getInstance("TLSv1.2");
sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

return sc.getSocketFactory();

} catch (KeyStoreException | IOException | NoSuchAlgorithmException | CertificateException | UnrecoverableKeyException | KeyManagementException e) {
at.esque.kafka.alerts.ErrorAlert.show(e);
return null;
}
}

}
Expand Up @@ -35,11 +35,12 @@
public class KafkaesqueAdminClient {
private AdminClient adminClient;

public KafkaesqueAdminClient(String bootstrapServers, Map<String, String> sslProps) {
public KafkaesqueAdminClient(String bootstrapServers, Map<String, String> sslProps, Map<String, String> saslProps) {
Properties props = new Properties();
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, String.format("kafkaesque-%s", UUID.randomUUID()));
props.putAll(sslProps);
props.putAll(saslProps);

this.adminClient = AdminClient.create(props);
}
Expand Down
49 changes: 49 additions & 0 deletions src/main/java/at/esque/kafka/handlers/ConfigHandler.java
Expand Up @@ -8,8 +8,10 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import com.google.inject.Singleton;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -181,27 +183,74 @@ public Map<String, String> getSslProperties(ClusterConfig config) {
if (config.isSslEnabled()) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
}

if (config.isSchemaRegistryHttps()) {
props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
}

if (StringUtils.isNotEmpty(config.getKeyStoreLocation())) {
String keyStoreLocation = getJksStoreLocation(config.getIdentifier(), config.getKeyStoreLocation());
if (keyStoreLocation != null) {
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreLocation);
props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreLocation);

if (StringUtils.isNotEmpty(config.getKeyStorePassword())) {
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, config.getKeyStorePassword());
props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, config.getKeyStorePassword());
}
}
}
if (StringUtils.isNotEmpty(config.getTrustStoreLocation())) {
String trustStoreLocation = getJksStoreLocation(config.getIdentifier(), config.getTrustStoreLocation());

if (trustStoreLocation != null) {
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);

if (StringUtils.isNotEmpty(config.getTrustStorePassword())) {
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, config.getTrustStorePassword());
props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, config.getTrustStorePassword());
}
}
}
return props;
}


public Map<String, String> getSaslProperties(ClusterConfig config) {
Map<String, String> props = new HashMap<>();

if (StringUtils.isNoneEmpty(config.getSaslSecurityProtocol()))
{
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,config.getSaslSecurityProtocol());
}

if (StringUtils.isNotEmpty(config.getSaslMechanism()))
{
props.put(SaslConfigs.SASL_MECHANISM,config.getSaslMechanism());
}


if (StringUtils.isNotEmpty(config.getSaslJaasConfig()))
{
props.put(SaslConfigs.SASL_JAAS_CONFIG,config.getSaslJaasConfig());
}

return props;
}

public Map<String, ?> getSchemaRegistryAuthProperties(ClusterConfig config) {
Map<String, String> props = new HashMap<>();

if (StringUtils.isNoneEmpty(config.getSchemaRegistryBasicAuthUserInfo()))
{
props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE,"USER_INFO");
props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SchemaRegistryClientConfig.USER_INFO_CONFIG, config.getSchemaRegistryBasicAuthUserInfo());
}

return props;
}

private String getJksStoreLocation(String clusterIdentification, String location) {
File jksStore = new File(location);
if (jksStore.exists() && jksStore.isFile()) {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/at/esque/kafka/handlers/ConsumerHandler.java
Expand Up @@ -74,6 +74,8 @@ public UUID registerConsumer(ClusterConfig config, TopicMessageTypeConfig topicM
consumerProps.setProperty("schema.registry.url", config.getSchemaRegistry());
}
consumerProps.putAll(configHandler.getSslProperties(config));
consumerProps.putAll(configHandler.getSaslProperties(config));
consumerProps.putAll(configHandler.getSchemaRegistryAuthProperties(config));
consumerProps.putAll(consumerConfigs);

LOGGER.info("Creating new Consumer with properties: [{}]", consumerProps);
Expand Down

0 comments on commit 73fd40c

Please sign in to comment.