Fix the UTO upgrade issue reported in strimzi#9470.
This issue is caused by stale metadata on one or more brokers after the cluster is restarted (no risk of data loss).
Using the reproducer, we can see that the UTO fails at 14:27:39 with UnknownTopicOrPartitionException (retriable), while one of the brokers only learns about my-topic at 14:27:44.
The UTO therefore treats the topic as missing and triggers the topic creation logic, which fails with TopicExistsException.

UTO log:
2023-12-17 14:27:39,55262 TRACE [kafka-admin-client-thread | strimzi-topic-operator-a93c1635-76c3-4c9f-b61f-68c1a6ac98c3] BatchingTopicController:754 - Admin.describeTopics([__strimzi_store_topic, strimzi.cruisecontrol.partitionmetricsamples, __strimzi-topic-operator-kstreams-topic-store-changelog, timer-topic, connect-cluster-status, strimzi.cruisecontrol.modeltrainingsamples, strimzi.cruisecontrol.metrics, my-topic, __consumer_offsets, connect-cluster-offsets]) failed with java.util.concurrent.CompletionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.

Broker log:
2023-12-17 14:27:44,209 TRACE [Broker id=1000] Cached leader info UpdateMetadataPartitionState(topicName='my-topic', partitionIndex=0, controllerEpoch=1, leader=2000, leaderEpoch=4, isr=[1001, 2000, 2001], zkVersion=7, replicas=[2000, 2001, 1001], offlineReplicas=[]) for partition my-topic-0 in response to UpdateMetadata request sent by controller 1001 epoch 2 with correlation id 0 (state.change.logger) [control-plane-kafka-request-handler-0]

I'm proposing to catch and ignore the TopicExistsException, which is also what the BTO does.
If the topic was created by a third party before the UTO, the next reconciliation will try to revert any configuration drift.
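
A minimal sketch of the proposed handling, outside the operator code base. It assumes a future that stands in for one entry of the create-topics result; `TopicExistsSketch`, `awaitCreate`, and the locally defined `TopicExistsException` are hypothetical names used only so the example compiles without the Kafka client, and they are not the operator's actual types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class TopicExistsSketch {
    // Hypothetical stand-in for org.apache.kafka.common.errors.TopicExistsException,
    // declared here only to keep the sketch free of external dependencies.
    static class TopicExistsException extends RuntimeException {
        TopicExistsException(String message) {
            super(message);
        }
    }

    /**
     * Waits for a single topic creation to finish.
     * Returns null when the creation counts as a success,
     * otherwise the underlying cause of the failure.
     */
    static Throwable awaitCreate(CompletableFuture<Void> createFuture) {
        try {
            createFuture.get();
            return null;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                // The topic is already there: treat this as a success and
                // leave any configuration drift to the next reconciliation.
                return null;
            }
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> alreadyExists = new CompletableFuture<>();
        alreadyExists.completeExceptionally(new TopicExistsException("Topic 'my-topic' already exists."));

        CompletableFuture<Void> otherFailure = new CompletableFuture<>();
        otherFailure.completeExceptionally(new IllegalStateException("some other admin error"));

        System.out.println("already exists treated as success: " + (awaitCreate(alreadyExists) == null));
        System.out.println("other failure still reported: " + (awaitCreate(otherFailure) != null));
    }
}
```

Only the "already exists" case is swallowed here; every other cause is still returned to the caller, which mirrors the intent of keeping the existing error handling for all other admin failures.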

Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
fvaleri committed Dec 18, 2023
1 parent 2f13640 commit f9ffc1f
Showing 2 changed files with 27 additions and 2 deletions.

@@ -39,6 +39,7 @@
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 
 import java.io.InterruptedIOException;
@@ -283,7 +284,12 @@ private PartitionedByError<ReconcilableTopic, Void> createTopics(List<Reconcilab
                 values.get(reconcilableTopic.topicName()).get();
                 return pair(reconcilableTopic, Either.ofRight((null)));
             } catch (ExecutionException e) {
-                return pair(reconcilableTopic, Either.ofLeft(handleAdminException(e)));
+                if (e.getCause() != null && e.getCause() instanceof TopicExistsException) {
+                    // we treat this as a success, the next reconciliation checks the configuration
+                    return pair(reconcilableTopic, Either.ofRight((null)));
+                } else {
+                    return pair(reconcilableTopic, Either.ofLeft(handleAdminException(e)));
+                }
             } catch (InterruptedException e) {
                 throw new UncheckedInterruptedException(e);
             }

@@ -38,6 +38,7 @@
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.logging.log4j.Level;
@@ -1883,7 +1884,7 @@ public void shouldFailIfNumPartitionsDivergedWithConfigChange(@BrokerConfig(name
                 "Decreasing partitions not supported"));
     }
 
-    private static <T> KafkaFuture<T> failedFuture(Throwable error) throws ExecutionException, InterruptedException {
+    private static <T> KafkaFuture<T> failedFuture(Throwable error) {
         var future = new KafkaFutureImpl<T>();
         future.completeExceptionally(error);
         return future;
@@ -2080,4 +2081,22 @@ public void shouldNotReconcileKafkaTopicWithMissingSpec(
 
         assertNotExistsInKafka(expectedTopicName(created));
     }
+
+    @Test
+    public void shouldReconcileOnTopicExistsException(
+            @BrokerConfig(name = "auto.create.topics.enable", value = "false")
+            KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
+        var config = topicOperatorConfig(NAMESPACE, kafkaCluster);
+        var topicName = randomTopicName();
+
+        var creteTopicResult = mock(CreateTopicsResult.class);
+        var existsException = new TopicExistsException(format("Topic '%s' already exists.", topicName));
+        Mockito.doReturn(failedFuture(existsException)).when(creteTopicResult).all();
+        Mockito.doReturn(Map.of(topicName, failedFuture(existsException))).when(creteTopicResult).values();
+        operatorAdmin = new Admin[]{Mockito.spy(Admin.create(config.adminClientConfig()))};
+        Mockito.doReturn(creteTopicResult).when(operatorAdmin[0]).createTopics(any());
+
+        KafkaTopic kafkaTopic = createTopic(kafkaCluster, kafkaTopic(NAMESPACE, topicName, true, topicName, 2, 1));
+        assertTrue(readyIsTrue().test(kafkaTopic));
+    }
 }
