forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
describegroups.go
85 lines (70 loc) · 2.33 KB
/
describegroups.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package describegroups
import (
	"fmt"

	"github.com/segmentio/kafka-go/protocol"
)
// init hands a Request and a Response prototype to protocol.Register when the
// package is loaded (presumably so the protocol package can look up the
// DescribeGroups message types by API key — confirm in protocol.Register).
func init() {
	protocol.Register(new(Request), new(Response))
}
// Request is the DescribeGroups request message.
//
// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_DescribeGroups
//
// Each kafka struct tag declares a min and max version for its field;
// presumably the protocol package reads these to decide in which API versions
// the field is put on the wire. Field order likely matters to the encoder, so
// do not reorder fields without checking it.
type Request struct {
// Groups lists the groups this request covers (tagged v0 through v4).
Groups []string `kafka:"min=v0,max=v4"`
// IncludeAuthorizedOperations is only tagged for v3 and v4; presumably it
// asks the broker to fill in ResponseGroup.AuthorizedOperations — confirm
// against the Kafka protocol documentation.
IncludeAuthorizedOperations bool `kafka:"min=v3,max=v4"`
}
// ApiKey reports the API key of this message, which is always
// protocol.DescribeGroups.
func (r *Request) ApiKey() protocol.ApiKey {
	return protocol.DescribeGroups
}
// Group returns the first entry of r.Groups.
//
// Requests produced by Split always carry exactly one group, so for those
// this is the group the request is about. When r.Groups is empty the empty
// string is returned instead of panicking with an index-out-of-range error.
func (r *Request) Group() string {
	if len(r.Groups) == 0 {
		return ""
	}
	return r.Groups[0]
}
// Split breaks the request into one single-group Request per entry of
// r.Groups, in the same order, each carrying the same
// IncludeAuthorizedOperations value as r.
//
// It returns the per-group requests, a fresh *Response to act as the merger
// for their results, and a nil error. The returned slice is non-nil even when
// r.Groups is empty. The cluster argument is not used.
func (r *Request) Split(cluster protocol.Cluster) (
	[]protocol.Message,
	protocol.Merger,
	error,
) {
	// One request per group, since the groups will need to go to different
	// coordinators.
	parts := make([]protocol.Message, len(r.Groups))
	for i := range r.Groups {
		parts[i] = &Request{
			Groups:                      []string{r.Groups[i]},
			IncludeAuthorizedOperations: r.IncludeAuthorizedOperations,
		}
	}

	return parts, &Response{}, nil
}
// Response is the DescribeGroups response message. It also serves as the
// merger returned by Request.Split (see Merge).
//
// Each kafka struct tag declares a min and max version for its field;
// presumably the protocol package uses them to decide in which API versions
// the field is on the wire.
type Response struct {
// ThrottleTimeMs is tagged from v1 on. Judging by the name it is a throttle
// duration in milliseconds — confirm against the Kafka protocol docs.
ThrottleTimeMs int32 `kafka:"min=v1,max=v4"`
// Groups holds one ResponseGroup entry per described group.
Groups []ResponseGroup `kafka:"min=v0,max=v4"`
}
// ResponseGroup is the description of a single group inside a Response.
//
// Field order likely mirrors the wire layout; do not reorder fields without
// checking the protocol encoder.
type ResponseGroup struct {
// ErrorCode is the per-group error code (presumably 0 means success —
// confirm against the Kafka protocol docs).
ErrorCode int16 `kafka:"min=v0,max=v4"`
// GroupID identifies the group this entry describes.
GroupID string `kafka:"min=v0,max=v4"`
// GroupState is the group's state, carried as a string.
GroupState string `kafka:"min=v0,max=v4"`
// ProtocolType and ProtocolData are carried as plain strings; their exact
// meaning is defined by the Kafka protocol, not by this file.
ProtocolType string `kafka:"min=v0,max=v4"`
ProtocolData string `kafka:"min=v0,max=v4"`
// Members lists the members of the group.
Members []ResponseGroupMember `kafka:"min=v0,max=v4"`
// AuthorizedOperations is only tagged for v3 and v4, the same range as
// Request.IncludeAuthorizedOperations. NOTE(review): the encoding of this
// int32 is not visible here — check the Kafka protocol docs.
AuthorizedOperations int32 `kafka:"min=v3,max=v4"`
}
// ResponseGroupMember describes one member of a group in a ResponseGroup.
//
// Field order likely mirrors the wire layout; do not reorder fields without
// checking the protocol encoder.
type ResponseGroupMember struct {
// MemberID identifies the member.
MemberID string `kafka:"min=v0,max=v4"`
// GroupInstanceID is only tagged for v4 and is marked nullable in its tag.
GroupInstanceID string `kafka:"min=v4,max=v4,nullable"`
// ClientID and ClientHost identify the client behind the member.
ClientID string `kafka:"min=v0,max=v4"`
ClientHost string `kafka:"min=v0,max=v4"`
// MemberMetadata and MemberAssignment are kept as raw bytes; nothing in
// this file decodes them.
MemberMetadata []byte `kafka:"min=v0,max=v4"`
MemberAssignment []byte `kafka:"min=v0,max=v4"`
}
// ApiKey reports the API key of this message, which is always
// protocol.DescribeGroups.
func (r *Response) ApiKey() protocol.ApiKey {
	return protocol.DescribeGroups
}
// Merge combines the results of the per-group requests produced by
// Request.Split into a single Response.
//
// Each entry of results is unwrapped with protocol.Result. The first error it
// reports aborts the merge and is returned unchanged. Otherwise the Groups of
// every sub-response are concatenated in the order of results.
// ThrottleTimeMs is not carried over from the sub-responses and stays zero on
// the merged response. The requests argument is not used.
//
// A result that unwraps to anything other than a non-nil *Response yields an
// error instead of a panic from an unchecked type assertion.
func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
	protocol.Message,
	error,
) {
	merged := &Response{}

	for _, result := range results {
		m, err := protocol.Result(result)
		if err != nil {
			return nil, err
		}

		// Checked assertion: a nil or foreign message must not crash the caller.
		sub, ok := m.(*Response)
		if !ok || sub == nil {
			return nil, fmt.Errorf("describegroups: cannot merge result of type %T", m)
		}

		merged.Groups = append(merged.Groups, sub.Groups...)
	}

	return merged, nil
}