/
describe_consumer_groups.go
89 lines (74 loc) · 2.31 KB
/
describe_consumer_groups.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package minion
import (
"context"
"fmt"
"time"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"
)
type DescribeConsumerGroupsResponse struct {
BrokerMetadata kgo.BrokerMetadata
Groups *kmsg.DescribeGroupsResponse
}
func (s *Service) listConsumerGroupsCached(ctx context.Context) (*kmsg.ListGroupsResponse, error) {
reqId := ctx.Value("requestId").(string)
key := "list-consumer-groups-" + reqId
if cachedRes, exists := s.getCachedItem(key); exists {
return cachedRes.(*kmsg.ListGroupsResponse), nil
}
res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
res, err := s.listConsumerGroups(ctx)
if err != nil {
return nil, err
}
s.setCachedItem(key, res, 120*time.Second)
return res, nil
})
if err != nil {
return nil, err
}
return res.(*kmsg.ListGroupsResponse), nil
}
func (s *Service) listConsumerGroups(ctx context.Context) (*kmsg.ListGroupsResponse, error) {
listReq := kmsg.NewListGroupsRequest()
res, err := listReq.RequestWith(ctx, s.client)
if err != nil {
return nil, fmt.Errorf("failed to list consumer groups: %w", err)
}
err = kerr.ErrorForCode(res.ErrorCode)
if err != nil {
return nil, fmt.Errorf("failed to list consumer groups. inner kafka error: %w", err)
}
return res, nil
}
func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsumerGroupsResponse, error) {
listRes, err := s.listConsumerGroupsCached(ctx)
if err != nil {
return nil, err
}
groupIDs := make([]string, len(listRes.Groups))
for i, group := range listRes.Groups {
groupIDs[i] = group.Group
}
describeReq := kmsg.NewDescribeGroupsRequest()
describeReq.Groups = groupIDs
describeReq.IncludeAuthorizedOperations = false
shardedResp := s.client.RequestSharded(ctx, &describeReq)
describedGroups := make([]DescribeConsumerGroupsResponse, 0)
for _, kresp := range shardedResp {
if kresp.Err != nil {
s.logger.Warn("broker failed to respond to the described groups request",
zap.Int32("broker_id", kresp.Meta.NodeID),
zap.Error(kresp.Err))
continue
}
res := kresp.Resp.(*kmsg.DescribeGroupsResponse)
describedGroups = append(describedGroups, DescribeConsumerGroupsResponse{
BrokerMetadata: kresp.Meta,
Groups: res,
})
}
return describedGroups, nil
}