diff --git a/admin.go b/admin.go index a7d733da1..44e7780b0 100644 --- a/admin.go +++ b/admin.go @@ -101,6 +101,9 @@ type ClusterAdmin interface { // Get information about the nodes in the cluster DescribeCluster() (brokers []*Broker, controllerID int32, err error) + // Get information about all log directories on the given set of brokers + DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error) + // Close shuts down the admin and closes underlying client. Close() error } @@ -878,3 +881,48 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { return nil } + +func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) { + allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata) + + // Query brokers in parallel, since we may have to query multiple brokers + logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds)) + errors := make(chan error, len(brokerIds)) + wg := sync.WaitGroup{} + + for _, b := range brokerIds { + wg.Add(1) + broker, err := ca.findBroker(b) + if err != nil { + Logger.Printf("Unable to find broker with ID = %v\n", b) + continue + } + go func(b *Broker, conf *Config) { + defer wg.Done() + _ = b.Open(conf) // Ensure that broker is opened + + response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{}) + if err != nil { + errors <- err + return + } + logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata) + logDirs[b.ID()] = response.LogDirs + logDirsMaps <- logDirs + }(broker, ca.conf) + } + + wg.Wait() + close(logDirsMaps) + close(errors) + + for logDirsMap := range logDirsMaps { + for id, logDirs := range logDirsMap { + allLogDirs[id] = logDirs + } + } + + // Intentionally return only the first error for simplicity + err = <-errors + return +} diff --git a/admin_test.go b/admin_test.go index bcec47f61..ce7e46f58 100644 --- a/admin_test.go +++ b/admin_test.go @@ -1309,3 +1309,51 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) { seedBroker2.BrokerID(), b.ID()) } } + +func TestDescribeLogDirs(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "DescribeLogDirsRequest": NewMockDescribeLogDirsResponse(t). + SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + logDirsPerBroker, err := admin.DescribeLogDirs([]int32{seedBroker.BrokerID()}) + if err != nil { + t.Fatal(err) + } + + if len(logDirsPerBroker) != 1 { + t.Fatalf("Expected %v results, got %v", 1, len(logDirsPerBroker)) + } + logDirs := logDirsPerBroker[seedBroker.BrokerID()] + if len(logDirs) != 1 { + t.Fatalf("Expected log dirs for broker %v to be returned, but it did not, got %v", seedBroker.BrokerID(), len(logDirs)) + } + logDirsBroker := logDirs[0] + if logDirsBroker.ErrorCode != ErrNoError { + t.Fatalf("Expected no error for broker %v, but it was %v", seedBroker.BrokerID(), logDirsBroker.ErrorCode) + } + if logDirsBroker.Path != "/tmp/logs" { + t.Fatalf("Expected log dirs for broker %v to be '/tmp/logs', but it was %v", seedBroker.BrokerID(), logDirsBroker.Path) + } + if len(logDirsBroker.Topics) != 2 { + t.Fatalf("Expected log dirs for broker %v to have 2 topics, but it had %v", seedBroker.BrokerID(), len(logDirsBroker.Topics)) + } + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} diff --git a/describe_log_dirs_response.go b/describe_log_dirs_response.go index a9a747615..411da38ad 100644 --- a/describe_log_dirs_response.go +++ b/describe_log_dirs_response.go @@ -84,6 +84,9 @@ func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error { return err } + if err := pe.putArrayLength(len(r.Topics)); err != nil { + return err + } for _, topic := range r.Topics { if err := topic.encode(pe); err != nil { return err @@ -137,6 +140,9 @@ func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error { return err } + if err := pe.putArrayLength(len(r.Partitions)); err != nil { + return err + } for _, partition := range r.Partitions { if err := partition.encode(pe); err != nil { return err diff --git a/mockresponses.go b/mockresponses.go index e77463a58..dcb809dae 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -1029,3 +1029,45 @@ func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHead } return resp } + +type MockDescribeLogDirsResponse struct { + t TestReporter + logDirs []DescribeLogDirsResponseDirMetadata +} + +func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse { + return &MockDescribeLogDirsResponse{t: t} +} + +func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse { + topics := []DescribeLogDirsResponseTopic{} + for topic := range topicPartitions { + partitions := []DescribeLogDirsResponsePartition{} + for i := 0; i < topicPartitions[topic]; i++ { + partitions = append(partitions, DescribeLogDirsResponsePartition{ + PartitionID: int32(i), + IsTemporary: false, + OffsetLag: int64(0), + Size: int64(1234), + }) + } + topics = append(topics, DescribeLogDirsResponseTopic{ + Topic: topic, + Partitions: partitions, + }) + } + logDir := DescribeLogDirsResponseDirMetadata{ + ErrorCode: ErrNoError, + Path: logDirPath, + Topics: topics, + } + m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir} + return m +} + +func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader { + resp := &DescribeLogDirsResponse{ + LogDirs: m.logDirs, + } + return resp +}