Skip to content

Commit

Permalink
fix basic auth login to Kafka broker
Browse files Browse the repository at this point in the history
  • Loading branch information
ffendt committed Mar 13, 2019
1 parent c07d36f commit 5303dfc
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
Expand Up @@ -84,8 +84,12 @@ private KafkaPublisherActor(final String connectionId, final List<Target> target

/*
TODO: test cases:
1. what happens if the topic is missing -> org.apache.kafka.common.errors.TimeoutException: Topic <topic> not present in metadata after 10000 ms.
2. what happens if authentication is unsuccessful
1.0 what happens if the topic is missing -> org.apache.kafka.common.errors.TimeoutException: Topic <topic> not present in metadata after 10000 ms.
1.1 what happens if the partition is not available -> org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 10000 ms.
2. what happens if authentication is unsuccessful -> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
The internal NetworkClient of the used library will start logging failures repeatedly:
o.a.k.c.NetworkClient - [Producer clientId=producer-3] Connection to node -1 (localhost/127.0.0.1:9092) failed authentication due to: Authentication failed: Invalid username or password
we should therefore definitely stop the producer and ourself
3. what happens if authorization is unsuccessful
4. what happens if the port is closed
5. what happens if kafka is stopped
Expand Down
Expand Up @@ -10,7 +10,10 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.kafka;

import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand Down Expand Up @@ -43,7 +46,7 @@ ProducerSettings<String, String> createProducerSettings(final Connection connect
// TODO: config may not be null!
ProducerSettings settings = ProducerSettings.create(config, KEY_SERIALIZER, VALUE_SERIALIZER)
.withBootstrapServers(getBootstrapServers(connection))
.withProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
.withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000"); // TODO blocking timeout, either due to missing metadata or due to full buffer, reset to 60.000
// .withCloseTimeout()
// .withDispatcher()
// .withEosCommitInterval()
Expand All @@ -66,10 +69,10 @@ private ProducerSettings<String, String> addAuthentication(final String username
final String password,
final ProducerSettings<String, String> producerSettings) {

producerSettings.withProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
// TODO: if SSL is enabled, use SASL_SSL, otherwise SASL_PLAINTEXT ... so is it bad to always use sasl_plaintext?
producerSettings.withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
return producerSettings.withProperty(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required " +
// TODO: if SSL is enabled, use SASL_SSL, otherwise SASL_PLAINTEXT
return producerSettings.withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
.withProperty(SaslConfigs.SASL_MECHANISM, "PLAIN")
.withProperty(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username +"\" " +
"password=\"" + password +"\";");
}
Expand Down

0 comments on commit 5303dfc

Please sign in to comment.