Skip to content

Commit

Permalink
feat[balance_strategy]: announcing a new round robin balance strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoyu.lin committed Aug 23, 2020
1 parent 5992c4c commit be9fdc6
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 0 deletions.
92 changes: 92 additions & 0 deletions balance_strategy.go
Expand Up @@ -2,6 +2,8 @@ package sarama

import (
"container/heap"
"errors"
"fmt"
"math"
"sort"
"strings"
Expand All @@ -14,6 +16,10 @@ const (
// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
RoundRobinBalanceStrategyName = "roundrobin"

// EvenRoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy base on all topic's partitions,
// which assigns partitions more evenly in comparison to the`RoundRobinBalanceStrategyName` strategy
EvenRoundRobinBalanceStrategyName = "even_roundrobin"

// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
StickyBalanceStrategyName = "sticky"

Expand Down Expand Up @@ -353,6 +359,92 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart
}
}

// BalanceStrategyEvenRoundRobin assigns partitions to members in alternating order.
// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2):
// M0: [t0p0, t0p2, t1p1]
// M1: [t0p1, t1p0, t1p2]
var BalanceStrategyEvenRoundRobin = new(evenRoundRobinBalancer)

type evenRoundRobinBalancer struct{}

func (b *evenRoundRobinBalancer) Name() string {
return EvenRoundRobinBalanceStrategyName
}

func (b *evenRoundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
if len(memberAndMetadata) == 0 || len(topics) == 0 {
return nil, errors.New("members and topics are not provided")
}
// sort partitions
var topicPartitions []topicAndPartition
for topic, partitions := range topics {
for _, partition := range partitions {
topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
}
}
sort.SliceStable(topicPartitions, func(i, j int) bool {
pi := topicPartitions[i]
pj := topicPartitions[j]
return pi.comparedValue() < pj.comparedValue()
})

// sort members
var members []memberAndTopic
for memberID, meta := range memberAndMetadata {
m := memberAndTopic{
memberID: memberID,
topics: make(map[string]struct{}),
}
for _, t := range meta.Topics {
m.topics[t] = struct{}{}
}
members = append(members, m)
}
sort.SliceStable(members, func(i, j int) bool {
mi := members[i]
mj := members[j]
return mi.memberID < mj.memberID
})

// assign partitions
plan := make(BalanceStrategyPlan, len(members))
i := 0
n := len(members)
for _, tp := range topicPartitions {
m := members[i%n]
for !m.hasTopic(tp.topic) {
i++
m = members[i%n]
}
plan.Add(m.memberID, tp.topic, tp.partition)
i++
}
return plan, nil
}

func (b *evenRoundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
return nil, nil // do nothing for now
}

type topicAndPartition struct {
topic string
partition int32
}

func (tp *topicAndPartition) comparedValue() string {
return fmt.Sprintf("%s-%d", tp.topic, tp.partition)
}

type memberAndTopic struct {
memberID string
topics map[string]struct{}
}

func (m *memberAndTopic) hasTopic(topic string) bool {
_, isExist := m.topics[topic]
return isExist
}

// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
// Lower balance score indicates a more balanced assignment.
Expand Down
91 changes: 91 additions & 0 deletions balance_strategy_test.go
Expand Up @@ -105,6 +105,15 @@ func TestBalanceStrategyRoundRobin(t *testing.T) {
"M2": map[string][]int32{"T1": {1}, "T2": {0, 2}},
},
},
{
// case that partitions are assigned unevenly:
// there are three members and topics, however all the topics and partitions are assign to a single member
members: map[string][]string{"M": {"T1", "T2", "TT2"}, "M2": {"T1", "T2", "TT2"}, "M3": {"T1", "T2", "TT2"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}},
expected: BalanceStrategyPlan{
"M": map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}},
},
},
}

strategy := BalanceStrategyRoundRobin
Expand All @@ -127,6 +136,88 @@ func TestBalanceStrategyRoundRobin(t *testing.T) {
}
}

func TestBalanceStrategyEvenRoundRobin(t *testing.T) {
tests := []struct {
members map[string][]string
topics map[string][]int32
expected BalanceStrategyPlan
}{
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T2", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T3": {0}},
"M2": map[string][]int32{"T2": {0}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T2", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0, 1}, "T3": {0, 1, 2, 3}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T2": {1}, "T3": {1, 3}},
"M2": map[string][]int32{"T2": {0}, "T3": {0, 2}},
},
},
{
members: map[string][]string{"M1": {"T1"}, "M2": {"T1"}},
topics: map[string][]int32{"T1": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0, 1, 2}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0, 1, 2}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T2": {0}},
"M2": map[string][]int32{"T3": {0}},
},
},
{
members: map[string][]string{"M": {"T1", "T2", "TT2"}, "M2": {"T1", "T2", "TT2"}, "M3": {"T1", "T2", "TT2"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}},
expected: BalanceStrategyPlan{
"M": map[string][]int32{"T1": {0}},
"M2": map[string][]int32{"T2": {0}},
"M3": map[string][]int32{"TT2": {0}},
},
},
}

strategy := BalanceStrategyEvenRoundRobin
if strategy.Name() != "even_roundrobin" {
t.Errorf("Unexpected stategy name\nexpected: even_roundrobin\nactual: %v", strategy.Name())
}

for _, test := range tests {
members := make(map[string]ConsumerGroupMemberMetadata)
for memberID, topics := range test.members {
members[memberID] = ConsumerGroupMemberMetadata{Topics: topics}
}

actual, err := strategy.Plan(members, test.topics)
if err != nil {
t.Errorf("Unexpected error %v", err)
} else if !reflect.DeepEqual(actual, test.expected) {
t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual)
}
}
}

func Test_deserializeTopicPartitionAssignment(t *testing.T) {
type args struct {
userDataBytes []byte
Expand Down

0 comments on commit be9fdc6

Please sign in to comment.