diff --git a/kafka/manager.go b/kafka/manager.go index 98fb6f77..ec26b188 100644 --- a/kafka/manager.go +++ b/kafka/manager.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "regexp" + "slices" "strings" "github.com/twmb/franz-go/pkg/kadm" @@ -341,3 +342,25 @@ func (m *Manager) CreateACLs(ctx context.Context, acls *kadm.ACLBuilder) error { } return errors.Join(errs...) } + +// ListTopics returns all topics that begin with prefix in lexicographical order from the Kafka broker. +func (m *Manager) ListTopics(ctx context.Context, prefix string) ([]string, error) { + details, err := m.adminClient.ListTopics(ctx) + if err != nil { + return nil, err + } + topics := make([]string, 0, len(details)) + var errs []error + for _, t := range details { + if !strings.HasPrefix(t.Topic, prefix) { + continue + } + if t.Err != nil { + errs = append(errs, fmt.Errorf("%s %w", t.Topic, t.Err)) + continue + } + topics = append(topics, t.Topic) + } + slices.Sort(topics) + return topics, errors.Join(errs...) +} diff --git a/kafka/manager_test.go b/kafka/manager_test.go index 6249cbe2..57061728 100644 --- a/kafka/manager_test.go +++ b/kafka/manager_test.go @@ -705,6 +705,42 @@ func TestManagerCreateACLs(t *testing.T) { }) } +func TestListTopics(t *testing.T) { + cluster, commonConfig := newFakeCluster(t) + m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) + require.NoError(t, err) + t.Cleanup(func() { m.Close() }) + var metadataRequest *kmsg.MetadataRequest + cluster.ControlKey(kmsg.Metadata.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { + metadataRequest = req.(*kmsg.MetadataRequest) + cluster.KeepControl() + return &kmsg.MetadataResponse{ + Version: metadataRequest.Version, + Brokers: []kmsg.MetadataResponseBroker{}, + Topics: []kmsg.MetadataResponseTopic{{ + Topic: kmsg.StringPtr("name_space-topic1"), + Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 1}, {Partition: 2}}, + }, { + Topic: kmsg.StringPtr("name_space-topic2"), + Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 3}}, + ErrorCode: kerr.UnknownTopicOrPartition.Code, + }, { + Topic: kmsg.StringPtr("name_space-topic3"), + Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 4}}, + }, { + Topic: kmsg.StringPtr("name_space-mytopic"), + Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 1}}, + }, { + Topic: kmsg.StringPtr("rnd-topic"), + Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 4}}, + }}, + }, nil, true + }) + topics, err := m.ListTopics(context.Background(), "name_space") + assert.EqualError(t, err, "name_space-topic2 UNKNOWN_TOPIC_OR_PARTITION: This server does not host this topic-partition.") + assert.Equal(t, []string{"name_space-mytopic", "name_space-topic1", "name_space-topic3"}, topics) +} + func newFakeCluster(t testing.TB) (*kfake.Cluster, CommonConfig) { cluster, err := kfake.NewCluster( // Just one broker to simplify dealing with sharded requests.