forked from travisjeffery/jocko
/
create_topic_requests.go
113 lines (104 loc) · 2.23 KB
/
create_topic_requests.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package protocol
// CreateTopicRequest describes a single topic entry carried inside a
// CreateTopicRequests message.
type CreateTopicRequest struct {
	// Topic is encoded as a string; it identifies the topic this entry is for.
	Topic string
	// NumPartitions is encoded as an int32.
	NumPartitions int32
	// ReplicationFactor is encoded as an int16.
	ReplicationFactor int16
	// ReplicaAssignment is keyed by partition ID; each value is the list of
	// int32 replica identifiers for that partition (presumably broker IDs —
	// confirm against the protocol definition).
	ReplicaAssignment map[int32][]int32
	// Configs holds string key/value configuration pairs for the topic.
	Configs map[string]string
}
// CreateTopicRequests is the top-level create-topics message: a list of
// per-topic requests followed by a timeout.
type CreateTopicRequests struct {
	// Requests are the individual topic entries, encoded in slice order.
	Requests []*CreateTopicRequest
	// Timeout is encoded as a trailing int32. The unit is not established in
	// this file (presumably milliseconds — confirm against the protocol).
	Timeout int32
}
// Encode writes the message to e in the same field order that Decode reads:
// the request array (topic, partition count, replication factor, replica
// assignments, configs for each entry) followed by the timeout.
//
// Every replica list is preceded by its own array length, matching the
// replica count that Decode reads after each partition ID. Map entries are
// written in Go's map iteration order, which is unspecified.
//
// Encode always returns nil.
// NOTE(review): if the Put* methods of PacketEncoder report errors, they are
// discarded here — confirm against the interface definition.
func (c *CreateTopicRequests) Encode(e PacketEncoder) error {
	e.PutArrayLength(len(c.Requests))
	for _, r := range c.Requests {
		e.PutString(r.Topic)
		e.PutInt32(r.NumPartitions)
		e.PutInt16(r.ReplicationFactor)

		e.PutArrayLength(len(r.ReplicaAssignment))
		for pid, replicas := range r.ReplicaAssignment {
			e.PutInt32(pid)
			// Decode expects a replica count here before the replica IDs.
			e.PutArrayLength(len(replicas))
			for _, replica := range replicas {
				e.PutInt32(replica)
			}
		}

		e.PutArrayLength(len(r.Configs))
		for k, v := range r.Configs {
			e.PutString(k)
			e.PutString(v)
		}
	}
	e.PutInt32(c.Timeout)
	return nil
}
// Decode populates c from d, reading the request array (topic, partition
// count, replication factor, replica assignments, configs for each entry)
// followed by the timeout.
//
// The first error reported by d is returned immediately, including a failure
// to read the replica-assignment array length. On error c may be left
// partially populated: c.Requests is allocated up front and each entry is
// stored before its fields are read.
func (c *CreateTopicRequests) Decode(d PacketDecoder) error {
	requestCount, err := d.ArrayLength()
	if err != nil {
		return err
	}
	c.Requests = make([]*CreateTopicRequest, requestCount)
	for i := range c.Requests {
		req := new(CreateTopicRequest)
		c.Requests[i] = req

		if req.Topic, err = d.String(); err != nil {
			return err
		}
		if req.NumPartitions, err = d.Int32(); err != nil {
			return err
		}
		if req.ReplicationFactor, err = d.Int16(); err != nil {
			return err
		}

		// Replica assignments: partition ID, replica count, then replicas.
		var assignmentCount int
		if assignmentCount, err = d.ArrayLength(); err != nil {
			return err
		}
		assignments := make(map[int32][]int32, assignmentCount)
		for a := 0; a < assignmentCount; a++ {
			var pid int32
			if pid, err = d.Int32(); err != nil {
				return err
			}
			var replicaCount int
			if replicaCount, err = d.ArrayLength(); err != nil {
				return err
			}
			replicas := make([]int32, replicaCount)
			for r := range replicas {
				if replicas[r], err = d.Int32(); err != nil {
					return err
				}
			}
			assignments[pid] = replicas
		}
		req.ReplicaAssignment = assignments

		// Topic configs: key/value string pairs.
		var configCount int
		if configCount, err = d.ArrayLength(); err != nil {
			return err
		}
		configs := make(map[string]string, configCount)
		for j := 0; j < configCount; j++ {
			var key, value string
			if key, err = d.String(); err != nil {
				return err
			}
			if value, err = d.String(); err != nil {
				return err
			}
			configs[key] = value
		}
		req.Configs = configs
	}

	c.Timeout, err = d.Int32()
	return err
}
// Key returns CreateTopicsKey, the key value associated with this request type.
func (c *CreateTopicRequests) Key() int16 { return CreateTopicsKey }
// Version returns the version number of this request, which is always 0.
func (c *CreateTopicRequests) Version() int16 { return 0 }