From 69da5cea44d95a57ab32777d52c08a0e19e75adb Mon Sep 17 00:00:00 2001 From: Kostiantyn Masliuk <1pkg@protonmail.com> Date: Thu, 31 Jul 2025 16:22:46 -0700 Subject: [PATCH 1/2] kafka: queuemanager add api to list all topics --- kafka/manager.go | 20 ++++++++++++++++++++ kafka/manager_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/kafka/manager.go b/kafka/manager.go index 98fb6f77..15cd266d 100644 --- a/kafka/manager.go +++ b/kafka/manager.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "regexp" + "slices" "strings" "github.com/twmb/franz-go/pkg/kadm" @@ -341,3 +342,22 @@ func (m *Manager) CreateACLs(ctx context.Context, acls *kadm.ACLBuilder) error { } return errors.Join(errs...) } + +// ListTopics returns all topics in lexicographical order from the Kafka broker. +func (m *Manager) ListTopics(ctx context.Context) ([]string, error) { + details, err := m.adminClient.ListTopics(ctx) + if err != nil { + return nil, err + } + topics := make([]string, 0, len(details)) + var errs []error + for _, t := range details { + if t.Err != nil { + errs = append(errs, fmt.Errorf("%s %w", t.Topic, t.Err)) + continue + } + topics = append(topics, t.Topic) + } + slices.Sort(topics) + return topics, errors.Join(errs...) +} diff --git a/kafka/manager_test.go b/kafka/manager_test.go index 6249cbe2..0df0c6a8 100644 --- a/kafka/manager_test.go +++ b/kafka/manager_test.go @@ -705,6 +705,39 @@ func TestManagerCreateACLs(t *testing.T) { }) } +func TestListTopics(t *testing.T) { + cluster, commonConfig := newFakeCluster(t) + m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) + require.NoError(t, err) + t.Cleanup(func() { m.Close() }) + var metadataRequest *kmsg.MetadataRequest + cluster.ControlKey(kmsg.Metadata.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { + metadataRequest = req.(*kmsg.MetadataRequest) + cluster.KeepControl() + return &kmsg.MetadataResponse{ + Version: metadataRequest.Version, + Brokers: []kmsg.MetadataResponseBroker{}, + Topics: []kmsg.MetadataResponseTopic{{ + Topic: kmsg.StringPtr("name_space-topic1"), + Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 1}, {Partition: 2}}, + }, { + Topic: kmsg.StringPtr("name_space-topic2"), + Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 3}}, + ErrorCode: 3, // UNKNOWN_TOPIC_OR_PARTITION + }, { + Topic: kmsg.StringPtr("name_space-topic3"), + Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 4}}, + }, { + Topic: kmsg.StringPtr("name_space-mytopic"), + Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 1}}, + }}, + }, nil, true + }) + topics, err := m.ListTopics(context.Background()) + assert.EqualError(t, err, "name_space-topic2 UNKNOWN_TOPIC_OR_PARTITION: This server does not host this topic-partition.") + assert.Equal(t, []string{"name_space-mytopic", "name_space-topic1", "name_space-topic3"}, topics) +} + func newFakeCluster(t testing.TB) (*kfake.Cluster, CommonConfig) { cluster, err := kfake.NewCluster( // Just one broker to simplify dealing with sharded requests. From 2d0be0db86a49aed9bca5a62b10d59eaa2a76fec Mon Sep 17 00:00:00 2001 From: Kostiantyn Masliuk <1pkg@protonmail.com> Date: Fri, 1 Aug 2025 12:57:32 -0700 Subject: [PATCH 2/2] kafka: list topics api review feedback --- kafka/manager.go | 7 +++++-- kafka/manager_test.go | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/kafka/manager.go b/kafka/manager.go index 15cd266d..ec26b188 100644 --- a/kafka/manager.go +++ b/kafka/manager.go @@ -343,8 +343,8 @@ func (m *Manager) CreateACLs(ctx context.Context, acls *kadm.ACLBuilder) error { return errors.Join(errs...) } -// ListTopics returns all topics in lexicographical order from the Kafka broker. -func (m *Manager) ListTopics(ctx context.Context) ([]string, error) { +// ListTopics returns all topics that begin with prefix in lexicographical order from the Kafka broker. +func (m *Manager) ListTopics(ctx context.Context, prefix string) ([]string, error) { details, err := m.adminClient.ListTopics(ctx) if err != nil { return nil, err @@ -352,6 +352,9 @@ func (m *Manager) ListTopics(ctx context.Context) ([]string, error) { topics := make([]string, 0, len(details)) var errs []error for _, t := range details { + if !strings.HasPrefix(t.Topic, prefix) { + continue + } if t.Err != nil { errs = append(errs, fmt.Errorf("%s %w", t.Topic, t.Err)) continue diff --git a/kafka/manager_test.go b/kafka/manager_test.go index 0df0c6a8..57061728 100644 --- a/kafka/manager_test.go +++ b/kafka/manager_test.go @@ -723,17 +723,20 @@ func TestListTopics(t *testing.T) { }, { Topic: kmsg.StringPtr("name_space-topic2"), Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 3}}, - ErrorCode: 3, // UNKNOWN_TOPIC_OR_PARTITION + ErrorCode: kerr.UnknownTopicOrPartition.Code, }, { Topic: kmsg.StringPtr("name_space-topic3"), Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 4}}, }, { Topic: kmsg.StringPtr("name_space-mytopic"), Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 1}}, + }, { + Topic: kmsg.StringPtr("rnd-topic"), + Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 4}}, }}, }, nil, true }) - topics, err := m.ListTopics(context.Background()) + topics, err := m.ListTopics(context.Background(), "name_space") assert.EqualError(t, err, "name_space-topic2 UNKNOWN_TOPIC_OR_PARTITION: This server does not host this topic-partition.") assert.Equal(t, []string{"name_space-mytopic", "name_space-topic1", "name_space-topic3"}, topics) }