KAFKA-5947: Handle authentication failure in admin client, txn producer #3928

Closed

rajinisivaram
Contributor

  1. Raise AuthenticationException for authentication failures in admin client
  2. Handle AuthenticationException as a fatal error for transactional producer
  3. Add comments to authentication exceptions

@rajinisivaram
Contributor Author

@vahidhashemian @hachikuji Can you review, please? Thanks.

* </p><p>
* SASL authentication failures typically indicate invalid credentials.
* <p>
*/
public class AuthenticationFailedException extends AuthenticationException {
Contributor

I wonder if this should be called InvalidCredentialsException.

Contributor Author

At the moment, we have a single error code and exception for authentication failure with an error message giving more details. For SSL, this includes any handshake exception (e.g. server host name verification failure). At the moment, we are reporting only invalid credentials for SASL, but we may add others too. Since we convert SSLException and SaslException from the providers, it is simpler to maintain a more generic AuthenticationFailed exception rather than separate exceptions for different failures.

Contributor

The issue I have is that AuthenticationFailedException is too similar to AuthenticationException.

Contributor Author

@rajinisivaram, Sep 21, 2017

What about SaslAuthenticationFailedException and SslAuthenticationFailedException?

Contributor

That sounds good to me.

Contributor Author

I will add without the Failed to be consistent with TopicAuthorizationException etc. So SaslAuthenticationException and SslAuthenticationException.

Contributor

I read it a bit too quickly on my phone and I hadn't seen the "Failed" part. What you did in the end is what I had in mind.

Contributor Author

I removed the SSL exception part from the doc. Will add SslAuthenticationException and the associated docs in #3918
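
For readers following the naming discussion, the outcome can be pictured as a small hierarchy in which mechanism-specific exceptions extend the generic one. This is only a minimal sketch under assumptions: the classes below are local stand-ins declared for illustration (they are not the Kafka classes themselves), and `AuthErrors.describe` is a hypothetical helper that does not appear in the PR.

```java
// Local stand-ins for illustration only; not the Kafka classes themselves.
class AuthenticationException extends RuntimeException {
    AuthenticationException(String message) { super(message); }
}

// SASL-specific failure, e.g. invalid credentials.
class SaslAuthenticationException extends AuthenticationException {
    SaslAuthenticationException(String message) { super(message); }
}

// SSL-specific failure, e.g. a handshake error such as host name verification.
class SslAuthenticationException extends AuthenticationException {
    SslAuthenticationException(String message) { super(message); }
}

class AuthErrors {
    // Hypothetical helper: callers can catch the generic type and still
    // tell the mechanisms apart when they need to.
    static String describe(AuthenticationException e) {
        if (e instanceof SaslAuthenticationException) return "SASL: " + e.getMessage();
        if (e instanceof SslAuthenticationException) return "SSL: " + e.getMessage();
        return "auth: " + e.getMessage();
    }
}
```

Keeping a shared parent type means existing code that catches the generic exception keeps working, while the subclass names avoid the near-collision between `AuthenticationFailedException` and `AuthenticationException` raised in the review.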

@vahidhashemian
Contributor

@rajinisivaram Thanks for the PR. It looks good to me after a quick review.
I like the alternate exception name @ijuma suggested.

Contributor

@ijuma left a comment

Thanks for the PR. Looks good overall, just a few minor comments.

@@ -974,6 +1011,9 @@ public void run() {

// Update the current time and handle the latest responses.
now = time.milliseconds();
if (handleAuthenticationException(now, callsToSend) &&
hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME)
Contributor

What is the reason for hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME?

Contributor Author

We don't want to exit this background thread until a close has been requested. If we are closing and there is an authentication exception, we want to terminate immediately. If we are not closing, the thread is kept alive so that a future request after credentials are added can succeed. This is tested in the integration test.

Contributor

Hmm, what's the impact of not exiting immediately? It seems like we would not have any responses anyway, so it should happen pretty quickly either way (and all pending requests have been cancelled). Maybe keep it simple?
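
The condition being discussed can be isolated as follows. This is a hedged sketch, not the admin client code: `AdminThreadSketch` and `shouldExit` are hypothetical names, and the sentinel value `-1` is an assumption; only `hardShutdownTimeMs` and `INVALID_SHUTDOWN_TIME` are taken from the diff.

```java
import java.util.concurrent.atomic.AtomicLong;

class AdminThreadSketch {
    // Sentinel meaning "close() has not been requested" (the value is an assumption).
    static final long INVALID_SHUTDOWN_TIME = -1L;

    final AtomicLong hardShutdownTimeMs = new AtomicLong(INVALID_SHUTDOWN_TIME);

    // Returns true when the background loop should exit after this iteration:
    // only if authentication failed AND a close has already been requested.
    boolean shouldExit(boolean authenticationFailed) {
        boolean closing = hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME;
        return authenticationFailed && closing;
    }
}
```

With this shape, an authentication failure while the client is still open leaves the thread running, so a later request made after credentials are fixed can still succeed, which is the behavior the author describes.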

@@ -579,6 +580,11 @@ synchronized void retry(TxnRequestHandler request) {
enqueueRequest(request);
}

synchronized void authenticationFailed(AuthenticationException e) {
Contributor

Would it be worth making this more generic and simply call it fatalError or something?

Contributor Author

Hmm... The code was changed from generic to more specific in the other PR based on review comments. So I thought this should be specific too.

Contributor

OK, we can leave it like that then.
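
To make the trade-off concrete, here is one way a specific `authenticationFailed` entry point could treat the error as fatal for every queued request. It is a sketch under assumptions: `TxnManagerSketch` and `TxnRequestSketch` are hypothetical stand-ins, the method body is not shown in the diff, and `RuntimeException` is used in place of the real exception type to keep the example self-contained.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class TxnRequestSketch {
    RuntimeException error; // non-null once the request has failed fatally

    void fatalError(RuntimeException e) { this.error = e; }
}

class TxnManagerSketch {
    private final Queue<TxnRequestSketch> pendingRequests = new ArrayDeque<>();

    synchronized void enqueueRequest(TxnRequestSketch request) {
        pendingRequests.add(request);
    }

    // Specific entry point, named as in the diff; the body is an assumption.
    // Every pending request is failed with the same authentication error.
    synchronized void authenticationFailed(RuntimeException e) {
        for (TxnRequestSketch request : pendingRequests) {
            request.fatalError(e);
        }
    }
}
```

A generic `fatalError`-style name would have the same body; the specific name only documents which condition triggers it.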

* This exception indicates unexpected requests prior to SASL authentication.
* This could be due to misconfigured security, e.g. if PLAINTEXT protocol
* is used to connect to a SASL endpoint.
*
Contributor

Nit: empty line.

/**
* This exception indicates that the SASL mechanism requested by the client
* is not enabled on the broker.
*
Contributor

Nit: empty line.

result.all().get();
fail("Expected an authentication error!");
} catch (Exception e) {
assertTrue("Expected AuthenticationFailedException, got " + e.getClass(), e.getCause() instanceof AuthenticationFailedException);
Contributor

Nit: long line.

Map<String, Object> props = new HashMap<>(saslClientConfigs);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Contributor

We can pass the serializers in the constructor and it's a bit more concise.

Contributor Author

To add to the constructors, we need to remove the import restrictions on the package. It feels better to do them as properties instead since this test is currently in the security package. @vahidhashemian did that to avoid changing the restrictions.

Contributor

The serialization package is open to everyone (it's public API and at the lowest layer). So I don't think we should worry about that. It's not like we're avoiding a dependency here, we are just hiding it via a string based config (that still requires the default constructor to be present).

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
Contributor

This is the default.

Contributor Author

copied and pasted earlier :-), removed now.

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
Contributor

This is also the default, I think.

producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1")
producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
Contributor

This and max in flight are defaults, right?

Contributor Author

yes, removed.

Contributor

@ijuma left a comment

Thanks for the updates, a few more comments.

private boolean handleAuthenticationException(long now, Map<Node, List<Call>> callsToSend) {
AuthenticationException authenticationException = null;
try {
metadata.maybeThrowAuthenticationException();
Contributor

Should we add a method that is like this one, but returns the exception instead? It seems a bit convoluted to throw a stored exception just to catch it. Not sure if that helps other cases, but at least here, it seems to make the code cleaner.

Contributor Author

I changed the existing method to return the exception rather than throw it.
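
The "return instead of throw" shape suggested here might look roughly like the sketch below. It is illustrative only: `MetadataSketch`, `recordAuthenticationFailure`, and `getAndClearAuthenticationException` are hypothetical names chosen for this example, and `RuntimeException` stands in for the real exception type so the block has no external dependencies.

```java
import java.util.Optional;

class MetadataSketch {
    private RuntimeException authenticationException; // null when no failure is pending

    synchronized void recordAuthenticationFailure(RuntimeException e) {
        this.authenticationException = e;
    }

    // Returns the stored failure (if any) and clears it, so the caller
    // does not have to throw and immediately catch the same exception.
    synchronized Optional<RuntimeException> getAndClearAuthenticationException() {
        Optional<RuntimeException> result = Optional.ofNullable(authenticationException);
        authenticationException = null;
        return result;
    }
}
```

A caller such as `handleAuthenticationException` can then branch on whether a failure is present, which removes the try/catch that the reviewer found convoluted.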

@@ -68,12 +74,27 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
@Test
def testProducerWithAuthenticationFailure() {
verifyAuthenticationException(() => sendOneRecord(10000))
verifyAuthenticationException(() => producers.head.partitionsFor(topic))
Contributor

Nit: we can change the parameter to be by name (=> Unit instead of () => Unit) and then you don't need the noisy () => in every invocation.

}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request");
Contributor

Do we want to include the exception message or stacktrace?

}
for (List<Call> calls : callsToSend.values()) {
for (Call call : calls) {
call.handleFailure(authenticationException);
Contributor

@ijuma, Sep 21, 2017

Why are we calling handleFailure directly instead of fail? If we used fail in both places, we could extract a method and call it twice in succession, once for newCalls and once for callsToSend.
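
The extraction proposed in this comment could be sketched like this. The names are assumptions: `CallSketch`, `FailCallsSketch`, `failCalls`, and `failAll` are hypothetical, node keys are plain strings rather than `Node` objects, and `fail(now, t)` is modeled only as recording the failure.

```java
import java.util.List;
import java.util.Map;

class CallSketch {
    final String name;
    Throwable failure; // set once the call has been failed

    CallSketch(String name) { this.name = name; }

    void fail(long now, Throwable t) { this.failure = t; }
}

class FailCallsSketch {
    // Single helper that fails every call in a collection with the same exception.
    static void failCalls(long now, Iterable<CallSketch> calls, Throwable t) {
        for (CallSketch call : calls) {
            call.fail(now, t);
        }
    }

    // Called twice in succession: once for newCalls, once per node in callsToSend.
    static void failAll(long now, List<CallSketch> newCalls,
                        Map<String, List<CallSketch>> callsToSend, Throwable t) {
        failCalls(now, newCalls, t);
        for (List<CallSketch> calls : callsToSend.values()) {
            failCalls(now, calls, t);
        }
        newCalls.clear();
        callsToSend.clear();
    }
}
```

Routing both collections through one helper keeps the failure path identical for calls that were never sent and calls that were already assigned to a node.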

@rajinisivaram
Contributor Author

@vahidhashemian @ijuma Thanks for the reviews. Have addressed the comments so far.

Contributor

@ijuma left a comment

LGTM. I'll merge it to trunk once the tests pass. If there are additional comments, we can do a follow-up.

@asfgit asfgit closed this in 96ba21e Sep 21, 2017
@@ -200,32 +201,38 @@ public void run() {
*/
void run(long now) {
if (transactionManager != null) {
if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
Contributor

@hachikuji, if you have a chance, have a look and see if this is OK.

Contributor

Looks ok to me.

4 participants