Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ConsumerGroups #1083

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions balance_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package sarama

import (
"math"
"sort"
)

// BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt.
// It contains an allocation of topic/partitions by memberID in the form of
// a `memberID -> topic -> partitions` map.
type BalanceStrategyPlan map[string]map[string][]int32

// Add assigns a topic with a number partitions to a member.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
if len(partitions) == 0 {
return
}
if _, ok := p[memberID]; !ok {
p[memberID] = make(map[string][]int32, 1)
}
p[memberID][topic] = append(p[memberID][topic], partitions...)
}

// --------------------------------------------------------------------

// BalanceStrategy is used to balance topics and partitions
// across memebers of a consumer group
type BalanceStrategy interface {
// Name uniquely identifies the strategy.
Name() string

// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
// and returns a distribution plan.
Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
}

// --------------------------------------------------------------------

// BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
// Example with one topic T with six partitions (0..5) and two members (M1, M2) :
// M1: {T: [0, 1, 2]}
// M2: {T: [3, 4, 5]}
var BalanceStrategyRange = &balanceStrategy{
name: "range",
coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
step := float64(len(partitions)) / float64(len(memberIDs))

for i, memberID := range memberIDs {
pos := float64(i)
min := int(math.Floor(pos*step + 0.5))
max := int(math.Floor((pos+1)*step + 0.5))
plan.Add(memberID, topic, partitions[min:max]...)
}
},
}

// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
// M1: {T: [0, 2, 4]}
// M2: {T: [1, 3, 5]}
var BalanceStrategyRoundRobin = &balanceStrategy{
name: "roundrobin",
coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
for i, part := range partitions {
memberID := memberIDs[i%len(memberIDs)]
plan.Add(memberID, topic, part)
}
},
}

// --------------------------------------------------------------------

type balanceStrategy struct {
name string
coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
}

// Name implements BalanceStrategy.
func (s *balanceStrategy) Name() string { return s.name }

// Balance implements BalanceStrategy.
func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
// Build members by topic map
mbt := make(map[string][]string)
for memberID, meta := range members {
for _, topic := range meta.Topics {
mbt[topic] = append(mbt[topic], memberID)
}
}

// Sort members for each topic
for topic, memberIDs := range mbt {
sort.Sort(&balanceStrategySortable{
topic: topic,
memberIDs: memberIDs,
})
}

// Assemble plan
plan := make(BalanceStrategyPlan, len(members))
for topic, memberIDs := range mbt {
s.coreFn(plan, memberIDs, topic, topics[topic])
}
return plan, nil
}

type balanceStrategySortable struct {
topic string
memberIDs []string
}

func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
func (p balanceStrategySortable) Swap(i, j int) {
p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
}
func (p balanceStrategySortable) Less(i, j int) bool {
return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
}

func balanceStrategyHashValue(vv ...string) uint32 {
h := uint32(2166136261)
for _, s := range vv {
for _, c := range s {
h ^= uint32(c)
h *= 16777619
}
}
return h
}
102 changes: 102 additions & 0 deletions balance_strategy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package sarama

import (
"reflect"
"testing"
)

func TestBalanceStrategyRange(t *testing.T) {
tests := []struct {
members map[string][]string
topics map[string][]int32
expected BalanceStrategyPlan
}{
{
members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
topics: map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 1}, "T2": {2, 3}},
"M2": map[string][]int32{"T1": {2, 3}, "T2": {0, 1}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
topics: map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 1}, "T2": {2}},
"M2": map[string][]int32{"T1": {2}, "T2": {0, 1}},
},
},
{
members: map[string][]string{"M1": {"T1"}, "M2": {"T1", "T2"}},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can the broker coordinate if the different group members subscribe to different topics entirely?

Here's an example of that. Topics can be distributed unevenly even under the same group ID.

topics: map[string][]int32{"T1": {0, 1}, "T2": {0, 1}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}},
"M2": map[string][]int32{"T1": {1}, "T2": {0, 1}},
},
},
}

strategy := BalanceStrategyRange
if strategy.Name() != "range" {
t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
}

for _, test := range tests {
members := make(map[string]ConsumerGroupMemberMetadata)
for memberID, topics := range test.members {
members[memberID] = ConsumerGroupMemberMetadata{Topics: topics}
}

actual, err := strategy.Plan(members, test.topics)
if err != nil {
t.Errorf("Unexpected error %v", err)
} else if !reflect.DeepEqual(actual, test.expected) {
t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual)
}
}
}

func TestBalanceStrategyRoundRobin(t *testing.T) {
tests := []struct {
members map[string][]string
topics map[string][]int32
expected BalanceStrategyPlan
}{
{
members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
topics: map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 2}, "T2": {1, 3}},
"M2": map[string][]int32{"T1": {1, 3}, "T2": {0, 2}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
topics: map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 2}, "T2": {1}},
"M2": map[string][]int32{"T1": {1}, "T2": {0, 2}},
},
},
}

