Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DescribeLogDirs to admin client #1646

Merged
merged 1 commit into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 48 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type ClusterAdmin interface {
// Get information about the nodes in the cluster
DescribeCluster() (brokers []*Broker, controllerID int32, err error)

// Get information about all log directories on the given set of brokers
DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)

// Close shuts down the admin and closes underlying client.
Close() error
}
Expand Down Expand Up @@ -878,3 +881,48 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {

return nil
}

func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)

// Query brokers in parallel, since we may have to query multiple brokers
logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
errors := make(chan error, len(brokerIds))
wg := sync.WaitGroup{}

for _, b := range brokerIds {
wg.Add(1)
broker, err := ca.findBroker(b)
if err != nil {
Logger.Printf("Unable to find broker with ID = %v\n", b)
continue

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of a missing broker, will the waitgroup ever finish waiting since a done was never called at this point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case a broker does not reply, it will block for config.Net.ReadTimeout and return with an error read tcp 127.0.0.1:60810->127.0.0.1:9092: i/o timeout.

The same happens in ListConsumerGroups.

}
go func(b *Broker, conf *Config) {
defer wg.Done()
_ = b.Open(conf) // Ensure that broker is opened

response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
if err != nil {
errors <- err
return
}
logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
logDirs[b.ID()] = response.LogDirs
logDirsMaps <- logDirs
}(broker, ca.conf)
}

wg.Wait()
close(logDirsMaps)
close(errors)

for logDirsMap := range logDirsMaps {
for id, logDirs := range logDirsMap {
allLogDirs[id] = logDirs
}
}

// Intentionally return only the first error for simplicity
err = <-errors
return
}
48 changes: 48 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1309,3 +1309,51 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
seedBroker2.BrokerID(), b.ID())
}
}

func TestDescribeLogDirs(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"DescribeLogDirsRequest": NewMockDescribeLogDirsResponse(t).
SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}),
})

config := NewConfig()
config.Version = V1_0_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

logDirsPerBroker, err := admin.DescribeLogDirs([]int32{seedBroker.BrokerID()})
if err != nil {
t.Fatal(err)
}

if len(logDirsPerBroker) != 1 {
t.Fatalf("Expected %v results, got %v", 1, len(logDirsPerBroker))
}
logDirs := logDirsPerBroker[seedBroker.BrokerID()]
if len(logDirs) != 1 {
t.Fatalf("Expected log dirs for broker %v to be returned, but it did not, got %v", seedBroker.BrokerID(), len(logDirs))
}
logDirsBroker := logDirs[0]
if logDirsBroker.ErrorCode != ErrNoError {
t.Fatalf("Expected no error for broker %v, but it was %v", seedBroker.BrokerID(), logDirsBroker.ErrorCode)
}
if logDirsBroker.Path != "/tmp/logs" {
t.Fatalf("Expected log dirs for broker %v to be '/tmp/logs', but it was %v", seedBroker.BrokerID(), logDirsBroker.Path)
}
if len(logDirsBroker.Topics) != 2 {
t.Fatalf("Expected log dirs for broker %v to have 2 topics, but it had %v", seedBroker.BrokerID(), len(logDirsBroker.Topics))
}
err = admin.Close()
if err != nil {
t.Fatal(err)
}
}
6 changes: 6 additions & 0 deletions describe_log_dirs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
return err
}

if err := pe.putArrayLength(len(r.Topics)); err != nil {
return err
}
for _, topic := range r.Topics {
if err := topic.encode(pe); err != nil {
return err
Expand Down Expand Up @@ -137,6 +140,9 @@ func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
return err
}

if err := pe.putArrayLength(len(r.Partitions)); err != nil {
return err
}
for _, partition := range r.Partitions {
if err := partition.encode(pe); err != nil {
return err
Expand Down
42 changes: 42 additions & 0 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,3 +1029,45 @@ func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHead
}
return resp
}

type MockDescribeLogDirsResponse struct {
t TestReporter
logDirs []DescribeLogDirsResponseDirMetadata
}

func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
return &MockDescribeLogDirsResponse{t: t}
}

func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
topics := []DescribeLogDirsResponseTopic{}
for topic := range topicPartitions {
partitions := []DescribeLogDirsResponsePartition{}
for i := 0; i < topicPartitions[topic]; i++ {
partitions = append(partitions, DescribeLogDirsResponsePartition{
PartitionID: int32(i),
IsTemporary: false,
OffsetLag: int64(0),
Size: int64(1234),
})
}
topics = append(topics, DescribeLogDirsResponseTopic{
Topic: topic,
Partitions: partitions,
})
}
logDir := DescribeLogDirsResponseDirMetadata{
ErrorCode: ErrNoError,
Path: logDirPath,
Topics: topics,
}
m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
return m
}

func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
resp := &DescribeLogDirsResponse{
LogDirs: m.logDirs,
}
return resp
}