core/src/main/java/kafka/autobalancer/LoadRetriever.java (74 changes: 52 additions & 22 deletions)
@@ -31,6 +31,8 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
@@ -39,7 +41,6 @@
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ControllerRequestContext;
@@ -51,6 +52,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Properties;
@@ -64,7 +66,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LoadRetriever implements Runnable, BrokerStatusListener {
public class LoadRetriever implements BrokerStatusListener {
public static final Random RANDOM = new Random();
private final Logger logger;
private final Map<Integer, BrokerEndpoints> bootstrapServerMap;
@@ -74,15 +76,15 @@ public class LoadRetriever implements Runnable, BrokerStatusListener {
private final String metricReporterTopicCleanupPolicy;
private final long consumerPollTimeout;
private final String consumerClientIdPrefix;
private final String consumerGroupIdPrefix;
private final long consumerRetryBackOffMs;
private final ClusterModel clusterModel;
private final KafkaThread retrieveTask;
private final Lock lock;
private final Condition cond;
private final Controller controller;
private final ScheduledExecutorService executorService;
private final ScheduledExecutorService createTopicExecutorService;
private final ScheduledExecutorService mainExecutorService;
private final Set<Integer> brokerIdsInUse;
private final Set<TopicPartition> currentAssignment = new HashSet<>();
private volatile boolean leaderEpochInitialized;
private volatile boolean isLeader;
private volatile Consumer<String, AutoBalancerMetrics> consumer;
@@ -103,30 +105,47 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller,
this.brokerIdsInUse = new HashSet<>();
this.lock = new ReentrantLock();
this.cond = lock.newCondition();
this.executorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("metric-topic-initializer"));
this.createTopicExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-create-topic"));
this.mainExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-main"));
leaderEpochInitialized = false;
metricReporterTopic = config.getString(AutoBalancerConfig.AUTO_BALANCER_TOPIC_CONFIG);
metricReporterTopicPartition = config.getInt(AutoBalancerConfig.AUTO_BALANCER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG);
metricReporterTopicRetentionTime = config.getLong(AutoBalancerConfig.AUTO_BALANCER_METRICS_TOPIC_RETENTION_MS_CONFIG);
metricReporterTopicCleanupPolicy = config.getString(AutoBalancerConfig.AUTO_BALANCER_METRICS_TOPIC_CLEANUP_POLICY);
consumerPollTimeout = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT);
consumerClientIdPrefix = config.getString(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX);
consumerGroupIdPrefix = config.getString(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX);
consumerRetryBackOffMs = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS);
retrieveTask = KafkaThread.daemon("retrieve-load-task", this);
}

public void start() {
this.shutdown = false;
this.executorService.scheduleAtFixedRate(this::checkAndCreateTopic, 0, 1L, TimeUnit.MINUTES);
retrieveTask.start();
this.createTopicExecutorService.scheduleAtFixedRate(this::checkAndCreateTopic, 1, 1L, TimeUnit.MINUTES);
this.mainExecutorService.schedule(this::retrieve, 0, TimeUnit.MILLISECONDS);
logger.info("Started");
}

public void shutdown() {
this.shutdown = true;
this.executorService.shutdown();
retrieveTask.interrupt();
this.createTopicExecutorService.shutdown();
this.mainExecutorService.shutdown();
try {
if (!createTopicExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
this.createTopicExecutorService.shutdownNow();
}
if (!mainExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
this.mainExecutorService.shutdownNow();
}
} catch (InterruptedException ignored) {

}

if (this.consumer != null) {
try {
this.consumer.close(Duration.ofMillis(5000));
} catch (Exception e) {
logger.error("Exception when close consumer: {}", e.getMessage());
}
}
logger.info("Shutdown completed");
}

@@ -135,10 +154,9 @@ private KafkaConsumer<String, AutoBalancerMetrics> createConsumer(String bootstr
Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientIdPrefix + "-consumer-" + randomToken);
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupIdPrefix + "-group-" + randomToken);
consumerProps.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, Long.toString(consumerRetryBackOffMs));
consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
return new KafkaConsumer<>(consumerProps);
@@ -274,6 +292,7 @@ private void checkAndCreateConsumer() {
logger.info("No available broker in use, try to close current consumer");
this.consumer.close(Duration.ofSeconds(5));
this.consumer = null;
this.currentAssignment.clear();
logger.info("Consumer closed");
}
while (!shutdown && !hasAvailableBroker()) {
@@ -296,7 +315,6 @@ private void checkAndCreateConsumer() {
checkAndCreateTopic();
//TODO: fetch metadata from controller
this.consumer = createConsumer(bootstrapServer);
this.consumer.subscribe(Collections.singleton(metricReporterTopic));
logger.info("Created consumer on {}", bootstrapServer);
}
}
@@ -341,14 +359,17 @@ private void checkAndCreateTopic() {
}
}

@Override
public void run() {
public void retrieve() {
while (!shutdown) {
checkAndCreateConsumer();
if (shutdown) {
break;
}
try {
if (!refreshAssignment()) {
this.mainExecutorService.schedule(this::retrieve, 1, TimeUnit.SECONDS);
break;
}
ConsumerRecords<String, AutoBalancerMetrics> records = this.consumer.poll(Duration.ofMillis(consumerPollTimeout));
for (ConsumerRecord<String, AutoBalancerMetrics> record : records) {
if (record == null) {
@@ -364,13 +385,22 @@ public void run() {
logger.error("Consumer poll error: {}", e.getMessage());
}
}
if (this.consumer != null) {
try {
this.consumer.close(Duration.ofMillis(5000));
} catch (Exception e) {
logger.error("Exception when close consumer: {}", e.getMessage());
}

private boolean refreshAssignment() {
List<PartitionInfo> partitionInfos = this.consumer.partitionsFor(metricReporterTopic);
if (partitionInfos.isEmpty()) {
logger.info("No partitions found for topic {}", metricReporterTopic);
return false;
}
if (partitionInfos.size() != currentAssignment.size()) {
for (PartitionInfo partitionInfo : partitionInfos) {
TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
currentAssignment.add(topicPartition);
}
this.consumer.assign(currentAssignment);
}
return true;
}

public void onLeaderChanged(boolean isLeader) {
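For reference, the core change in LoadRetriever is replacing the group-managed subscribe() with explicit partition discovery and assign(), mirroring the new refreshAssignment() above. Below is a minimal, self-contained sketch of that pattern, not the project's actual code: the topic name and bootstrap server are placeholders, and the value type is simplified to String with StringDeserializer instead of AutoBalancerMetrics with MetricSerde.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class AssignOnlyConsumerSketch {
    public static void main(String[] args) {
        // Placeholder values; the real code derives these from AutoBalancer configs.
        String bootstrapServer = "localhost:9092";
        String metricsTopic = "__auto_balancer_metrics";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "load-retriever-sketch");
        // No GROUP_ID_CONFIG: with assign() the consumer never joins a group,
        // so no coordinator, rebalances, or committed offsets are involved.
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Set<TopicPartition> assignment = new HashSet<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Discover partitions and assign them directly, as refreshAssignment() does.
            List<PartitionInfo> partitions = consumer.partitionsFor(metricsTopic);
            if (partitions == null || partitions.isEmpty()) {
                return; // topic not ready yet; the real code retries on a schedule
            }
            for (PartitionInfo p : partitions) {
                assignment.add(new TopicPartition(p.topic(), p.partition()));
            }
            consumer.assign(assignment);

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
            }
        }
    }
}
```

Dropping the group also removes rebalance latency and the need for committed offsets, which is consistent with the diff switching ENABLE_AUTO_COMMIT_CONFIG to "false".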
AutoBalancerControllerConfig.java
@@ -34,7 +34,6 @@ public class AutoBalancerControllerConfig extends AutoBalancerConfig {
public static final String AUTO_BALANCER_CONTROLLER_ENABLE = PREFIX + "enable";
public static final String AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT = PREFIX + "consumer.poll.timeout";
public static final String AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX = PREFIX + "consumer.client.id";
public static final String AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX = PREFIX + CommonClientConfigs.GROUP_ID_CONFIG;
public static final String AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS = PREFIX + CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
public static final String AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS = PREFIX + "metrics.delay.ms";
public static final String AUTO_BALANCER_CONTROLLER_GOALS = PREFIX + "goals";
@@ -54,7 +53,6 @@ public class AutoBalancerControllerConfig extends AutoBalancerConfig {
public static final boolean DEFAULT_AUTO_BALANCER_CONTROLLER_ENABLE = false;
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT = 1000L;
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX = "AutoBalancerControllerConsumer";
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX = "AutoBalancerControllerConsumerGroup";
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS = 1000;
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS = Duration.ofMinutes(1).toMillis();
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_GOALS = new StringJoiner(",")
@@ -105,9 +103,6 @@ public class AutoBalancerControllerConfig extends AutoBalancerConfig {
.define(AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX, ConfigDef.Type.STRING,
DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX, ConfigDef.Importance.LOW,
AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX_DOC)
.define(AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX, ConfigDef.Type.STRING,
DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX, ConfigDef.Importance.HIGH,
AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX_DOC)
.define(AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS, ConfigDef.Type.LONG,
DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS, ConfigDef.Importance.HIGH,
AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS_DOC)
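With the consumer using assign() only, no group.id is ever sent to the broker, so the group-id prefix key, its default, and its ConfigDef entry can all be deleted as shown above. A hedged sketch of the remaining ConfigDef pattern, with illustrative key names, defaults, and docs rather than the project's real constants:

```java
import org.apache.kafka.common.config.ConfigDef;

public class ConsumerConfigDefSketch {
    // Illustrative keys; the real class composes these from a PREFIX constant.
    public static final String CONSUMER_CLIENT_ID_PREFIX = "autobalancer.controller.consumer.client.id";
    public static final String CONSUMER_RETRY_BACKOFF_MS = "autobalancer.controller.consumer.retry.backoff.ms";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(CONSUMER_CLIENT_ID_PREFIX, ConfigDef.Type.STRING,
                    "AutoBalancerControllerConsumer", ConfigDef.Importance.LOW,
                    "Prefix for the client.id of the metrics consumer")
            .define(CONSUMER_RETRY_BACKOFF_MS, ConfigDef.Type.LONG,
                    1000L, ConfigDef.Importance.HIGH,
                    "Retry backoff in milliseconds for the metrics consumer");
    // No group.id entry remains, since the consumer never participates in group management.
}
```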
core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java (4 changes: 2 additions & 2 deletions)
@@ -114,7 +114,7 @@ public void testLoadRetrieverShutdown() {
cluster.controllers().values().iterator().next().controller(), clusterModel);
loadRetriever.start();

Assertions.assertTimeout(Duration.ofMillis(5000), loadRetriever::shutdown);
Assertions.assertTimeout(Duration.ofMillis(15000), loadRetriever::shutdown);

LoadRetriever loadRetriever2 = new LoadRetriever(config,
cluster.controllers().values().iterator().next().controller(), clusterModel);
@@ -131,7 +131,7 @@ public void testLoadRetrieverShutdown() {
.setPort(endpoint.port())
.setSecurityProtocol(endpoint.securityProtocol().id)).iterator()));
loadRetriever2.onBrokerRegister(record);
Assertions.assertTimeout(Duration.ofMillis(5000), loadRetriever2::shutdown);
Assertions.assertTimeout(Duration.ofMillis(15000), loadRetriever2::shutdown);
}

@Test
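The test timeout is raised from 5 s to 15 s, which appears to cover the new worst case in shutdown(): up to 5 s of awaitTermination() for each of the two executors plus up to 5 s for the consumer close. A minimal sketch of that bounded-shutdown pattern and the JUnit 5 guard, using illustrative class names rather than LoadRetriever itself:

```java
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BoundedShutdownSketchTest {

    static class Component {
        private final ScheduledExecutorService createTopicExecutor = Executors.newSingleThreadScheduledExecutor();
        private final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor();

        void shutdown() {
            createTopicExecutor.shutdown();
            mainExecutor.shutdown();
            try {
                // Each executor gets at most 5 seconds before being forced down.
                if (!createTopicExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                    createTopicExecutor.shutdownNow();
                }
                if (!mainExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                    mainExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // A consumer close bounded at 5 s would add up to another 5 seconds here,
            // which is why the test allows 15 s in total.
        }
    }

    @Test
    public void shutdownCompletesWithinBound() {
        Component component = new Component();
        // Fails if shutdown() takes longer than the worst-case bound.
        Assertions.assertTimeout(Duration.ofMillis(15000), component::shutdown);
    }
}
```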