Skip to content

Commit

Permalink
Merge pull request #872 from pd/isrs
Browse files Browse the repository at this point in the history
Add client.Isr to determine in-sync replicas
  • Loading branch information
eapache committed May 1, 2017
2 parents dd00cf9 + dbc0382 commit bdb312e
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
30 changes: 30 additions & 0 deletions client.go
Expand Up @@ -38,6 +38,11 @@ type Client interface {
// Replicas returns the set of all replica IDs for the given partition.
Replicas(topic string, partitionID int32) ([]int32, error)

// InSyncReplicas returns the set of all in-sync replica IDs for the given
// partition. In-sync replicas are replicas which are fully caught up with
// the partition leader.
InSyncReplicas(topic string, partitionID int32) ([]int32, error)

// RefreshMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics. If no topics are provided, it will refresh
// metadata for all topics.
Expand Down Expand Up @@ -295,6 +300,31 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error)
return dupeAndSort(metadata.Replicas), nil
}

func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
if client.Closed() {
return nil, ErrClosedClient
}

metadata := client.cachedMetadata(topic, partitionID)

if metadata == nil {
err := client.RefreshMetadata(topic)
if err != nil {
return nil, err
}
metadata = client.cachedMetadata(topic, partitionID)
}

if metadata == nil {
return nil, ErrUnknownTopicOrPartition
}

if metadata.Err == ErrReplicaNotAvailable {
return nil, metadata.Err
}
return dupeAndSort(metadata.Isr), nil
}

func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
Expand Down
11 changes: 11 additions & 0 deletions client_test.go
Expand Up @@ -196,6 +196,17 @@ func TestClientMetadata(t *testing.T) {
t.Error("Incorrect (or unsorted) replica")
}

isr, err = client.InSyncReplicas("my_topic", 0)
if err != nil {
t.Error(err)
} else if len(isr) != 2 {
t.Error("Client returned incorrect ISRs for partition:", isr)
} else if isr[0] != 1 {
t.Error("Incorrect (or unsorted) ISR:", isr)
} else if isr[1] != 5 {
t.Error("Incorrect (or unsorted) ISR:", isr)
}

leader.Close()
seedBroker.Close()
safeClose(t, client)
Expand Down

0 comments on commit bdb312e

Please sign in to comment.