From 764f38785f83a95f840c398c9ef2f557e67fe07a Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 20 Sep 2017 16:27:23 +0100 Subject: [PATCH 1/3] KAFKA-5947: Handle authentication exceptions in admin client and txn producer --- .../kafka/clients/NetworkClientUtils.java | 4 +- .../kafka/clients/admin/KafkaAdminClient.java | 40 ++++++++++ .../clients/producer/internals/Sender.java | 59 ++++++++------- .../internals/TransactionManager.java | 6 ++ .../errors/AuthenticationException.java | 15 ++++ .../errors/AuthenticationFailedException.java | 13 ++++ .../errors/IllegalSaslStateException.java | 6 ++ .../UnsupportedSaslMechanismException.java | 5 ++ .../ClientAuthenticationFailureTest.java | 35 +++++++++ ...aslClientsWithInvalidCredentialsTest.scala | 73 ++++++++++++++++++- 10 files changed, 225 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java index 84629799b37b3..c4559a48ca84d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java @@ -47,7 +47,7 @@ public static boolean isReady(KafkaClient client, Node node, long currentTime) { * It returns `true` if the call completes normally or `false` if the timeoutMs expires. If the connection fails, * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive * connection timeoutMs, it is possible for this method to raise an `IOException` for a previous connection which - * has recently disconnected. + * has recently disconnected. If authentication to the node fails, an `AuthenticationException` is thrown. * * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with * care. @@ -69,6 +69,8 @@ public static boolean awaitReady(KafkaClient client, Node node, Time time, long } long pollTimeout = expiryTime - attemptStartTime; client.poll(pollTimeout, attemptStartTime); + if (client.authenticationException(node) != null) + throw client.authenticationException(node); attemptStartTime = time.milliseconds(); } return client.isReady(node, attemptStartTime); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 20db6b77a5365..a6837646c8a37 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.InvalidRequestException; @@ -847,6 +848,42 @@ private void timeoutCallsInFlight(TimeoutProcessor processor, Map> callsToSend) { + AuthenticationException authenticationException = null; + try { + metadata.maybeThrowAuthenticationException(); + } catch (AuthenticationException e) { + authenticationException = e; + } + if (authenticationException == null) { + for (Node node : callsToSend.keySet()) { + authenticationException = client.authenticationException(node); + if (authenticationException != null) + break; + } + } + if (authenticationException != null) { + synchronized (this) { + for (Call newCall : newCalls) { + newCall.fail(now, authenticationException); + } + newCalls.clear(); + } + for (List calls : callsToSend.values()) { + for (Call call : calls) { + call.handleFailure(authenticationException); + } + } + callsToSend.clear(); + return true; + } else + return false; + } + /** * Handle responses from the server. * @@ -974,6 +1011,9 @@ public void run() { // Update the current time and handle the latest responses. now = time.milliseconds(); + if (handleAuthenticationException(now, callsToSend) && + hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) + break; handleResponses(now, responses, callsInFlight, correlationIdToCalls); } int numTimedOut = 0; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index ecfad709eb041..9b928b0eb9ab2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; @@ -200,32 +201,38 @@ public void run() { */ void run(long now) { if (transactionManager != null) { - if (transactionManager.shouldResetProducerStateAfterResolvingSequences()) - // Check if the previous run expired batches which requires a reset of the producer state. - transactionManager.resetProducerId(); - - if (!transactionManager.isTransactional()) { - // this is an idempotent producer, so make sure we have a producer id - maybeWaitForProducerId(); - } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) { - transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " + - "some previously sent messages and can no longer retry them. It isn't safe to continue.")); - } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) { - // as long as there are outstanding transactional requests, we simply wait for them to return - client.poll(retryBackoffMs, now); - return; - } + try { + if (transactionManager.shouldResetProducerStateAfterResolvingSequences()) + // Check if the previous run expired batches which requires a reset of the producer state. + transactionManager.resetProducerId(); + + if (!transactionManager.isTransactional()) { + // this is an idempotent producer, so make sure we have a producer id + maybeWaitForProducerId(); + } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) { + transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " + + "some previously sent messages and can no longer retry them. It isn't safe to continue.")); + } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) { + // as long as there are outstanding transactional requests, we simply wait for them to return + client.poll(retryBackoffMs, now); + return; + } - // do not continue sending if the transaction manager is in a failed state or if there - // is no producer id (for the idempotent case). - if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) { - RuntimeException lastError = transactionManager.lastError(); - if (lastError != null) - maybeAbortBatches(lastError); - client.poll(retryBackoffMs, now); - return; - } else if (transactionManager.hasAbortableError()) { - accumulator.abortUndrainedBatches(transactionManager.lastError()); + // do not continue sending if the transaction manager is in a failed state or if there + // is no producer id (for the idempotent case). + if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) { + RuntimeException lastError = transactionManager.lastError(); + if (lastError != null) + maybeAbortBatches(lastError); + client.poll(retryBackoffMs, now); + return; + } else if (transactionManager.hasAbortableError()) { + accumulator.abortUndrainedBatches(transactionManager.lastError()); + } + } catch (AuthenticationException e) { + // This is already logged as error, but propagated here to perform any clean ups. + log.trace("Authentication exception while processing transactional request"); + transactionManager.authenticationFailed(e); } } @@ -406,7 +413,7 @@ private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOExc private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException { Node node = client.leastLoadedNode(time.milliseconds()); - if (NetworkClientUtils.awaitReady(client, node, time, remainingTimeMs)) { + if (node != null && NetworkClientUtils.awaitReady(client, node, time, remainingTimeMs)) { return node; } return null; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index b2387a0a74e10..628094065de10 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.protocol.Errors; @@ -579,6 +580,11 @@ synchronized void retry(TxnRequestHandler request) { enqueueRequest(request); } + synchronized void authenticationFailed(AuthenticationException e) { + for (TxnRequestHandler request : pendingRequests) + request.fatalError(e); + } + Node coordinator(FindCoordinatorRequest.CoordinatorType type) { switch (type) { case GROUP: diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java index aa4a111a00cb0..d34b82c62081e 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java @@ -16,6 +16,21 @@ */ package org.apache.kafka.common.errors; +import javax.net.ssl.SSLException; + +/** + * This exception indicates that SSL handshake or SASL authentication has failed. + * On authentication failure, clients abort the operation requested and raise one + * of the subclasses of this exception: + *
    + * {@link AuthenticationFailedException} if SSL handshake fails with an + * {@link SSLException} or SASL handshake fails with invalid credentials. + *
  • {@link UnsupportedSaslMechanismException} if the SASL mechanism requested by the client + * is not supported on the broker.
  • + *
  • {@link IllegalSaslStateException} if an unexpected request is received on during SASL + * handshake. This could be due to misconfigured security protocol. + *
+ */ public class AuthenticationException extends ApiException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java index 3be72f0f94a29..f1e1e6d54ac0f 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java @@ -16,6 +16,19 @@ */ package org.apache.kafka.common.errors; +/** + * This exception indicates that SSL handshake or SASL authentication has + * failed. + *

+ * SSL handshake failures in clients may indicate client authentication + * failure due to untrusted certificates if server is configured to request + * client certificates. Handshake failures could also indicate misconfigured + * security including protocol/cipher suite mismatch, server certificate + * authentication failure or server host name verification failure. + *

+ * SASL authentication failures typically indicate invalid credentials. + *

+ */ public class AuthenticationFailedException extends AuthenticationException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java index c45f00776fb28..5db3f9e12c7bd 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java @@ -16,6 +16,12 @@ */ package org.apache.kafka.common.errors; +/** + * This exception indicates unexpected requests prior to SASL authentication. + * This could be due to misconfigured security, e.g. if PLAINTEXT protocol + * is used to connect to a SASL endpoint. + * + */ public class IllegalSaslStateException extends AuthenticationException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java index 9dab22ac7a07c..a61da8079820e 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java @@ -16,6 +16,11 @@ */ package org.apache.kafka.common.errors; +/** + * This exception indicates that the SASL mechanism requested by the client + * is not enabled on the broker. + * + */ public class UnsupportedSaslMechanismException extends AuthenticationException { private static final long serialVersionUID = 1L; diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java index b4460f76aa7c9..f87681a89419d 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.common.security.authenticator; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -35,6 +37,7 @@ import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -103,6 +106,38 @@ public void testProducerWithInvalidCredentials() { } } + @Test + public void testAdminClientWithInvalidCredentials() { + Map props = new HashMap<>(saslClientConfigs); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + try (AdminClient client = AdminClient.create(props)) { + DescribeTopicsResult result = client.describeTopics(Collections.singleton("test")); + result.all().get(); + fail("Expected an authentication error!"); + } catch (Exception e) { + assertTrue("Expected AuthenticationFailedException, got " + e.getClass(), e.getCause() instanceof AuthenticationFailedException); + } + } + + @Test + public void testTransactionalProducerWithInvalidCredentials() throws Exception { + Map props = new HashMap<>(saslClientConfigs); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1"); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); + + try (KafkaProducer producer = new KafkaProducer<>(props)) { + producer.initTransactions(); + fail("Expected an authentication error!"); + } catch (AuthenticationFailedException e) { + // expected exception + } + } + private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception { return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol); } diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index 52fbdba45464d..41b3e51810ff4 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -13,12 +13,14 @@ package kafka.api import java.io.FileOutputStream +import java.util.Collections import java.util.concurrent.{ExecutionException, Future, TimeUnit} -import scala.collection.JavaConverters.seqAsJavaListConverter +import scala.collection.JavaConverters._ +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig} -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.AuthenticationFailedException import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.serialization.ByteArrayDeserializer @@ -39,9 +41,13 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with val producerCount = 1 val serverCount = 1 + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") + this.serverConfig.setProperty(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") + this.serverConfig.setProperty(KafkaConfig.TransactionsTopicMinISRProp, "1") this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") val topic = "topic" + val numPartitions = 1 val tp = new TopicPartition(topic, 0) override def configureSecurityBeforeServersStart() { @@ -56,7 +62,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, JaasTestUtils.KafkaServerContextName)) super.setUp() - TestUtils.createTopic(this.zkUtils, topic, 1, serverCount, this.servers) + TestUtils.createTopic(this.zkUtils, topic, numPartitions, serverCount, this.servers) } @After @@ -68,11 +74,26 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with @Test def testProducerWithAuthenticationFailure() { verifyAuthenticationException(() => sendOneRecord(10000)) + verifyAuthenticationException(() => producers.head.partitionsFor(topic)) createClientCredential() verifyWithRetry(() => sendOneRecord()) } + @Test + def testTransactionalProducerWithAuthenticationFailure() { + val txProducer = createTransactionalProducer() + verifyAuthenticationException(() => txProducer.initTransactions()) + + createClientCredential() + try { + txProducer.initTransactions() + fail("Transaction initialization should fail after authentication failure") + } catch { + case _: KafkaException => // expected exception + } + } + @Test def testConsumerWithAuthenticationFailure() { val consumer = this.consumers.head @@ -100,12 +121,41 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with private def verifyConsumerWithAuthenticationFailure(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) { verifyAuthenticationException(() => consumer.poll(10000)) + verifyAuthenticationException(() => consumer.partitionsFor(topic)) createClientCredential() verifyWithRetry(() => sendOneRecord()) verifyWithRetry(() => assertEquals(1, consumer.poll(1000).count)) } + @Test + def testKafkaAdminClientWithAuthenticationFailure() { + val props = TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + val adminClient = AdminClient.create(props) + + def describeTopic(): Unit = { + try { + val response = adminClient.describeTopics(Collections.singleton(topic)).all.get + assertEquals(1, response.size) + response.asScala.foreach { case (topic, description) => + assertEquals(numPartitions, description.partitions.size) + } + } catch { + case e: ExecutionException => throw e.getCause + } + } + + try { + verifyAuthenticationException(() => describeTopic()) + + createClientCredential() + verifyWithRetry(() => describeTopic()) + } finally { + adminClient.close + } + } + @Test def testConsumerGroupServiceWithAuthenticationFailure() { val propsFile = TestUtils.tempFile() @@ -173,4 +223,19 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with } }, s"Operation did not succeed within timeout after $attempts") } + + private def createTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = { + producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1") + producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") + producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384") + val txProducer = TestUtils.createNewProducer(brokerList, + securityProtocol = this.securityProtocol, + saslProperties = this.clientSaslProperties, + retries = 1000, + acks = -1, + props = Some(producerConfig)) + producers += txProducer + txProducer + } } From 362ab332ddcb1a7e3af8a37c3ea74651cdf0b21c Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 21 Sep 2017 10:27:48 +0100 Subject: [PATCH 2/3] Address review comments --- .../errors/AuthenticationException.java | 10 ++++----- .../errors/IllegalSaslStateException.java | 1 - ....java => SaslAuthenticationException.java} | 22 ++++++++----------- .../UnsupportedSaslMechanismException.java | 1 - .../kafka/common/network/KafkaChannel.java | 2 +- .../apache/kafka/common/protocol/Errors.java | 6 ++--- .../requests/SaslAuthenticateResponse.java | 2 +- .../SaslClientAuthenticator.java | 3 ++- .../SaslServerAuthenticator.java | 2 +- .../ClientAuthenticationFailureTest.java | 14 ++++++------ ...aslClientsWithInvalidCredentialsTest.scala | 8 +++---- 11 files changed, 31 insertions(+), 40 deletions(-) rename clients/src/main/java/org/apache/kafka/common/errors/{AuthenticationFailedException.java => SaslAuthenticationException.java} (60%) diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java index d34b82c62081e..c56ac887cd1af 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java @@ -16,19 +16,17 @@ */ package org.apache.kafka.common.errors; -import javax.net.ssl.SSLException; - /** - * This exception indicates that SSL handshake or SASL authentication has failed. + * This exception indicates that SASL authentication has failed. * On authentication failure, clients abort the operation requested and raise one * of the subclasses of this exception: *

    - * {@link AuthenticationFailedException} if SSL handshake fails with an - * {@link SSLException} or SASL handshake fails with invalid credentials. + * {@link SaslAuthenticationException} if SASL handshake fails with invalid credentials + * or any other failure specific to the SASL mechanism used for authentication *
  • {@link UnsupportedSaslMechanismException} if the SASL mechanism requested by the client * is not supported on the broker.
  • *
  • {@link IllegalSaslStateException} if an unexpected request is received on during SASL - * handshake. This could be due to misconfigured security protocol. + * handshake. This could be due to misconfigured security protocol.
  • *
*/ public class AuthenticationException extends ApiException { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java index 5db3f9e12c7bd..691244a7ebc9a 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java @@ -20,7 +20,6 @@ * This exception indicates unexpected requests prior to SASL authentication. * This could be due to misconfigured security, e.g. if PLAINTEXT protocol * is used to connect to a SASL endpoint. - * */ public class IllegalSaslStateException extends AuthenticationException { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java b/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java similarity index 60% rename from clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java rename to clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java index f1e1e6d54ac0f..d128c251a5893 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationFailedException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java @@ -17,27 +17,23 @@ package org.apache.kafka.common.errors; /** - * This exception indicates that SSL handshake or SASL authentication has - * failed. - *

- * SSL handshake failures in clients may indicate client authentication - * failure due to untrusted certificates if server is configured to request - * client certificates. Handshake failures could also indicate misconfigured - * security including protocol/cipher suite mismatch, server certificate - * authentication failure or server host name verification failure. - *

- * SASL authentication failures typically indicate invalid credentials. + * This exception indicates that SASL authentication has failed. The error message + * in the exception indicates the actual cause of failure. *

+ * SASL authentication failures typically indicate invalid credentials, but + * could also include other failures specific to the SASL mechanism used + * for authentication. + *

*/ -public class AuthenticationFailedException extends AuthenticationException { +public class SaslAuthenticationException extends AuthenticationException { private static final long serialVersionUID = 1L; - public AuthenticationFailedException(String message) { + public SaslAuthenticationException(String message) { super(message); } - public AuthenticationFailedException(String message, Throwable cause) { + public SaslAuthenticationException(String message, Throwable cause) { super(message, cause); } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java index a61da8079820e..4db4aee98c07c 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java @@ -19,7 +19,6 @@ /** * This exception indicates that the SASL mechanism requested by the client * is not enabled on the broker. - * */ public class UnsupportedSaslMechanismException extends AuthenticationException { diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 68f9ed620c3da..24cd9cfabea19 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -79,7 +79,7 @@ public void prepare() throws IOException { authenticator.authenticate(); } catch (AuthenticationException e) { switch (authenticator.error()) { - case AUTHENTICATION_FAILED: + case SASL_AUTHENTICATION_FAILED: case ILLEGAL_SASL_STATE: case UNSUPPORTED_SASL_MECHANISM: state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 1039ca0e8d4c9..d7f4d1d8f5660 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.errors.ApiException; -import org.apache.kafka.common.errors.AuthenticationFailedException; +import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.ConcurrentTransactionsException; @@ -518,11 +518,11 @@ public ApiException build(String message) { return new LogDirNotFoundException(message); } }), - AUTHENTICATION_FAILED(58, "Authentication failed.", + SASL_AUTHENTICATION_FAILED(58, "SASL Authentication failed.", new ApiExceptionBuilder() { @Override public ApiException build(String message) { - return new AuthenticationFailedException(message); + return new SaslAuthenticationException(message); } }); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java index 1dd0e76983071..c950cb9708063 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java @@ -51,7 +51,7 @@ public static Schema[] schemaVersions() { /** * Possible error codes: - * AUTHENTICATION_FAILED(57) : Authentication failed + * SASL_AUTHENTICATION_FAILED(57) : Authentication failed */ private final Errors error; private final String errorMessage; diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 7cbb756f751ba..8207a5a057efc 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -104,7 +104,8 @@ public enum SaslState { private RequestHeader currentRequestHeader; // Version of SaslAuthenticate request/responses private short saslAuthenticateVersion; - // Sasl authentication error which may be one of NONE, UNSUPPORTED_SASL_MECHANISM, ILLEGAL_SASL_STATE, AUTHENTICATION_FAILED or NETWORK_EXCEPTION + // Sasl authentication error which may be one of NONE, UNSUPPORTED_SASL_MECHANISM, ILLEGAL_SASL_STATE, + // SASL_AUTHENTICATION_FAILED or NETWORK_EXCEPTION private Errors error; public SaslClientAuthenticator(Map configs, diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 46386cf231493..620213120e347 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -385,7 +385,7 @@ private void handleSaslToken(byte[] clientToken) throws IOException { ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken); sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, responseBuf)); } catch (SaslException e) { - this.error = Errors.AUTHENTICATION_FAILED; + this.error = Errors.SASL_AUTHENTICATION_FAILED; sendKafkaResponse(requestContext, new SaslAuthenticateResponse(this.error, "Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism)); throw e; diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java index f87681a89419d..33f8a58d7bd6c 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java @@ -26,7 +26,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; -import org.apache.kafka.common.errors.AuthenticationFailedException; +import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.NetworkTestUtils; import org.apache.kafka.common.network.NioEchoServer; @@ -84,7 +84,7 @@ public void testConsumerWithInvalidCredentials() { consumer.subscribe(Arrays.asList(topic)); consumer.poll(100); fail("Expected an authentication error!"); - } catch (AuthenticationFailedException e) { + } catch (SaslAuthenticationException e) { // OK } catch (Exception e) { fail("Expected only an authentication error, but another error occurred: " + e.getMessage()); @@ -102,7 +102,8 @@ public void testProducerWithInvalidCredentials() { producer.send(record).get(); fail("Expected an authentication error!"); } catch (Exception e) { - assertTrue("Expected an exception of type AuthenticationFailedException", e.getCause() instanceof AuthenticationFailedException); + assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(), + e.getCause() instanceof SaslAuthenticationException); } } @@ -115,7 +116,8 @@ public void testAdminClientWithInvalidCredentials() { result.all().get(); fail("Expected an authentication error!"); } catch (Exception e) { - assertTrue("Expected AuthenticationFailedException, got " + e.getClass(), e.getCause() instanceof AuthenticationFailedException); + assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(), + e.getCause() instanceof SaslAuthenticationException); } } @@ -126,14 +128,12 @@ public void testTransactionalProducerWithInvalidCredentials() throws Exception { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1"); - props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); - props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); try (KafkaProducer producer = new KafkaProducer<>(props)) { producer.initTransactions(); fail("Expected an authentication error!"); - } catch (AuthenticationFailedException e) { + } catch (SaslAuthenticationException e) { // expected exception } } diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index 41b3e51810ff4..692bdd8f42d86 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -21,7 +21,7 @@ import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.errors.AuthenticationFailedException +import org.apache.kafka.common.errors.SaslAuthenticationException import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.junit.{After, Before, Test} @@ -203,7 +203,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with f() fail("Expected an authentication exception") } catch { - case e: AuthenticationFailedException => + case e: SaslAuthenticationException => // expected exception val elapsedMs = System.currentTimeMillis - startMs assertTrue(s"Poll took too long, elapsed=$elapsedMs", elapsedMs <= 5000) @@ -219,16 +219,14 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with f() true } catch { - case _: AuthenticationFailedException => false + case _: SaslAuthenticationException => false } }, s"Operation did not succeed within timeout after $attempts") } private def createTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = { producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1") - producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") - producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384") val txProducer = TestUtils.createNewProducer(brokerList, securityProtocol = this.securityProtocol, saslProperties = this.clientSaslProperties, From 4143990b8eae95f877592b40d31c4abe6f234673 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 21 Sep 2017 12:20:21 +0100 Subject: [PATCH 3/3] Address review comments --- checkstyle/import-control.xml | 1 + .../org/apache/kafka/clients/Metadata.java | 13 ++++--- .../kafka/clients/admin/KafkaAdminClient.java | 35 ++++++++----------- .../internals/ConsumerNetworkClient.java | 4 ++- .../clients/producer/internals/Sender.java | 2 +- .../ClientAuthenticationFailureTest.java | 25 ++++++------- ...aslClientsWithInvalidCredentialsTest.scala | 32 ++++++++--------- 7 files changed, 56 insertions(+), 56 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 3329b2d016060..f4d9655a7d3cd 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -41,6 +41,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 5790d88f0c572..3b8c18a6d8dd6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -149,14 +149,15 @@ public synchronized boolean updateRequested() { /** * If any non-retriable authentication exceptions were encountered during - * metadata update, clear and throw the exception. + * metadata update, clear and return the exception. */ - public synchronized void maybeThrowAuthenticationException() { + public synchronized AuthenticationException getAndClearAuthenticationException() { if (authenticationException != null) { AuthenticationException exception = authenticationException; authenticationException = null; - throw exception; - } + return exception; + } else + return null; } /** @@ -169,7 +170,9 @@ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) { - maybeThrowAuthenticationException(); + AuthenticationException ex = getAndClearAuthenticationException(); + if (ex != null) + throw ex; if (remainingWaitMs != 0) wait(remainingWaitMs); long elapsed = System.currentTimeMillis() - begin; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index a6837646c8a37..296559fc0f571 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -850,15 +850,10 @@ private void timeoutCallsInFlight(TimeoutProcessor processor, Map> callsToSend) { - AuthenticationException authenticationException = null; - try { - metadata.maybeThrowAuthenticationException(); - } catch (AuthenticationException e) { - authenticationException = e; - } + private void handleAuthenticationException(long now, Map> callsToSend) { + AuthenticationException authenticationException = metadata.getAndClearAuthenticationException(); if (authenticationException == null) { for (Node node : callsToSend.keySet()) { authenticationException = client.authenticationException(node); @@ -868,20 +863,20 @@ private boolean handleAuthenticationException(long now, Map> ca } if (authenticationException != null) { synchronized (this) { - for (Call newCall : newCalls) { - newCall.fail(now, authenticationException); - } - newCalls.clear(); + failCalls(now, newCalls, authenticationException); } for (List calls : callsToSend.values()) { - for (Call call : calls) { - call.handleFailure(authenticationException); - } + failCalls(now, calls, authenticationException); } callsToSend.clear(); - return true; - } else - return false; + } + } + + private void failCalls(long now, List calls, AuthenticationException authenticationException) { + for (Call call : calls) { + call.fail(now, authenticationException); + } + calls.clear(); } /** @@ -1011,9 +1006,7 @@ public void run() { // Update the current time and handle the latest responses. now = time.milliseconds(); - if (handleAuthenticationException(now, callsToSend) && - hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) - break; + handleAuthenticationException(now, callsToSend); handleResponses(now, responses, callsInFlight, correlationIdToCalls); } int numTimedOut = 0; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 43c6358635c23..86fca9e968adc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -135,7 +135,9 @@ public boolean awaitMetadataUpdate(long timeout) { int version = this.metadata.requestUpdate(); do { poll(timeout); - this.metadata.maybeThrowAuthenticationException(); + AuthenticationException ex = this.metadata.getAndClearAuthenticationException(); + if (ex != null) + throw ex; } while (this.metadata.version() == version && time.milliseconds() - startMs < timeout); return this.metadata.version() > version; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9b928b0eb9ab2..d0aeed54565cc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -231,7 +231,7 @@ void run(long now) { } } catch (AuthenticationException e) { // This is already logged as error, but propagated here to perform any clean ups. - log.trace("Authentication exception while processing transactional request"); + log.trace("Authentication exception while processing transactional request: {}", e); transactionManager.authenticationFailed(e); } } diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java index 33f8a58d7bd6c..d878b7214c9fd 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java @@ -32,6 +32,8 @@ import org.apache.kafka.common.network.NioEchoServer; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -76,11 +78,11 @@ public void teardown() throws Exception { @Test public void testConsumerWithInvalidCredentials() { - saslClientConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); - saslClientConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - saslClientConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + Map props = new HashMap<>(saslClientConfigs); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + StringDeserializer deserializer = new StringDeserializer(); - try (KafkaConsumer consumer = new KafkaConsumer<>(saslClientConfigs)) { + try (KafkaConsumer consumer = new KafkaConsumer<>(props, deserializer, deserializer)) { consumer.subscribe(Arrays.asList(topic)); consumer.poll(100); fail("Expected an authentication error!"); @@ -93,12 +95,12 @@ public void testConsumerWithInvalidCredentials() { @Test public void testProducerWithInvalidCredentials() { - saslClientConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); - saslClientConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - saslClientConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + Map props = new HashMap<>(saslClientConfigs); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + StringSerializer serializer = new StringSerializer(); - ProducerRecord record = new ProducerRecord<>(topic, "message"); - try (KafkaProducer producer = new KafkaProducer<>(saslClientConfigs)) { + try (KafkaProducer producer = new KafkaProducer<>(props, serializer, serializer)) { + ProducerRecord record = new ProducerRecord<>(topic, "message"); producer.send(record).get(); fail("Expected an authentication error!"); } catch (Exception e) { @@ -125,12 +127,11 @@ public void testAdminClientWithInvalidCredentials() { public void testTransactionalProducerWithInvalidCredentials() throws Exception { Map props = new HashMap<>(saslClientConfigs); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); + StringSerializer serializer = new StringSerializer(); - try (KafkaProducer producer = new KafkaProducer<>(props)) { + try (KafkaProducer producer = new KafkaProducer<>(props, serializer, serializer)) { producer.initTransactions(); fail("Expected an authentication error!"); } catch (SaslAuthenticationException e) { diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index 692bdd8f42d86..87650405c0766 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -73,17 +73,17 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with @Test def testProducerWithAuthenticationFailure() { - verifyAuthenticationException(() => sendOneRecord(10000)) - verifyAuthenticationException(() => producers.head.partitionsFor(topic)) + verifyAuthenticationException(sendOneRecord(10000)) + verifyAuthenticationException(producers.head.partitionsFor(topic)) createClientCredential() - verifyWithRetry(() => sendOneRecord()) + verifyWithRetry(sendOneRecord()) } @Test def testTransactionalProducerWithAuthenticationFailure() { val txProducer = createTransactionalProducer() - verifyAuthenticationException(() => txProducer.initTransactions()) + verifyAuthenticationException(txProducer.initTransactions()) createClientCredential() try { @@ -120,12 +120,12 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with } private def verifyConsumerWithAuthenticationFailure(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) { - verifyAuthenticationException(() => consumer.poll(10000)) - verifyAuthenticationException(() => consumer.partitionsFor(topic)) + verifyAuthenticationException(consumer.poll(10000)) + verifyAuthenticationException(consumer.partitionsFor(topic)) createClientCredential() - verifyWithRetry(() => sendOneRecord()) - verifyWithRetry(() => assertEquals(1, consumer.poll(1000).count)) + verifyWithRetry(sendOneRecord()) + verifyWithRetry(assertEquals(1, consumer.poll(1000).count)) } @Test @@ -147,10 +147,10 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with } try { - verifyAuthenticationException(() => describeTopic()) + verifyAuthenticationException(describeTopic()) createClientCredential() - verifyWithRetry(() => describeTopic()) + verifyWithRetry(describeTopic()) } finally { adminClient.close } @@ -174,9 +174,9 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with val consumer = consumers.head consumer.subscribe(List(topic).asJava) - verifyAuthenticationException(() => consumerGroupService.listGroups) + verifyAuthenticationException(consumerGroupService.listGroups) createClientCredential() - verifyWithRetry(() => consumer.poll(1000)) + verifyWithRetry(consumer.poll(1000)) assertEquals(1, consumerGroupService.listGroups.size) } @@ -197,10 +197,10 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with } } - private def verifyAuthenticationException(f: () => Unit): Unit = { + private def verifyAuthenticationException(action: => Unit): Unit = { val startMs = System.currentTimeMillis try { - f() + action fail("Expected an authentication exception") } catch { case e: SaslAuthenticationException => @@ -211,12 +211,12 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with } } - private def verifyWithRetry(f: () => Unit): Unit = { + private def verifyWithRetry(action: => Unit): Unit = { var attempts = 0 TestUtils.waitUntilTrue(() => { try { attempts += 1 - f() + action true } catch { case _: SaslAuthenticationException => false