diff --git a/client.go b/client.go index f01fce9b4..7d1501af5 100644 --- a/client.go +++ b/client.go @@ -43,9 +43,14 @@ type Client interface { // offset, OffsetNewest for the offset of the message that will be produced next, or a time. GetOffset(topic string, partitionID int32, time int64) (int64, error) - // Coordinator returns the coordinating broker for a consumer group. + // Coordinator returns the coordinating broker for a consumer group. It will return a locally cached + // value if it's available. This may be stale, in which case you should call RefreshCoordinator + // to update the locally cached value. Coordinator(consumerGroup string) (*Broker, error) + // RefreshCoordinator refreshes the coordinator for a consumer group. + RefreshCoordinator(consumerGroup string) error + // Close shuts down all broker connections managed by this client. It is required to call this function before // a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers // using a client before you close the client. @@ -313,25 +318,38 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in func (client *client) Coordinator(consumerGroup string) (*Broker, error) { client.lock.RLock() - coordinator, ok := client.coordinators[consumerGroup] + coordinator := client.coordinators[consumerGroup] client.lock.RUnlock() - if !ok { + if coordinator == nil { var err error - coordinator, err = client.refreshCoordinator(consumerGroup, client.conf.Metadata.Retry.Max) + err = client.RefreshCoordinator(consumerGroup) if err != nil { return nil, err } - client.lock.Lock() - client.coordinators[consumerGroup] = client.registerBroker(coordinator) - client.lock.Unlock() + client.lock.RLock() + coordinator = client.coordinators[consumerGroup] + client.lock.RUnlock() } _ = coordinator.Open(client.conf) return coordinator, nil } +func (client *client) RefreshCoordinator(consumerGroup string) error { + coordinator, err := client.getCoordinator(consumerGroup, client.conf.Metadata.Retry.Max) + if err != nil { + return err + } + + client.lock.Lock() + client.coordinators[consumerGroup] = client.registerBroker(coordinator) + client.lock.Unlock() + + return nil +} + // private broker management helpers func (client *client) disconnectBroker(broker *Broker) { @@ -632,7 +650,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) { return ret, err } -func (client *client) refreshCoordinator(consumerGroup string, attemptsRemaining int) (*Broker, error) { +func (client *client) getCoordinator(consumerGroup string, attemptsRemaining int) (*Broker, error) { for broker := client.any(); broker != nil; broker = client.any() { Logger.Printf("client/coordinator Requesting coordinator for consumergoup %s from %s.\n", consumerGroup, broker.Addr()) @@ -676,14 +694,8 @@ func (client *client) refreshCoordinator(consumerGroup string, attemptsRemaining time.Sleep(client.conf.Metadata.Retry.Backoff) client.resurrectDeadBrokers() - return client.refreshCoordinator(consumerGroup, attemptsRemaining-1) + return client.getCoordinator(consumerGroup, attemptsRemaining-1) } return nil, ErrOutOfBrokers } - -func (client *client) invalidateCoordinator(consumerGroup string) { - client.lock.Lock() - delete(client.coordinators, consumerGroup) - client.lock.Unlock() -} diff --git a/client_test.go b/client_test.go index b1cb1bfef..4e57d8d87 100644 --- a/client_test.go +++ b/client_test.go @@ -425,7 +425,8 @@ func TestClientResurrectDeadSeeds(t *testing.T) { func TestClientCoordinator(t *testing.T) { seedBroker := newMockBroker(t, 1) - coordinator := newMockBroker(t, 5) + staleCoordinator := newMockBroker(t, 2) + freshCoordinator := newMockBroker(t, 3) metadataResponse1 := new(MetadataResponse) seedBroker.Returns(metadataResponse1) @@ -440,9 +441,9 @@ func TestClientCoordinator(t *testing.T) { seedBroker.Returns(coordinatorResponse1) coordinatorResponse2 := new(ConsumerMetadataResponse) - coordinatorResponse2.CoordinatorID = coordinator.BrokerID() + coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID() coordinatorResponse2.CoordinatorHost = "127.0.0.1" - coordinatorResponse2.CoordinatorPort = coordinator.Port() + coordinatorResponse2.CoordinatorPort = staleCoordinator.Port() seedBroker.Returns(coordinatorResponse2) @@ -451,15 +452,48 @@ func TestClientCoordinator(t *testing.T) { t.Error(err) } - if coordinator.Addr() != broker.Addr() { - t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr()) + if staleCoordinator.Addr() != broker.Addr() { + t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr()) } - if coordinator.BrokerID() != broker.ID() { - t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID()) + if staleCoordinator.BrokerID() != broker.ID() { + t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID()) } - coordinator.Close() + // Grab the cached value + broker2, err := client.Coordinator("my_group") + if err != nil { + t.Error(err) + } + + if broker2.Addr() != broker.Addr() { + t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr()) + } + + coordinatorResponse3 := new(ConsumerMetadataResponse) + coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID() + coordinatorResponse3.CoordinatorHost = "127.0.0.1" + coordinatorResponse3.CoordinatorPort = freshCoordinator.Port() + + seedBroker.Returns(coordinatorResponse3) + + // Refresh the locally cahced value because it's stale + if err := client.RefreshCoordinator("my_group"); err != nil { + t.Error(err) + } + + // Grab the fresh value + broker3, err := client.Coordinator("my_group") + if err != nil { + t.Error(err) + } + + if broker3.Addr() != freshCoordinator.Addr() { + t.Errorf("Expected the freshCoordinator to be returned, but found %s vs. %s", broker3.Addr()) + } + + freshCoordinator.Close() + staleCoordinator.Close() seedBroker.Close() safeClose(t, client) }