From 5b86e1e517b2e0a752b530259587f0d1e91b940c Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Mon, 15 Jan 2024 15:37:03 +0800 Subject: [PATCH] feat(core): remove consumer group management for ab consumer Signed-off-by: Shichao Nie --- .../kafka/autobalancer/LoadRetriever.java | 74 +++++++++++++------ .../config/AutoBalancerControllerConfig.java | 5 -- .../kafka/autobalancer/LoadRetrieverTest.java | 4 +- 3 files changed, 54 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/kafka/autobalancer/LoadRetriever.java b/core/src/main/java/kafka/autobalancer/LoadRetriever.java index d503a57e43..47152d4841 100644 --- a/core/src/main/java/kafka/autobalancer/LoadRetriever.java +++ b/core/src/main/java/kafka/autobalancer/LoadRetriever.java @@ -31,6 +31,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsResponseData; @@ -39,7 +41,6 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.Controller; import org.apache.kafka.controller.ControllerRequestContext; @@ -51,6 +52,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.OptionalLong; import java.util.Properties; @@ -64,7 +66,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public class LoadRetriever implements Runnable, BrokerStatusListener { +public class LoadRetriever implements BrokerStatusListener { public static final Random RANDOM = new Random(); private final Logger logger; private final Map bootstrapServerMap; @@ -74,15 +76,15 @@ public class LoadRetriever implements Runnable, BrokerStatusListener { private final String metricReporterTopicCleanupPolicy; private final long consumerPollTimeout; private final String consumerClientIdPrefix; - private final String consumerGroupIdPrefix; private final long consumerRetryBackOffMs; private final ClusterModel clusterModel; - private final KafkaThread retrieveTask; private final Lock lock; private final Condition cond; private final Controller controller; - private final ScheduledExecutorService executorService; + private final ScheduledExecutorService createTopicExecutorService; + private final ScheduledExecutorService mainExecutorService; private final Set brokerIdsInUse; + private final Set currentAssignment = new HashSet<>(); private volatile boolean leaderEpochInitialized; private volatile boolean isLeader; private volatile Consumer consumer; @@ -103,7 +105,8 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller, this.brokerIdsInUse = new HashSet<>(); this.lock = new ReentrantLock(); this.cond = lock.newCondition(); - this.executorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("metric-topic-initializer")); + this.createTopicExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-create-topic")); + this.mainExecutorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("load-retriever-main")); leaderEpochInitialized = false; metricReporterTopic = config.getString(AutoBalancerConfig.AUTO_BALANCER_TOPIC_CONFIG); metricReporterTopicPartition = config.getInt(AutoBalancerConfig.AUTO_BALANCER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG); @@ -111,22 +114,38 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller, metricReporterTopicCleanupPolicy = config.getString(AutoBalancerConfig.AUTO_BALANCER_METRICS_TOPIC_CLEANUP_POLICY); consumerPollTimeout = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT); consumerClientIdPrefix = config.getString(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX); - consumerGroupIdPrefix = config.getString(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX); consumerRetryBackOffMs = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS); - retrieveTask = KafkaThread.daemon("retrieve-load-task", this); } public void start() { this.shutdown = false; - this.executorService.scheduleAtFixedRate(this::checkAndCreateTopic, 0, 1L, TimeUnit.MINUTES); - retrieveTask.start(); + this.createTopicExecutorService.scheduleAtFixedRate(this::checkAndCreateTopic, 1, 1L, TimeUnit.MINUTES); + this.mainExecutorService.schedule(this::retrieve, 0, TimeUnit.MILLISECONDS); logger.info("Started"); } public void shutdown() { this.shutdown = true; - this.executorService.shutdown(); - retrieveTask.interrupt(); + this.createTopicExecutorService.shutdown(); + this.mainExecutorService.shutdown(); + try { + if (!createTopicExecutorService.awaitTermination(5, TimeUnit.SECONDS)) { + this.createTopicExecutorService.shutdownNow(); + } + if (!mainExecutorService.awaitTermination(5, TimeUnit.SECONDS)) { + this.mainExecutorService.shutdownNow(); + } + } catch (InterruptedException ignored) { + + } + + if (this.consumer != null) { + try { + this.consumer.close(Duration.ofMillis(5000)); + } catch (Exception e) { + logger.error("Exception when close consumer: {}", e.getMessage()); + } + } logger.info("Shutdown completed"); } @@ -135,10 +154,9 @@ private KafkaConsumer createConsumer(String bootstr Properties consumerProps = new Properties(); consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientIdPrefix + "-consumer-" + randomToken); - consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupIdPrefix + "-group-" + randomToken); consumerProps.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, Long.toString(consumerRetryBackOffMs)); consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName()); return new KafkaConsumer<>(consumerProps); @@ -274,6 +292,7 @@ private void checkAndCreateConsumer() { logger.info("No available broker in use, try to close current consumer"); this.consumer.close(Duration.ofSeconds(5)); this.consumer = null; + this.currentAssignment.clear(); logger.info("Consumer closed"); } while (!shutdown && !hasAvailableBroker()) { @@ -296,7 +315,6 @@ private void checkAndCreateConsumer() { checkAndCreateTopic(); //TODO: fetch metadata from controller this.consumer = createConsumer(bootstrapServer); - this.consumer.subscribe(Collections.singleton(metricReporterTopic)); logger.info("Created consumer on {}", bootstrapServer); } } @@ -341,14 +359,17 @@ private void checkAndCreateTopic() { } } - @Override - public void run() { + public void retrieve() { while (!shutdown) { checkAndCreateConsumer(); if (shutdown) { break; } try { + if (!refreshAssignment()) { + this.mainExecutorService.schedule(this::retrieve, 1, TimeUnit.SECONDS); + break; + } ConsumerRecords records = this.consumer.poll(Duration.ofMillis(consumerPollTimeout)); for (ConsumerRecord record : records) { if (record == null) { @@ -364,13 +385,22 @@ public void run() { logger.error("Consumer poll error: {}", e.getMessage()); } } - if (this.consumer != null) { - try { - this.consumer.close(Duration.ofMillis(5000)); - } catch (Exception e) { - logger.error("Exception when close consumer: {}", e.getMessage()); + } + + private boolean refreshAssignment() { + List partitionInfos = this.consumer.partitionsFor(metricReporterTopic); + if (partitionInfos.isEmpty()) { + logger.info("No partitions found for topic {}", metricReporterTopic); + return false; + } + if (partitionInfos.size() != currentAssignment.size()) { + for (PartitionInfo partitionInfo : partitionInfos) { + TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); + currentAssignment.add(topicPartition); } + this.consumer.assign(currentAssignment); } + return true; } public void onLeaderChanged(boolean isLeader) { diff --git a/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java b/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java index b033b1fb2d..5445ccc71c 100644 --- a/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java +++ b/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java @@ -34,7 +34,6 @@ public class AutoBalancerControllerConfig extends AutoBalancerConfig { public static final String AUTO_BALANCER_CONTROLLER_ENABLE = PREFIX + "enable"; public static final String AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT = PREFIX + "consumer.poll.timeout"; public static final String AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX = PREFIX + "consumer.client.id"; - public static final String AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX = PREFIX + CommonClientConfigs.GROUP_ID_CONFIG; public static final String AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS = PREFIX + CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; public static final String AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS = PREFIX + "metrics.delay.ms"; public static final String AUTO_BALANCER_CONTROLLER_GOALS = PREFIX + "goals"; @@ -54,7 +53,6 @@ public class AutoBalancerControllerConfig extends AutoBalancerConfig { public static final boolean DEFAULT_AUTO_BALANCER_CONTROLLER_ENABLE = false; public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT = 1000L; public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX = "AutoBalancerControllerConsumer"; - public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX = "AutoBalancerControllerConsumerGroup"; public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS = 1000; public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS = Duration.ofMinutes(1).toMillis(); public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_GOALS = new StringJoiner(",") @@ -105,9 +103,6 @@ public class AutoBalancerControllerConfig extends AutoBalancerConfig { .define(AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX, ConfigDef.Importance.LOW, AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX_DOC) - .define(AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX, ConfigDef.Type.STRING, - DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX, ConfigDef.Importance.HIGH, - AUTO_BALANCER_CONTROLLER_CONSUMER_GROUP_ID_PREFIX_DOC) .define(AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS, ConfigDef.Type.LONG, DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS, ConfigDef.Importance.HIGH, AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS_DOC) diff --git a/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java b/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java index cd37562de5..9760a2c5d9 100644 --- a/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java +++ b/core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java @@ -114,7 +114,7 @@ public void testLoadRetrieverShutdown() { cluster.controllers().values().iterator().next().controller(), clusterModel); loadRetriever.start(); - Assertions.assertTimeout(Duration.ofMillis(5000), loadRetriever::shutdown); + Assertions.assertTimeout(Duration.ofMillis(15000), loadRetriever::shutdown); LoadRetriever loadRetriever2 = new LoadRetriever(config, cluster.controllers().values().iterator().next().controller(), clusterModel); @@ -131,7 +131,7 @@ public void testLoadRetrieverShutdown() { .setPort(endpoint.port()) .setSecurityProtocol(endpoint.securityProtocol().id)).iterator())); loadRetriever2.onBrokerRegister(record); - Assertions.assertTimeout(Duration.ofMillis(5000), loadRetriever2::shutdown); + Assertions.assertTimeout(Duration.ofMillis(15000), loadRetriever2::shutdown); } @Test