Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public class LoadRetriever implements BrokerStatusListener {
private final Lock lock;
private final Condition cond;
private final Controller controller;
private final ScheduledExecutorService createTopicExecutorService;
private final ScheduledExecutorService mainExecutorService;
private final Set<Integer> brokerIdsInUse;
private final Set<TopicPartition> currentAssignment = new HashSet<>();
Expand All @@ -105,7 +104,6 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller,
this.brokerIdsInUse = new HashSet<>();
this.lock = new ReentrantLock();
this.cond = lock.newCondition();
this.createTopicExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-create-topic"));
this.mainExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-main"));
leaderEpochInitialized = false;
metricReporterTopic = config.getString(AutoBalancerConfig.AUTO_BALANCER_TOPIC_CONFIG);
Expand All @@ -119,19 +117,14 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller,

public void start() {
this.shutdown = false;
this.createTopicExecutorService.scheduleAtFixedRate(this::checkAndCreateTopic, 1, 1L, TimeUnit.MINUTES);
this.mainExecutorService.schedule(this::retrieve, 0, TimeUnit.MILLISECONDS);
logger.info("Started");
}

public void shutdown() {
this.shutdown = true;
this.createTopicExecutorService.shutdown();
this.mainExecutorService.shutdown();
try {
if (!createTopicExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
this.createTopicExecutorService.shutdownNow();
}
if (!mainExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
this.mainExecutorService.shutdownNow();
}
Expand Down Expand Up @@ -325,6 +318,10 @@ private void checkAndCreateTopic() {
return;
}

if (!hasAvailableBroker()) {
return;
}

CreateTopicsRequestData request = new CreateTopicsRequestData();
CreateTopicsRequestData.CreatableTopicCollection topicCollection = new CreateTopicsRequestData.CreatableTopicCollection();
CreateTopicsRequestData.CreateableTopicConfigCollection configCollection = new CreateTopicsRequestData.CreateableTopicConfigCollection();
Expand Down Expand Up @@ -367,6 +364,7 @@ public void retrieve() {
}
try {
if (!refreshAssignment()) {
checkAndCreateTopic();
this.mainExecutorService.schedule(this::retrieve, 1, TimeUnit.SECONDS);
break;
}
Expand Down