Skip to content

Commit

Permalink
feat(core): refactor AbstractGoal for better scalability (#924)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <niesc@automq.com>
  • Loading branch information
SCNieh authored and superhx committed Mar 14, 2024
1 parent fef8305 commit 45e99e2
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 171 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

21 changes: 6 additions & 15 deletions core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import kafka.autobalancer.common.AutoBalancerConstants;
import kafka.autobalancer.model.BrokerUpdater;
import kafka.autobalancer.model.ClusterModelSnapshot;
import kafka.autobalancer.model.ModelUtils;
import kafka.autobalancer.model.TopicPartitionReplicaUpdater;
import org.slf4j.Logger;

Expand Down Expand Up @@ -117,9 +116,9 @@ protected double calculateAcceptanceScore(BrokerUpdater.Broker srcBrokerBefore,
}

if (!isSrcBrokerAcceptedBefore && !isSrcBrokerAcceptedAfter) {
return score <= POSITIVE_ACTION_SCORE_THRESHOLD ? 0.0 : score;
return score < POSITIVE_ACTION_SCORE_THRESHOLD ? 0.0 : score;
} else if (!isDestBrokerAcceptedBefore && !isDestBrokerAcceptedAfter) {
return score <= POSITIVE_ACTION_SCORE_THRESHOLD ? 0.0 : score;
return score < POSITIVE_ACTION_SCORE_THRESHOLD ? 0.0 : score;
}
return score;
}
Expand Down Expand Up @@ -162,24 +161,16 @@ public double actionAcceptanceScore(Action action, ClusterModelSnapshot cluster)
BrokerUpdater.Broker destBrokerBefore = cluster.broker(action.getDestBrokerId());
BrokerUpdater.Broker srcBrokerAfter = srcBrokerBefore.copy();
BrokerUpdater.Broker destBrokerAfter = destBrokerBefore.copy();
TopicPartitionReplicaUpdater.TopicPartitionReplica srcReplica = cluster.replica(action.getSrcBrokerId(), action.getSrcTopicPartition());

switch (action.getType()) {
case MOVE:
ModelUtils.moveReplicaLoad(srcBrokerAfter, destBrokerAfter, srcReplica);
break;
case SWAP:
ModelUtils.moveReplicaLoad(srcBrokerAfter, destBrokerAfter, srcReplica);
ModelUtils.moveReplicaLoad(destBrokerAfter, srcBrokerAfter,
cluster.replica(action.getDestBrokerId(), action.getDestTopicPartition()));
break;
default:
return 0.0;
if (!moveReplica(action, cluster, srcBrokerAfter, destBrokerAfter)) {
return 0.0;
}

return calculateAcceptanceScore(srcBrokerBefore, destBrokerBefore, srcBrokerAfter, destBrokerAfter);
}

protected abstract boolean moveReplica(Action action, ClusterModelSnapshot cluster, BrokerUpdater.Broker src, BrokerUpdater.Broker dest);

@Override
public Set<BrokerUpdater.Broker> getEligibleBrokers(ClusterModelSnapshot cluster) {
return cluster.brokers().stream().filter(BrokerUpdater.Broker::isActive).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import kafka.autobalancer.common.Resource;
import kafka.autobalancer.model.BrokerUpdater;
import kafka.autobalancer.model.ClusterModelSnapshot;
import kafka.autobalancer.model.ModelUtils;
import kafka.autobalancer.model.TopicPartitionReplicaUpdater;
import org.slf4j.Logger;

Expand Down Expand Up @@ -55,6 +56,24 @@ protected Optional<Action> trySwapPartitionOut(ClusterModelSnapshot cluster,
return getAcceptableAction(candidateActionScores);
}

@Override
protected boolean moveReplica(Action action, ClusterModelSnapshot cluster, BrokerUpdater.Broker src, BrokerUpdater.Broker dest) {
TopicPartitionReplicaUpdater.TopicPartitionReplica srcReplica = cluster.replica(action.getSrcBrokerId(), action.getSrcTopicPartition());
switch (action.getType()) {
case MOVE:
ModelUtils.moveReplicaLoad(src, dest, srcReplica);
break;
case SWAP:
TopicPartitionReplicaUpdater.TopicPartitionReplica destReplica = cluster.replica(action.getDestBrokerId(), action.getDestTopicPartition());
ModelUtils.moveReplicaLoad(src, dest, srcReplica);
ModelUtils.moveReplicaLoad(dest, src, destReplica);
break;
default:
return false;
}
return true;
}

/**
* Try to reduce resource load by move or swap replicas out.
*
Expand Down

This file was deleted.

0 comments on commit 45e99e2

Please sign in to comment.