Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8662; Fix producer metadata error handling and consumer manual assignment #7086

Merged
merged 4 commits into from
Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 64 additions & 18 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -44,6 +45,7 @@
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* A class encapsulating some of the logic around metadata.
Expand All @@ -66,7 +68,8 @@ public class Metadata implements Closeable {
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;
private KafkaException fatalException;
private KafkaException recoverableException;
private Set<String> invalidTopics;
private Set<String> unauthorizedTopics;
private MetadataCache cache = MetadataCache.empty();
private boolean needUpdate;
private final ClusterResourceListeners clusterResourceListeners;
Expand Down Expand Up @@ -97,6 +100,8 @@ public Metadata(long refreshBackoffMs,
this.clusterResourceListeners = clusterResourceListeners;
this.isClosed = false;
this.lastSeenLeaderEpochs = new HashMap<>();
this.invalidTopics = Collections.emptySet();
this.unauthorizedTopics = Collections.emptySet();
}

/**
Expand Down Expand Up @@ -204,16 +209,6 @@ public synchronized Optional<MetadataCache.PartitionInfoAndEpoch> partitionInfoI
}
}

/**
* If any non-retriable exceptions were encountered during metadata update, clear and return the exception.
*/
public synchronized KafkaException getAndClearMetadataException() {
KafkaException metadataException = Optional.ofNullable(fatalException).orElse(recoverableException);
fatalException = null;
recoverableException = null;
return metadataException;
}

public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) {
this.needUpdate = true;
this.lastRefreshMs = now;
Expand Down Expand Up @@ -272,24 +267,22 @@ public synchronized void update(int requestVersion, MetadataResponse response, l

private void maybeSetMetadataError(Cluster cluster) {
// if we encounter any invalid topics, cache the exception to later throw to the user
Copy link
Contributor

@hachikuji hachikuji Jul 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this comment seems a little stale. We could probably just drop it.

recoverableException = null;
clearRecoverableErrors();
checkInvalidTopics(cluster);
checkUnauthorizedTopics(cluster);
}

private void checkInvalidTopics(Cluster cluster) {
if (!cluster.invalidTopics().isEmpty()) {
log.error("Metadata response reported invalid topics {}", cluster.invalidTopics());
// We may be able to recover from this exception if metadata for this topic is no longer needed
recoverableException = new InvalidTopicException(cluster.invalidTopics());
invalidTopics = new HashSet<>(cluster.invalidTopics());
}
}

private void checkUnauthorizedTopics(Cluster cluster) {
if (!cluster.unauthorizedTopics().isEmpty()) {
log.error("Topic authorization failed for topics {}", cluster.unauthorizedTopics());
// We may be able to recover from this exception if metadata for this topic is no longer needed
recoverableException = new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
unauthorizedTopics = new HashSet<>(cluster.unauthorizedTopics());
}
}

Expand Down Expand Up @@ -360,12 +353,65 @@ private void updatePartitionInfo(String topic,
}
}

public synchronized void maybeThrowException() {
KafkaException metadataException = getAndClearMetadataException();
/**
* If any non-retriable exceptions were encountered during metadata update, clear and throw the exception.
* This is used by the consumer to propagate any fatal exceptions or topic exceptions for any of the topics
* in the consumer's Metadata.
*/
public synchronized void maybeThrowAnyException() {
clearErrorsAndMaybeThrowException(this::recoverableException);
}

/**
* If any fatal exceptions were encountered during metadata update, throw the exception. All
* exceptions from the last metadata update are cleared. This is used by the producer to abort waiting
* for metadata if there were fatal exceptions (e.g. authentication failures) in the last metadata update.
*/
public synchronized void maybeThrowFatalException() {
clearErrorsAndMaybeThrowException(() -> null);
}

/**
* If any non-retriable exceptions were encountered during metadata update, throw exception if the exception
* is fatal or related to the specified topic. All exceptions from the last metadata update are cleared.
* This is used by the producer to propagate topic metadata errors for send requests.
*/
public synchronized void maybeThrowExceptionForTopic(String topic) {
clearErrorsAndMaybeThrowException(() -> recoverableExceptionForTopic(topic));
}

private void clearErrorsAndMaybeThrowException(Supplier<KafkaException> recoverableExceptionSupplier) {
KafkaException metadataException = Optional.ofNullable(fatalException).orElseGet(recoverableExceptionSupplier);
fatalException = null;
clearRecoverableErrors();
if (metadataException != null)
throw metadataException;
}

// We may be able to recover from this exception if metadata for this topic is no longer needed
private KafkaException recoverableException() {
if (!unauthorizedTopics.isEmpty())
return new TopicAuthorizationException(unauthorizedTopics);
else if (!invalidTopics.isEmpty())
return new InvalidTopicException(invalidTopics);
else
return null;
}

private KafkaException recoverableExceptionForTopic(String topic) {
if (unauthorizedTopics.contains(topic))
return new TopicAuthorizationException(Collections.singleton(topic));
else if (invalidTopics.contains(topic))
return new InvalidTopicException(Collections.singleton(topic));
else
return null;
}

private void clearRecoverableErrors() {
invalidTopics = Collections.emptySet();
unauthorizedTopics = Collections.emptySet();
}

/**
* Record an attempt to update the metadata that failed. We need to keep track of this
* to avoid retrying immediately.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void poll(Timer timer, PollCondition pollCondition, boolean disableWakeup
// called without the lock to avoid deadlock potential if handlers need to acquire locks
firePendingCompletedRequests();

metadata.maybeThrowException();
metadata.maybeThrowAnyException();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,12 @@ private boolean changeSubscription(Set<String> topicsToSubscribe) {
return false;

subscription = topicsToSubscribe;
groupSubscription = new HashSet<>(groupSubscription);
groupSubscription.addAll(topicsToSubscribe);
if (subscriptionType != SubscriptionType.USER_ASSIGNED) {
groupSubscription = new HashSet<>(groupSubscription);
groupSubscription.addAll(topicsToSubscribe);
} else {
groupSubscription = new HashSet<>(topicsToSubscribe);
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
metadata.maybeThrowException();
metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs
long currentTimeMs = time.milliseconds();
long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
time.waitObject(this, () -> {
maybeThrowException();
return updateVersion() > lastVersion || isClosed();
boolean done = updateVersion() > lastVersion || isClosed();
// Propagate fatal exceptions if we haven't yet processed required metadata version to avoid unnecessary wait.
// If metadata has been updated to the required version, don't clear error state so that caller can process
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow this. Aren't fatal errors always thrown before topic errors?

// errors related to the topic being processed.
if (!done)
maybeThrowFatalException();
return done;
}, deadlineMs);

if (isClosed())
Expand Down
47 changes: 41 additions & 6 deletions clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -403,18 +403,18 @@ public void testInvalidTopicError() {
Collections.singletonMap(invalidTopic, Errors.INVALID_TOPIC_EXCEPTION), Collections.emptyMap());
metadata.update(invalidTopicResponse, time.milliseconds());

InvalidTopicException e = assertThrows(InvalidTopicException.class, () -> metadata.maybeThrowException());
InvalidTopicException e = assertThrows(InvalidTopicException.class, () -> metadata.maybeThrowAnyException());

assertEquals(Collections.singleton(invalidTopic), e.invalidTopics());
// We clear the exception once it has been raised to the user
assertNull(metadata.getAndClearMetadataException());
metadata.maybeThrowAnyException();

// Reset the invalid topic error
metadata.update(invalidTopicResponse, time.milliseconds());

// If we get a good update, the error should clear even if we haven't had a chance to raise it to the user
metadata.update(emptyMetadataResponse(), time.milliseconds());
assertNull(metadata.getAndClearMetadataException());
metadata.maybeThrowAnyException();
}

@Test
Expand All @@ -426,17 +426,52 @@ public void testTopicAuthorizationError() {
Collections.singletonMap(invalidTopic, Errors.TOPIC_AUTHORIZATION_FAILED), Collections.emptyMap());
metadata.update(unauthorizedTopicResponse, time.milliseconds());

TopicAuthorizationException e = assertThrows(TopicAuthorizationException.class, () -> metadata.maybeThrowException());
TopicAuthorizationException e = assertThrows(TopicAuthorizationException.class, () -> metadata.maybeThrowAnyException());
assertEquals(Collections.singleton(invalidTopic), e.unauthorizedTopics());
// We clear the exception once it has been raised to the user
assertNull(metadata.getAndClearMetadataException());
metadata.maybeThrowAnyException();

// Reset the unauthorized topic error
metadata.update(unauthorizedTopicResponse, time.milliseconds());

// If we get a good update, the error should clear even if we haven't had a chance to raise it to the user
metadata.update(emptyMetadataResponse(), time.milliseconds());
assertNull(metadata.getAndClearMetadataException());
metadata.maybeThrowAnyException();
}

@Test
public void testMetadataTopicErrors() {
Time time = new MockTime();

Map<String, Errors> topicErrors = new HashMap<>(3);
topicErrors.put("invalidTopic", Errors.INVALID_TOPIC_EXCEPTION);
topicErrors.put("sensitiveTopic1", Errors.TOPIC_AUTHORIZATION_FAILED);
topicErrors.put("sensitiveTopic2", Errors.TOPIC_AUTHORIZATION_FAILED);
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("clusterId", 1, topicErrors, Collections.emptyMap());

metadata.update(metadataResponse, time.milliseconds());
TopicAuthorizationException e1 = assertThrows(TopicAuthorizationException.class,
() -> metadata.maybeThrowExceptionForTopic("sensitiveTopic1"));
assertEquals(Collections.singleton("sensitiveTopic1"), e1.unauthorizedTopics());
// We clear the exception once it has been raised to the user
metadata.maybeThrowAnyException();

metadata.update(metadataResponse, time.milliseconds());
TopicAuthorizationException e2 = assertThrows(TopicAuthorizationException.class,
() -> metadata.maybeThrowExceptionForTopic("sensitiveTopic2"));
assertEquals(Collections.singleton("sensitiveTopic2"), e2.unauthorizedTopics());
metadata.maybeThrowAnyException();

metadata.update(metadataResponse, time.milliseconds());
InvalidTopicException e3 = assertThrows(InvalidTopicException.class,
() -> metadata.maybeThrowExceptionForTopic("invalidTopic"));
assertEquals(Collections.singleton("invalidTopic"), e3.invalidTopics());
metadata.maybeThrowAnyException();

// Other topics should not throw exception, but they should clear existing exception
metadata.update(metadataResponse, time.milliseconds());
metadata.maybeThrowExceptionForTopic("anotherTopic");
metadata.maybeThrowAnyException();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public void testUserAssignment() {
new TopicPartition("bar", 0),
new TopicPartition("__consumer_offsets", 0)));
testBasicSubscription(Utils.mkSet("foo", "bar"), Utils.mkSet("__consumer_offsets"));

subscription.assignFromUser(Utils.mkSet(
new TopicPartition("baz", 0),
new TopicPartition("__consumer_offsets", 0)));
testBasicSubscription(Utils.mkSet("baz"), Utils.mkSet("__consumer_offsets"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -242,7 +241,7 @@ public void testAuthenticationExceptionPropagatedFromMetadata() {
fail("Expected authentication error thrown");
} catch (AuthenticationException e) {
// After the exception is raised, it should have been cleared
assertNull(metadata.getAndClearMetadataException());
metadata.maybeThrowAnyException();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
Expand All @@ -37,6 +39,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertThrows;

public class ProducerMetadataTest {

Expand Down Expand Up @@ -174,6 +177,13 @@ public void testTopicExpiry() {
}
}

@Test
public void testMetadataWaitAbortedOnFatalException() throws Exception {
Time time = new MockTime();
metadata.failedUpdate(time.milliseconds(), new AuthenticationException("Fatal exception from test"));
assertThrows(AuthenticationException.class, () -> metadata.awaitUpdate(0, 1000));
}

private MetadataResponse responseWithCurrentTopics() {
return responseWithTopics(metadata.topics());
}
Expand Down
Loading