From c57f8cfa660aacfa5c9dd2d58885ec7e3f286e04 Mon Sep 17 00:00:00 2001 From: funkygao Date: Thu, 12 Nov 2015 22:12:43 +0800 Subject: [PATCH 1/3] client will export isr info of a topic/partition --- client.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/client.go b/client.go index 210dd9a38..ef19b2992 100644 --- a/client.go +++ b/client.go @@ -295,6 +295,31 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) return dupeAndSort(metadata.Replicas), nil } +func (client *client) Isr(topic string, partitionID int32) ([]int32, error) { + if client.Closed() { + return nil, ErrClosedClient + } + + metadata := client.cachedMetadata(topic, partitionID) + + if metadata == nil { + err := client.RefreshMetadata(topic) + if err != nil { + return nil, err + } + metadata = client.cachedMetadata(topic, partitionID) + } + + if metadata == nil { + return nil, ErrUnknownTopicOrPartition + } + + if metadata.Err == ErrReplicaNotAvailable { + return nil, metadata.Err + } + return dupeAndSort(metadata.Isr), nil +} + func (client *client) Leader(topic string, partitionID int32) (*Broker, error) { if client.Closed() { return nil, ErrClosedClient From f79a4d6153050d0d1e223e5d3d254542e26d466b Mon Sep 17 00:00:00 2001 From: Kyle Hargraves Date: Mon, 1 May 2017 11:48:28 -0500 Subject: [PATCH 2/3] Add Isr to Client interface --- client.go | 3 +++ client_test.go | 11 +++++++++++ 2 files changed, 14 insertions(+) diff --git a/client.go b/client.go index ef19b2992..615ef9761 100644 --- a/client.go +++ b/client.go @@ -38,6 +38,9 @@ type Client interface { // Replicas returns the set of all replica IDs for the given partition. Replicas(topic string, partitionID int32) ([]int32, error) + // Isr returns the set of in-sync replica IDs for the given partition. + Isr(topic string, partitionID int32) ([]int32, error) + // RefreshMetadata takes a list of topics and queries the cluster to refresh the // available metadata for those topics. If no topics are provided, it will refresh // metadata for all topics. diff --git a/client_test.go b/client_test.go index b0559466f..85a97d174 100644 --- a/client_test.go +++ b/client_test.go @@ -196,6 +196,17 @@ func TestClientMetadata(t *testing.T) { t.Error("Incorrect (or unsorted) replica") } + isr, err = client.Isr("my_topic", 0) + if err != nil { + t.Error(err) + } else if len(isr) != 2 { + t.Error("Client returned incorrect ISRs for partition:", isr) + } else if isr[0] != 1 { + t.Error("Incorrect (or unsorted) ISR:", isr) + } else if isr[1] != 5 { + t.Error("Incorrect (or unsorted) ISR:", isr) + } + leader.Close() seedBroker.Close() safeClose(t, client) From dbc0382c178bf0da82e37b98b9b9434feddfc264 Mon Sep 17 00:00:00 2001 From: Kyle Hargraves Date: Mon, 1 May 2017 14:19:41 -0500 Subject: [PATCH 3/3] Rename client.Isr to client.InSyncReplicas --- client.go | 8 +++++--- client_test.go | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/client.go b/client.go index 615ef9761..45de3973d 100644 --- a/client.go +++ b/client.go @@ -38,8 +38,10 @@ type Client interface { // Replicas returns the set of all replica IDs for the given partition. Replicas(topic string, partitionID int32) ([]int32, error) - // Isr returns the set of in-sync replica IDs for the given partition. - Isr(topic string, partitionID int32) ([]int32, error) + // InSyncReplicas returns the set of all in-sync replica IDs for the given + // partition. In-sync replicas are replicas which are fully caught up with + // the partition leader. + InSyncReplicas(topic string, partitionID int32) ([]int32, error) // RefreshMetadata takes a list of topics and queries the cluster to refresh the // available metadata for those topics. If no topics are provided, it will refresh @@ -298,7 +300,7 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) return dupeAndSort(metadata.Replicas), nil } -func (client *client) Isr(topic string, partitionID int32) ([]int32, error) { +func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) { if client.Closed() { return nil, ErrClosedClient } diff --git a/client_test.go b/client_test.go index 85a97d174..0bac1b405 100644 --- a/client_test.go +++ b/client_test.go @@ -196,7 +196,7 @@ func TestClientMetadata(t *testing.T) { t.Error("Incorrect (or unsorted) replica") } - isr, err = client.Isr("my_topic", 0) + isr, err = client.InSyncReplicas("my_topic", 0) if err != nil { t.Error(err) } else if len(isr) != 2 {