Skip to content

Commit

Permalink
feat: merge ESK (#54)
Browse files Browse the repository at this point in the history
* fix: fix checkstyle and add ci flow for checkstyle and spotbugs (#106)

ci(s3): add checkstyle and spotbugs in ci

1. add checkstyle and spotbugs in ci
2. fix to pass checkstyle

Signed-off-by: TheR1sing3un <ther1sing3un@163.com>

* fix: fix fetching problem brought by thread pool separating; fix style problems; add more thread for fetching and appending thread pool (#107)

* fix: fix fetching problem brought by thread pool separating; fix style problems; add more thread for fetching and appending thread pool

Signed-off-by: Curtis Wan <wcy9988@163.com>

* remove 'SLOW_FETCH_TIMEOUT_MILLIS - 1' case in quickFetch test and change 2 -> 'SLOW_FETCH_TIMEOUT_MILLIS / 2' in slowFetch test

Signed-off-by: Curtis Wan <wcy9988@163.com>

* refactor: add more comments to make logic more clear

Signed-off-by: Curtis Wan <wcy9988@163.com>

---------

Signed-off-by: Curtis Wan <wcy9988@163.com>

* refactor: close #105; add more threads for partition opening or closing (#109)

Signed-off-by: Curtis Wan <wcy9988@163.com>

* feat(es): client factory SPI (#112)

Signed-off-by: Robin Han <hanxvdovehx@gmail.com>

* fix: close #108; make sure topics are all cleaned at the end of the test (#115)

Signed-off-by: Curtis Wan <wcy9988@163.com>

* fix: close #110; shutdown additional thread pools right now when broker is in shutdown (#111)

* fix: close #110; shutdown additional thread pools right now when broker is in shutdown

Signed-off-by: Curtis Wan <wcy9988@163.com>

* fix: move thread pools into AlwaysSuccessClient

Signed-off-by: Curtis Wan <wcy9988@163.com>

* fix: add more logs; fix partition leading or following

Signed-off-by: Curtis Wan <wcy9988@163.com>

---------

Signed-off-by: Curtis Wan <wcy9988@163.com>

* fix: close #117; fix return too early; add stack for log closing (#118)

Signed-off-by: Curtis Wan <wcy9988@163.com>

* fix: close #114; use position rather than offset as nextOffset for indexes; close metastream when log is closing (#116)

* wip: add more logs

Signed-off-by: Curtis Wan <wcy9988@163.com>

* fix: add ExceptionUtil; use position rather than offset as nextOffset for indexes

Signed-off-by: Curtis Wan <wcy9988@163.com>

* fix: fix style check

Signed-off-by: Curtis Wan <wcy9988@163.com>

---------

Signed-off-by: Curtis Wan <wcy9988@163.com>

* fix: close #119; skip deleting segments when metaStream is closed (#120)

Signed-off-by: Curtis Wan <wcy9988@163.com>

* feat: close #123; Handle no-retryable exceptions thrown by Elastic Stream SDK (#124)

* feat: close #123; Handle no-retryable exceptions thrown by Elastic Stream SDK. The corresponding partitions will be offline; Add context info to pass unit tests for
cases that fetching just follows appending.

Signed-off-by: Curtis Wan <wcy9988@163.com>

* feat: enhancement codes refered in discussions

Signed-off-by: Curtis Wan <wcy9988@163.com>

* fix: style problem

Signed-off-by: Curtis Wan <wcy9988@163.com>

* test: wait for more time for quick/slow fetch tests

Signed-off-by: Curtis Wan <wcy9988@163.com>

* test: start fetching after appending finished

Signed-off-by: Curtis Wan <wcy9988@163.com>

---------

Signed-off-by: Curtis Wan <wcy9988@163.com>

* test: add slowFetchDelay (#135)

Signed-off-by: Curtis Wan <wcy9988@163.com>

* feat(core): Add implementation of AutoBalancer components and unit tests  (#134)

* feat(core): Add customized MetricsReporter and partition-level metrics

- add partition level metrics for BytesInPerSec and BytesOutPerSec
- implement CruiseControlMetrics to monitor and report interested Yammer
metrics

Closes #78

Signed-off-by: sc.nieh <s.c.ney516@gmail.com>

* feat(core): AutoBalancerMetricsReporter optimization

- pre-aggregate for broker and partition level metrics
- fill with empty value for probably missing metrics

#78

Signed-off-by: sc.nieh <s.c.ney516@gmail.com>

* feat(core): Implement load retriever for auto balancer

#77

Signed-off-by: sc.nieh <s.c.ney516@gmail.com>

* feat(core): Implement AutoBalancerManager and AnomalyDetector

#77

Signed-off-by: sc.nieh <s.c.ney516@gmail.com>

* fix(core): Fix inconsistent in-flight request check

Closes #121

Signed-off-by: sc.nieh <s.c.ney516@gmail.com>

* feat(core): Create metrics reporter topic on controller

Signed-off-by: sc.nieh <s.c.ney516@gmail.com>

* fix(core): fix execute interval of ExecutionManager

Signed-off-by: sc.nieh <s.c.ney516@gmail.com>

* fix: add esUnit tag to unit tests of autobalancer

Signed-off-by: sc.nieh <s.c.ney516@gmail.com>

---------

Signed-off-by: sc.nieh <s.c.ney516@gmail.com>

---------

Signed-off-by: TheR1sing3un <ther1sing3un@163.com>
Signed-off-by: Curtis Wan <wcy9988@163.com>
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
Signed-off-by: sc.nieh <s.c.ney516@gmail.com>
Co-authored-by: TheR1sing3un <87409330+TheR1sing3un@users.noreply.github.com>
Co-authored-by: Robin Han <hanxvdovehx@gmail.com>
Co-authored-by: Shichao Nie <s.c.ney516@gmail.com>
  • Loading branch information
4 people committed Sep 4, 2023
1 parent 844f2f8 commit c60bc7d
Show file tree
Hide file tree
Showing 60 changed files with 8,247 additions and 0 deletions.
128 changes: 128 additions & 0 deletions core/src/main/java/kafka/autobalancer/AnomalyDetector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.autobalancer;

import kafka.autobalancer.common.Action;
import kafka.autobalancer.common.AutoBalancerThreadFactory;
import kafka.autobalancer.config.AutoBalancerControllerConfig;
import kafka.autobalancer.goals.AbstractGoal;
import kafka.autobalancer.model.BrokerUpdater;
import kafka.autobalancer.model.ClusterModel;
import kafka.autobalancer.model.ClusterModelSnapshot;
import kafka.autobalancer.model.TopicPartitionReplicaUpdater;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.QuorumController;
import org.slf4j.Logger;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AnomalyDetector {
private final Logger logger;
private final List<AbstractGoal> goalsByPriority;
private final ClusterModel clusterModel;
private final ScheduledExecutorService executorService;
private final ExecutionManager executionManager;
private final long detectInterval;
private final Set<Integer> excludedBrokers = new HashSet<>();
private final Set<String> excludedTopics = new HashSet<>();
private volatile boolean running;

public AnomalyDetector(AutoBalancerControllerConfig config, QuorumController quorumController, ClusterModel clusterModel, LogContext logContext) {
if (logContext == null) {
logContext = new LogContext("[AnomalyDetector] ");
}
this.logger = logContext.logger(AnomalyDetector.class);
this.goalsByPriority = config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, AbstractGoal.class);
Collections.sort(this.goalsByPriority);
logger.info("Goals: {}", this.goalsByPriority);
this.detectInterval = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS);
fetchExcludedConfig(config);
this.clusterModel = clusterModel;
this.executorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("anomaly-detector"));
this.executionManager = new ExecutionManager(config, quorumController,
new LogContext(String.format("[ExecutionManager id=%d] ", quorumController.nodeId())));
this.running = false;
}

private void fetchExcludedConfig(AutoBalancerControllerConfig config) {
List<String> brokerIds = config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS);
for (String brokerIdStr : brokerIds) {
try {
excludedBrokers.add(Integer.parseInt(brokerIdStr));
} catch (NumberFormatException ignored) {

}
}
List<String> topics = config.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS);
excludedTopics.addAll(topics);
logger.info("Excluded brokers: {}, excluded topics: {}", excludedBrokers, excludedTopics);
}

public void start() {
this.executionManager.start();
this.executorService.scheduleWithFixedDelay(this::detect, detectInterval, detectInterval, TimeUnit.MILLISECONDS);
logger.info("Started");
}

public void shutdown() throws InterruptedException {
this.running = false;
this.executorService.shutdown();
this.executionManager.shutdown();
logger.info("Shutdown completed");
}

public void pause() {
this.running = false;
}

public void resume() {
this.running = true;
}

private void detect() {
if (!this.running) {
return;
}
logger.info("Start detect");
// The delay in processing kraft log could result in outdated cluster snapshot
ClusterModelSnapshot snapshot = this.clusterModel.snapshot(excludedBrokers, excludedTopics);

for (BrokerUpdater.Broker broker : snapshot.brokers()) {
logger.info("Broker status: {}", broker);
for (TopicPartitionReplicaUpdater.TopicPartitionReplica replica : snapshot.replicasFor(broker.getBrokerId())) {
logger.debug("Replica status {}", replica);
}
}

for (AbstractGoal goal : goalsByPriority) {
if (!this.running) {
break;
}
List<Action> actions = goal.optimize(snapshot, goalsByPriority);
logger.debug("Optimized actions {} for goal {}", actions, goal.name());
this.executionManager.appendActions(actions);
}
// TODO: wait for completion of all actions before next detect
}
}
194 changes: 194 additions & 0 deletions core/src/main/java/kafka/autobalancer/AutoBalancerManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.autobalancer;

import kafka.autobalancer.config.AutoBalancerControllerConfig;
import kafka.autobalancer.listeners.BrokerStatusListener;
import kafka.autobalancer.listeners.ClusterStatusListenerRegistry;
import kafka.autobalancer.listeners.TopicPartitionStatusListener;
import kafka.autobalancer.model.ClusterModel;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.KafkaRaftClient;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;

import java.util.List;

public class AutoBalancerManager {
private final Logger logger;
private final LoadRetriever loadRetriever;
private final AnomalyDetector anomalyDetector;
private final KafkaEventQueue queue;
private final QuorumController quorumController;

public AutoBalancerManager(Time time, KafkaConfig kafkaConfig, QuorumController quorumController, KafkaRaftClient<ApiMessageAndVersion> raftClient) {
LogContext logContext = new LogContext(String.format("[AutoBalancerManager id=%d] ", quorumController.nodeId()));
logger = logContext.logger(AutoBalancerManager.class);
AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(kafkaConfig.props(), false);
ClusterModel clusterModel = new ClusterModel(config, new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId())));
this.loadRetriever = new LoadRetriever(config, quorumController, clusterModel,
new LogContext(String.format("[LoadRetriever id=%d] ", quorumController.nodeId())));
this.anomalyDetector = new AnomalyDetector(config, quorumController, clusterModel,
new LogContext(String.format("[AnomalyDetector id=%d] ", quorumController.nodeId())));
this.queue = new KafkaEventQueue(time, new LogContext(), "auto-balancer-");
this.quorumController = quorumController;
ClusterStatusListenerRegistry registry = new ClusterStatusListenerRegistry();
registry.register((BrokerStatusListener) clusterModel);
registry.register((TopicPartitionStatusListener) clusterModel);
registry.register(this.loadRetriever);
raftClient.register(new AutoBalancerListener(registry, this.loadRetriever, this.anomalyDetector));
}

