Skip to content

Commit

Permalink
KIP-392: Consumer support for sarama
Browse files Browse the repository at this point in the history
  • Loading branch information
dneralla committed May 11, 2020
1 parent 9c1c364 commit 71e02c7
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 11 deletions.
14 changes: 14 additions & 0 deletions client.go
Expand Up @@ -29,6 +29,9 @@ type Client interface {
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker

// Broker returns the active Broker if available for the brokerID
Broker(brokerID int32) (*Broker, error)

// Topics returns the set of available topics as retrieved from cluster metadata.
Topics() ([]string, error)

Expand Down Expand Up @@ -196,6 +199,17 @@ func (client *client) Brokers() []*Broker {
return brokers
}

func (client *client) Broker(brokerID int32) (*Broker, error) {
client.lock.RLock()
defer client.lock.RUnlock()
broker, ok := client.brokers[brokerID]
if !ok {
return nil, ErrBrokerNotFound
}
_ = broker.Open(client.conf)
return broker, nil
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
var err error
for broker := client.any(); broker != nil; broker = client.any() {
Expand Down
36 changes: 36 additions & 0 deletions client_test.go
Expand Up @@ -545,6 +545,42 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
}
}

func TestClientGetBroker(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse1)

client, err := NewClient([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}

broker, err := client.Broker(leader.BrokerID())
if err != nil {
t.Fatal(err)
}

if broker.Addr() != leader.Addr() {
t.Errorf("Expected broker to have address %s, found %s", leader.Addr(), broker.Addr())
}

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse2)

if err := client.RefreshMetadata(); err != nil {
t.Error(err)
}
broker, err = client.Broker(leader.BrokerID())
if err != ErrBrokerNotFound {
t.Errorf("Expected Broker(brokerID) to return %v found %v", ErrBrokerNotFound, err)
}
}

func TestClientResurrectDeadSeeds(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
Expand Down
44 changes: 33 additions & 11 deletions consumer.go
Expand Up @@ -299,6 +299,9 @@ type partitionConsumer struct {
errors chan *ConsumerError
feeder chan *FetchResponse

replicaInited bool
preferredReadReplica int32

trigger, dying chan none
closeOnce sync.Once
topic string
Expand Down Expand Up @@ -359,21 +362,29 @@ func (child *partitionConsumer) dispatcher() {
close(child.feeder)
}

func (child *partitionConsumer) preferedBroker() (*Broker, error) {
if child.replicaInited {
broker, err := child.consumer.client.Broker(child.preferredReadReplica)
if err == nil {
return broker, nil
}
}

// if prefered replica cannot be found fallback to leader
return child.consumer.client.Leader(child.topic, child.partition)
}

func (child *partitionConsumer) dispatch() error {
if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
return err
}

var leader *Broker
var node *Broker
var err error
if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
if node, err = child.preferedBroker(); err != nil {
return err
}

child.broker = child.consumer.refBrokerConsumer(leader)

child.broker = child.consumer.refBrokerConsumer(node)
child.broker.input <- child

return nil
}

Expand Down Expand Up @@ -445,7 +456,6 @@ func (child *partitionConsumer) responseFeeder() {
feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)

if child.responseResult == nil {
atomic.StoreInt32(&child.retries, 0)
}
Expand Down Expand Up @@ -480,7 +490,6 @@ feederLoop:
}
}
}

child.broker.acks.Done()
}

Expand Down Expand Up @@ -617,6 +626,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

if response.Version == 11 && len(child.consumer.conf.RackID) > 0 {
// we got a valid response with messages. update child's preferredReadReplica from the FetchResponseBlock
child.replicaInited = true
child.preferredReadReplica = block.PreferredReadReplica
}

// abortedProducerIDs contains producerID which message should be ignored as uncommitted
// - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
// - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over
Expand Down Expand Up @@ -815,10 +830,16 @@ func (bc *brokerConsumer) handleResponses() {
for child := range bc.subscriptions {
result := child.responseResult
child.responseResult = nil

switch result {
case nil:
// no-op
if !child.replicaInited {
return
}
if bc.broker.ID() != child.preferredReadReplica {
// not an error but needs redispatching to consume from prefered replica
child.trigger <- none{}
delete(bc.subscriptions, child)
}
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
Expand All @@ -834,6 +855,7 @@ func (bc *brokerConsumer) handleResponses() {
// not an error, but does need redispatching
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.replicaInited = false
child.trigger <- none{}
delete(bc.subscriptions, child)
default:
Expand Down
68 changes: 68 additions & 0 deletions consumer_test.go
Expand Up @@ -618,6 +618,74 @@ func TestConsumeMessageWithSessionIDs(t *testing.T) {
}
}

func TestConsumeMessagesFromReadReplica(t *testing.T) {
// Given
fetchResponse1 := &FetchResponse{Version: 11}
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
block1 := fetchResponse1.GetBlock("my_topic", 0)
block1.PreferredReadReplica = 1

fetchResponse2 := &FetchResponse{Version: 11}
fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse2.AddMessage("my_topic", 0, nil, testMsg, 4)
block2 := fetchResponse2.GetBlock("my_topic", 0)
block2.PreferredReadReplica = 1

cfg := NewConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

leader := NewMockBroker(t, 0)
broker0 := NewMockBroker(t, 1)

leader.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1),
})

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse2),
})

master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

assertMessageOffset(t, <-consumer.Messages(), 1)
assertMessageOffset(t, <-consumer.Messages(), 2)
assertMessageOffset(t, <-consumer.Messages(), 3)
assertMessageOffset(t, <-consumer.Messages(), 4)

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
leader.Close()

}

// It is fine if offsets of fetched messages are not sequential (although
// strictly increasing!).
func TestConsumerNonSequentialOffsets(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions errors.go
Expand Up @@ -9,6 +9,9 @@ import (
// or otherwise failed to respond.
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")

// ErrBrokerNotFound is returned when there's no broker found for the requested id.
var ErrBrokerNotFound = errors.New("kafka: broker for id is not found")

// ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

Expand Down

0 comments on commit 71e02c7

Please sign in to comment.