Skip to content

Commit

Permalink
Add RefreshCoordinator, remove invalidateCoordinator.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Apr 9, 2015
1 parent 5720591 commit 79953ea
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 23 deletions.
42 changes: 27 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ type Client interface {
// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
GetOffset(topic string, partitionID int32, time int64) (int64, error)

// Coordinator returns the coordinating broker for a consumer group.
// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
// value if it's available. This may be stale, in which case you should call RefreshCoordinator
// to update the locally cached value.
Coordinator(consumerGroup string) (*Broker, error)

// RefreshCoordinator refreshes the coordinator for a consumer group.
RefreshCoordinator(consumerGroup string) error

// Close shuts down all broker connections managed by this client. It is required to call this function before
// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
// using a client before you close the client.
Expand Down Expand Up @@ -313,25 +318,38 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
client.lock.RLock()
coordinator, ok := client.coordinators[consumerGroup]
coordinator := client.coordinators[consumerGroup]
client.lock.RUnlock()

if !ok {
if coordinator == nil {
var err error
coordinator, err = client.refreshCoordinator(consumerGroup, client.conf.Metadata.Retry.Max)
err = client.RefreshCoordinator(consumerGroup)
if err != nil {
return nil, err
}

client.lock.Lock()
client.coordinators[consumerGroup] = client.registerBroker(coordinator)
client.lock.Unlock()
client.lock.RLock()
coordinator = client.coordinators[consumerGroup]
client.lock.RUnlock()
}

_ = coordinator.Open(client.conf)
return coordinator, nil
}

func (client *client) RefreshCoordinator(consumerGroup string) error {
coordinator, err := client.getCoordinator(consumerGroup, client.conf.Metadata.Retry.Max)
if err != nil {
return err
}

client.lock.Lock()
client.coordinators[consumerGroup] = client.registerBroker(coordinator)
client.lock.Unlock()

return nil
}

// private broker management helpers

func (client *client) disconnectBroker(broker *Broker) {
Expand Down Expand Up @@ -632,7 +650,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
return ret, err
}

func (client *client) refreshCoordinator(consumerGroup string, attemptsRemaining int) (*Broker, error) {
func (client *client) getCoordinator(consumerGroup string, attemptsRemaining int) (*Broker, error) {
for broker := client.any(); broker != nil; broker = client.any() {

Logger.Printf("client/coordinator Requesting coordinator for consumergoup %s from %s.\n", consumerGroup, broker.Addr())
Expand Down Expand Up @@ -676,14 +694,8 @@ func (client *client) refreshCoordinator(consumerGroup string, attemptsRemaining

time.Sleep(client.conf.Metadata.Retry.Backoff)
client.resurrectDeadBrokers()
return client.refreshCoordinator(consumerGroup, attemptsRemaining-1)
return client.getCoordinator(consumerGroup, attemptsRemaining-1)
}

return nil, ErrOutOfBrokers
}

func (client *client) invalidateCoordinator(consumerGroup string) {
client.lock.Lock()
delete(client.coordinators, consumerGroup)
client.lock.Unlock()
}
50 changes: 42 additions & 8 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ func TestClientResurrectDeadSeeds(t *testing.T) {

func TestClientCoordinator(t *testing.T) {
seedBroker := newMockBroker(t, 1)
coordinator := newMockBroker(t, 5)
staleCoordinator := newMockBroker(t, 2)
freshCoordinator := newMockBroker(t, 3)

metadataResponse1 := new(MetadataResponse)
seedBroker.Returns(metadataResponse1)
Expand All @@ -440,9 +441,9 @@ func TestClientCoordinator(t *testing.T) {
seedBroker.Returns(coordinatorResponse1)

coordinatorResponse2 := new(ConsumerMetadataResponse)
coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
coordinatorResponse2.CoordinatorHost = "127.0.0.1"
coordinatorResponse2.CoordinatorPort = coordinator.Port()
coordinatorResponse2.CoordinatorPort = staleCoordinator.Port()

seedBroker.Returns(coordinatorResponse2)

Expand All @@ -451,15 +452,48 @@ func TestClientCoordinator(t *testing.T) {
t.Error(err)
}

if coordinator.Addr() != broker.Addr() {
t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
if staleCoordinator.Addr() != broker.Addr() {
t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
}

if coordinator.BrokerID() != broker.ID() {
t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
if staleCoordinator.BrokerID() != broker.ID() {
t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID())
}

coordinator.Close()
// Grab the cached value
broker2, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if broker2.Addr() != broker.Addr() {
t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
}

coordinatorResponse3 := new(ConsumerMetadataResponse)
coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
coordinatorResponse3.CoordinatorHost = "127.0.0.1"
coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()

seedBroker.Returns(coordinatorResponse3)

// Refresh the locally cahced value because it's stale
if err := client.RefreshCoordinator("my_group"); err != nil {
t.Error(err)
}

// Grab the fresh value
broker3, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if broker3.Addr() != freshCoordinator.Addr() {
t.Errorf("Expected the freshCoordinator to be returned, but found %s vs. %s", broker3.Addr())
}

freshCoordinator.Close()
staleCoordinator.Close()
seedBroker.Close()
safeClose(t, client)
}

0 comments on commit 79953ea

Please sign in to comment.