strategy := BalanceStrategyRoundRobin
if strategy.Name() != "roundrobin" {
t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
}

for _, test := range tests {
members := make(map[string]ConsumerGroupMemberMetadata)
for memberID, topics := range test.members {
members[memberID] = ConsumerGroupMemberMetadata{Topics: topics}
}

actual, err := strategy.Plan(members, test.topics)
if err != nil {
t.Errorf("Unexpected error %v", err)
} else if !reflect.DeepEqual(actual, test.expected) {
t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual)
}
}
}
88 changes: 81 additions & 7 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,55 @@ type Config struct {

// Consumer is the namespace for configuration related to consuming messages,
// used by the Consumer.
//
// Note that Sarama's Consumer type does not currently support automatic
// consumer-group rebalancing and offset tracking. For Zookeeper-based
// tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka
// library builds on Sarama to add this support. For Kafka-based tracking
// (Kafka 0.9 and later), the https://github.com/bsm/sarama-cluster library
// builds on Sarama to add this support.
Consumer struct {

// Group is the namespace for configuring consumer group.
Group struct {
Session struct {
// The timeout used to detect consumer failures when using Kafka's group management facility.
// The consumer sends periodic heartbeats to indicate its liveness to the broker.
// If no heartbeats are received by the broker before the expiration of this session timeout,
// then the broker will remove this consumer from the group and initiate a rebalance.
// Note that the value must be in the allowable range as configured in the broker configuration
// by `group.min.session.timeout.ms` and `group.max.session.timeout.ms` (default 10s)
Timeout time.Duration
}
Heartbeat struct {
// The expected time between heartbeats to the consumer coordinator when using Kafka's group
// management facilities. Heartbeats are used to ensure that the consumer's session stays active and
// to facilitate rebalancing when new consumers join or leave the group.
// The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no
// higher than 1/3 of that value.
// It can be adjusted even lower to control the expected time for normal rebalances (default 3s)
Interval time.Duration
}
Rebalance struct {
// Strategy for allocating topic partitions to members (default BalanceStrategyRange)
Strategy BalanceStrategy
// The maximum allowed time for each worker to join the group once a rebalance has begun.
// This is basically a limit on the amount of time needed for all tasks to flush any pending
// data and commit offsets. If the timeout is exceeded, then the worker will be removed from
// the group, which will cause offset commit failures (default 60s.
Timeout time.Duration

Retry struct {
// When a new consumer joins a consumer group the set of consumers attempt to "rebalance"
// the load to assign partitions to each consumer. If the set of consumers changes while
// this assignment is taking place the rebalance will fail and retry. This setting controls
// the maximum number of attempts before giving up (default 4).
Max int
// Backoff time between retries during rebalance (default 2s)
Backoff time.Duration
}
}
Member struct {
// Custom metadata to include when joining the group. The user data for all joined members
// can be retrieved by sending a DescribeGroupRequest to the broker that is the
// coordinator for the group.
UserData []byte
}
}

Retry struct {
// How long to wait after a failing to read from a partition before
// trying again (default 2s).
Expand Down Expand Up @@ -308,6 +349,13 @@ func NewConfig() *Config {
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest

c.Consumer.Group.Session.Timeout = 10 * time.Second
c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
c.Consumer.Group.Rebalance.Retry.Max = 4
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second

c.ClientID = defaultClientID
c.ChannelBufferSize = 256
c.Version = MinVersion
Expand Down Expand Up @@ -355,6 +403,15 @@ func (c *Config) Validate() error {
if c.Consumer.Offsets.Retention%time.Millisecond != 0 {
Logger.Println("Consumer.Offsets.Retention only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Consumer.Group.Session.Timeout%time.Millisecond != 0 {
Logger.Println("Consumer.Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Consumer.Group.Heartbeat.Interval%time.Millisecond != 0 {
Logger.Println("Consumer.Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Consumer.Group.Rebalance.Timeout%time.Millisecond != 0 {
Logger.Println("Consumer.Group.Rebalance.Timeout only supports millisecond precision; nanoseconds will be truncated.")
}
if c.ClientID == defaultClientID {
Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
}
Expand Down Expand Up @@ -443,7 +500,24 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
}

// validate the Consumer Group values
switch {
case c.Consumer.Group.Session.Timeout <= 2*time.Millisecond:
return ConfigurationError("Consumer.Group.Session.Timeout must be >= 2ms")
case c.Consumer.Group.Heartbeat.Interval < 1*time.Millisecond:
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms")
case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout:
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout")
case c.Consumer.Group.Rebalance.Strategy == nil:
return ConfigurationError("Consumer.Group.Rebalance.Strategy must not be empty")
case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond:
return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms")
case c.Consumer.Group.Rebalance.Retry.Max < 0:
return ConfigurationError("Consumer.Group.Rebalance.Retry.Max must be >= 0")
case c.Consumer.Group.Rebalance.Retry.Backoff < 0:
return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
}

// validate misc shared values
Expand Down
Loading