Skip to content

Commit

Permalink
Fix Rebalance events behavior for static membership and fix for consu… (
Browse files Browse the repository at this point in the history
  • Loading branch information
jliunyu authored and emasab committed May 26, 2022
1 parent 92c6e40 commit a521d28
Show file tree
Hide file tree
Showing 3 changed files with 299 additions and 17 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,12 @@ This is a feature release:
See [examples/mock_cluster](examples/mock_cluster).


### Fixes

* Fix Rebalance events behavior for static membership (@jliunyu, #757).
* Fix consumer close taking 10 seconds when there's no rebalance
needed (@jliunyu, #757).

confluent-kafka-go is based on librdkafka FUTUREFIXME, see the
[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.9.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.
Expand Down
37 changes: 20 additions & 17 deletions kafka/consumer.go
Expand Up @@ -424,30 +424,33 @@ func (c *Consumer) Close() (err error) {
close(c.events)
}

// librdkafka's rd_kafka_consumer_close() will block
// and trigger the rebalance_cb() if one is set, if not, which is the
// case with the Go client since it registers EVENTs rather than callbacks,
// librdkafka will shortcut the rebalance_cb and do a forced unassign.
// But we can't have that since the application might need the final RevokePartitions
// before shutting down. So we trigger an Unsubscribe() first, wait for that to
// propagate (in the Poll loop below), and then close the consumer.
c.Unsubscribe()

// Poll for rebalance events
for {
c.Poll(10 * 1000)
if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 {
break
doneChan := make(chan bool)

go func() {
C.rd_kafka_consumer_close(c.handle.rk)
// wake up Poll()
C.rd_kafka_queue_yield(c.handle.rkq)
doneChan <- true
}()

// wait for consumer_close() to finish while serving c.Poll() for rebalance callbacks/events
run := true
for run {
select {
case <-doneChan:
run = false

default:
c.Poll(100)
}
}

close(doneChan)

// Destroy our queue
C.rd_kafka_queue_destroy(c.handle.rkq)
c.handle.rkq = nil

// Close the consumer
C.rd_kafka_consumer_close(c.handle.rk)

c.handle.cleanup()

C.rd_kafka_destroy(c.handle.rk)
Expand Down
273 changes: 273 additions & 0 deletions kafka/consumer_test.go
Expand Up @@ -22,6 +22,8 @@ import (
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -429,3 +431,274 @@ func TestConsumerLog(t *testing.T) {
}
}
}

func wrapRebalanceCb(assignedEvents *int32, revokedEvents *int32, t *testing.T) func(c *Consumer, event Event) error {
return func(c *Consumer, event Event) error {
switch ev := event.(type) {
case AssignedPartitions:
atomic.AddInt32(assignedEvents, 1)

t.Logf("%v, %s rebalance: %d new partition(s) assigned: %v\n",
c, c.GetRebalanceProtocol(), len(ev.Partitions),
ev.Partitions)
err := c.Assign(ev.Partitions)
if err != nil {
panic(err)
}

case RevokedPartitions:
atomic.AddInt32(revokedEvents, 1)

t.Logf("%v, %s rebalance: %d partition(s) revoked: %v\n",
c, c.GetRebalanceProtocol(), len(ev.Partitions),
ev.Partitions)
if c.AssignmentLost() {
// Our consumer has been kicked out of the group and the
// entire assignment is thus lost.
t.Logf("%v, Current assignment lost!\n", c)
}

// The client automatically calls Unassign() unless
// the callback has already called that method.
}
return nil
}
}

func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) {
defer wg.Done()

run := true
for run {
select {
case <-doneChan:
run = false

default:
ev := c.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case *Message:
t.Logf("Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
if e.Headers != nil {
t.Logf("Headers: %v\n", e.Headers)
}

case Error:
// Errors should generally be
// considered informational, the client
// will try to automatically recover.
t.Logf("Error: %v: %v for "+
"consumer %v\n", e.Code(), e, c)

default:
t.Logf("Ignored %v for consumer %v\n",
e, c)
}
}
}
}

// TestConsumerCloseForStaticMember verifies the rebalance
// for static membership.
// According to KIP-345, the consumer group will not trigger rebalance unless
// 1) A new member joins
// 2) A leader rejoins (possibly due to topic assignment change)
// 3) An existing member offline time is over session timeout
// 4) Broker receives a leave group request containing alistof
// `group.instance.id`s (details later)
//
// This test uses 3 consumers while each consumer joins after the assignment
// finished for the previous consumers.
// The expected behavior for these consumers are:
// 1) First consumer joins, AssignedPartitions happens. Assign all the
// partitions to it.
// 2) Second consumer joins, RevokedPartitions happens from the first consumer,
// then AssignedPartitions happens to both consumers.
// 3) Third consumer joins, RevokedPartitions happens from the previous two
// consumers, then AssignedPartitions happens to all the three consumers.
// 4) Close the second consumer, RevokedPartitions should not happen.
// 5) Rejoin the second consumer, rebalance should not happen to all the other
// consumers since it's not the leader, AssignedPartitions only happened
// to this consumer to assign the partitions.
//
// The total number of AssignedPartitions for the first consumer is 3,
// and the total number of RevokedPartitions for the first consumer is 2.
// The total number of AssignedPartitions for the second consumer is 2,
// and the total number of RevokedPartitions for the second consumer is 1.
// The total number of AssignedPartitions for the third consumer is 1,
// and the total number of RevokedPartitions for the second consumer is 0.
// The total number of AssignedPartitions for the rejoined consumer
// (originally second consumer) is 1,
// and the total number of RevokedPartitions for the rejoined consumer
// (originally second consumer) is 0.
func TestConsumerCloseForStaticMember(t *testing.T) {
if !testconfRead() {
t.Skipf("Missing testconf.json")
}
broker := testconf.Brokers
topic := testconf.Topic

var assignedEvents1 int32
var revokedEvents1 int32

var assignedEvents2 int32
var revokedEvents2 int32

var assignedEvents3 int32
var revokedEvents3 int32

var assignedEvents4 int32
var revokedEvents4 int32

conf1 := ConfigMap{
"bootstrap.servers": broker,
"group.id": "rebalance",
"session.timeout.ms": "6000",
"max.poll.interval.ms": "10000",
"group.instance.id": "staticmember1",
}
c1, err := NewConsumer(&conf1)

conf2 := ConfigMap{
"bootstrap.servers": broker,
"group.id": "rebalance",
"session.timeout.ms": "6000",
"max.poll.interval.ms": "10000",
"group.instance.id": "staticmember2",
}
c2, err := NewConsumer(&conf2)
if err != nil {
t.Fatalf("%s", err)
}

conf3 := ConfigMap{
"bootstrap.servers": broker,
"group.id": "rebalance",
"session.timeout.ms": "6000",
"max.poll.interval.ms": "10000",
"group.instance.id": "staticmember3",
}

c3, err := NewConsumer(&conf3)
if err != nil {
t.Fatalf("%s", err)
}
wrapRebalancecb1 := wrapRebalanceCb(&assignedEvents1, &revokedEvents1, t)
err = c1.Subscribe(topic, wrapRebalancecb1)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
}

wg := sync.WaitGroup{}
doneChan := make(chan bool, 3)

wg.Add(1)
go testPoll(c1, doneChan, t, &wg)
testConsumerWaitAssignment(c1, t)

closeChan := make(chan bool)
wrapRebalancecb2 := wrapRebalanceCb(&assignedEvents2, &revokedEvents2, t)
err = c2.Subscribe(topic, wrapRebalancecb2)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
}
wg.Add(1)
go testPoll(c2, closeChan, t, &wg)
testConsumerWaitAssignment(c2, t)

wrapRebalancecb3 := wrapRebalanceCb(&assignedEvents3, &revokedEvents3, t)
err = c3.Subscribe(topic, wrapRebalancecb3)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
}
wg.Add(1)
go testPoll(c3, doneChan, t, &wg)
testConsumerWaitAssignment(c3, t)

closeChan <- true
close(closeChan)
c2.Close()

c2, err = NewConsumer(&conf2)
if err != nil {
t.Fatalf("%s", err)
}

wrapRebalancecb4 := wrapRebalanceCb(&assignedEvents4, &revokedEvents4, t)
err = c2.Subscribe(topic, wrapRebalancecb4)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
}

wg.Add(1)
go testPoll(c2, doneChan, t, &wg)
testConsumerWaitAssignment(c2, t)

doneChan <- true
close(doneChan)

c3.Close()
c2.Close()
c1.Close()

wg.Wait()

// Wait 2 * session.timeout.ms to make sure no revokedEvents happens
time.Sleep(2 * 6000 * time.Millisecond)

if atomic.LoadInt32(&assignedEvents1) != 3 {
t.Fatalf("3 assignedEvents are Expected to happen for the first consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents1))
}

if atomic.LoadInt32(&revokedEvents1) != 2 {
t.Fatalf("2 revokedEvents are Expected to happen for the first consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents1))
}

if atomic.LoadInt32(&assignedEvents2) != 2 {
t.Fatalf("2 assignedEvents are Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents2))
}
if atomic.LoadInt32(&revokedEvents2) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents2))
}

if atomic.LoadInt32(&assignedEvents3) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents3))
}
if atomic.LoadInt32(&revokedEvents3) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents3))
}

if atomic.LoadInt32(&assignedEvents4) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&assignedEvents4))
}
if atomic.LoadInt32(&revokedEvents4) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&revokedEvents4))
}
}

func testConsumerWaitAssignment(c *Consumer, t *testing.T) {
run := true
for run {
assignment, err := c.Assignment()
if err != nil {
t.Fatalf("Assignment failed: %s\n", err)
}

if len(assignment) != 0 {
t.Logf("%v Assigned partitions are: %v\n", c, assignment)
run = false
}
}
}

0 comments on commit a521d28

Please sign in to comment.