public void start() {
loadRetriever.start();
anomalyDetector.start();
logger.info("Started");
}

public void shutdown() throws InterruptedException {
anomalyDetector.shutdown();
loadRetriever.shutdown();
queue.close();
logger.info("Shutdown completed");
}

class AutoBalancerListener implements RaftClient.Listener<ApiMessageAndVersion> {
private final ClusterStatusListenerRegistry registry;
private final LoadRetriever loadRetriever;
private final AnomalyDetector anomalyDetector;

public AutoBalancerListener(ClusterStatusListenerRegistry registry, LoadRetriever loadRetriever, AnomalyDetector anomalyDetector) {
this.registry = registry;
this.loadRetriever = loadRetriever;
this.anomalyDetector = anomalyDetector;
}

private void handleMessage(ApiMessage message) {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
for (BrokerStatusListener listener : this.registry.brokerListeners()) {
listener.onBrokerRegister((RegisterBrokerRecord) message);
}
break;
case UNREGISTER_BROKER_RECORD:
for (BrokerStatusListener listener : this.registry.brokerListeners()) {
listener.onBrokerUnregister((UnregisterBrokerRecord) message);
}
break;
case BROKER_REGISTRATION_CHANGE_RECORD:
for (BrokerStatusListener listener : this.registry.brokerListeners()) {
listener.onBrokerRegistrationChanged((BrokerRegistrationChangeRecord) message);
}
break;
case TOPIC_RECORD:
for (TopicPartitionStatusListener listener : this.registry.topicPartitionListeners()) {
listener.onTopicCreate((TopicRecord) message);
}
break;
case REMOVE_TOPIC_RECORD:
for (TopicPartitionStatusListener listener : this.registry.topicPartitionListeners()) {
listener.onTopicDelete((RemoveTopicRecord) message);
}
break;
case PARTITION_RECORD:
for (TopicPartitionStatusListener listener : this.registry.topicPartitionListeners()) {
listener.onPartitionCreate((PartitionRecord) message);
}
break;
case PARTITION_CHANGE_RECORD:
for (TopicPartitionStatusListener listener : this.registry.topicPartitionListeners()) {
listener.onPartitionChange((PartitionChangeRecord) message);
}
break;
default:
break;
}
}

@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
queue.append(() -> {
try (reader) {
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
List<ApiMessageAndVersion> messages = batch.records();
for (ApiMessageAndVersion apiMessage : messages) {
handleMessage(apiMessage.message());
}
}
}
});
}

@Override
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
queue.append(() -> {
try (reader) {
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
List<ApiMessageAndVersion> messages = batch.records();
for (ApiMessageAndVersion apiMessage : messages) {
handleMessage(apiMessage.message());
}
}
}
});
}

@Override
public void handleLeaderChange(LeaderAndEpoch leader) {
queue.append(() -> {
if (leader.leaderId().isEmpty()) {
return;
}
boolean isLeader = leader.isLeader(quorumController.nodeId());
if (isLeader) {
this.anomalyDetector.resume();
} else {
this.anomalyDetector.pause();
}
this.loadRetriever.onLeaderChanged(isLeader);
});
}

@Override
public void beginShutdown() {
RaftClient.Listener.super.beginShutdown();
}
}

}
Loading

0 comments on commit c60bc7d

Please sign in to comment.