Skip to content

Commit

Permalink
added support for self-signed certificates
Browse files Browse the repository at this point in the history
 - now if validateCertificates and respectively a CA is configured they will be used for Kafka messaging. For both consumers and producers.

Signed-off-by: Kalin Kostashki <kalin.kostashki@bosch.io>
  • Loading branch information
Kalin Kostashki authored and thjaeckle committed Aug 24, 2022
1 parent 1ffae8b commit 8908c07
Showing 1 changed file with 11 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -95,6 +96,7 @@ ConsumerSettings<String, ByteBuffer> getConsumerSettings(final boolean dryRun) {
.withBootstrapServers(bootstrapServers)
.withGroupId(connection.getId().toString())
.withClientId(clientId + "-consumer")
.withProperties(getTrustedSelfSignedCertificates())
.withProperties(getConsumerSpecificConfigProperties())
.withProperties(getSecurityProtocolProperties())
.withConnectionChecker(connectionCheckerSettings);
Expand All @@ -114,10 +116,19 @@ ProducerSettings<String, ByteBuffer> getProducerSettings() {
return ProducerSettings.apply(alpakkaConfig, new StringSerializer(), new ByteBufferSerializer())
.withBootstrapServers(bootstrapServers)
.withProperties(getClientIdProperties())
.withProperties(getTrustedSelfSignedCertificates())
.withProperties(getProducerSpecificConfigProperties())
.withProperties(getSecurityProtocolProperties());
}

private Map<String, String> getTrustedSelfSignedCertificates() {
if (connection.isValidateCertificates() && connection.getTrustedCertificates().isPresent()) {
return Map.of(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM",
SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, connection.getTrustedCertificates().orElse(""));
}
return Map.of();
}

private Map<String, String> getClientIdProperties() {
return Map.of(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
}
Expand Down

0 comments on commit 8908c07

Please sign in to comment.