diff --git a/core/src/main/java/kafka/autobalancer/LoadRetriever.java b/core/src/main/java/kafka/autobalancer/LoadRetriever.java index 47152d4841..b62669d6ed 100644 --- a/core/src/main/java/kafka/autobalancer/LoadRetriever.java +++ b/core/src/main/java/kafka/autobalancer/LoadRetriever.java @@ -81,7 +81,6 @@ public class LoadRetriever implements BrokerStatusListener { private final Lock lock; private final Condition cond; private final Controller controller; - private final ScheduledExecutorService createTopicExecutorService; private final ScheduledExecutorService mainExecutorService; private final Set brokerIdsInUse; private final Set currentAssignment = new HashSet<>(); @@ -105,7 +104,6 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller, this.brokerIdsInUse = new HashSet<>(); this.lock = new ReentrantLock(); this.cond = lock.newCondition(); - this.createTopicExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-create-topic")); this.mainExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-main")); leaderEpochInitialized = false; metricReporterTopic = config.getString(AutoBalancerConfig.AUTO_BALANCER_TOPIC_CONFIG); @@ -119,19 +117,14 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller, public void start() { this.shutdown = false; - this.createTopicExecutorService.scheduleAtFixedRate(this::checkAndCreateTopic, 1, 1L, TimeUnit.MINUTES); this.mainExecutorService.schedule(this::retrieve, 0, TimeUnit.MILLISECONDS); logger.info("Started"); } public void shutdown() { this.shutdown = true; - this.createTopicExecutorService.shutdown(); this.mainExecutorService.shutdown(); try { - if (!createTopicExecutorService.awaitTermination(5, TimeUnit.SECONDS)) { - this.createTopicExecutorService.shutdownNow(); - } if (!mainExecutorService.awaitTermination(5, TimeUnit.SECONDS)) { this.mainExecutorService.shutdownNow(); } @@ -325,6 +318,10 @@ private void checkAndCreateTopic() { return; } + if (!hasAvailableBroker()) { + return; + } + CreateTopicsRequestData request = new CreateTopicsRequestData(); CreateTopicsRequestData.CreatableTopicCollection topicCollection = new CreateTopicsRequestData.CreatableTopicCollection(); CreateTopicsRequestData.CreateableTopicConfigCollection configCollection = new CreateTopicsRequestData.CreateableTopicConfigCollection(); @@ -367,6 +364,7 @@ public void retrieve() { } try { if (!refreshAssignment()) { + checkAndCreateTopic(); this.mainExecutorService.schedule(this::retrieve, 1, TimeUnit.SECONDS); break; }