forked from twmb/franz-go
/
37_create_partitions.go
66 lines (57 loc) · 1.53 KB
/
37_create_partitions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package kfake
import (
"github.com/burningass23/franz-go/pkg/kerr"
"github.com/burningass23/franz-go/pkg/kmsg"
)
// init calls regKey for API key 37, the key served by this file's
// CreatePartitions handler.
//
// NOTE(review): the trailing 0 and 3 are presumably the minimum and maximum
// request versions supported for this key; confirm against regKey's
// definition, which is not in this file.
func init() {
	regKey(37, 0, 3)
}
// handleCreatePartitions serves a CreatePartitions request (API key 37).
//
// kreq must be a *kmsg.CreatePartitionsRequest; the type assertion panics
// otherwise. The returned error is non-nil only when checkReqVersion rejects
// the request version. Every other failure is reported through a per-topic
// error code in the response:
//
//   - NotController on every topic when b is not c.controller,
//   - InvalidRequest on every topic when any topic name appears twice,
//   - UnknownTopicOrPartition when c.data.tps.gett does not find the topic,
//   - InvalidReplicaAssignment when the topic carries an explicit assignment
//     (manual assignments are rejected by this handler),
//   - InvalidPartitions when the requested count is below the current count.
//
// A topic that passes validation is answered with error code 0. Missing
// partitions up to rt.Count are created with c.newPartData, unless the
// request is validate-only, in which case cluster state is left untouched.
func (c *Cluster) handleCreatePartitions(b *broker, kreq kmsg.Request) (kmsg.Response, error) {
	req := kreq.(*kmsg.CreatePartitionsRequest)
	resp := req.ResponseKind().(*kmsg.CreatePartitionsResponse)

	if err := checkReqVersion(req.Key(), req.Version); err != nil {
		return nil, err
	}

	// donet appends a single topic result carrying errCode to the response.
	donet := func(t string, errCode int16) {
		st := kmsg.NewCreatePartitionsResponseTopic()
		st.Topic = t
		st.ErrorCode = errCode
		resp.Topics = append(resp.Topics, st)
	}
	// donets fails every requested topic with the same errCode.
	donets := func(errCode int16) {
		for _, rt := range req.Topics {
			donet(rt.Topic, errCode)
		}
	}

	if b != c.controller {
		donets(kerr.NotController.Code)
		return resp, nil
	}

	// A topic may appear at most once; a duplicate fails the whole request
	// before any topic is processed.
	uniq := make(map[string]struct{}, len(req.Topics))
	for _, rt := range req.Topics {
		if _, ok := uniq[rt.Topic]; ok {
			donets(kerr.InvalidRequest.Code)
			return resp, nil
		}
		uniq[rt.Topic] = struct{}{}
	}

	for _, rt := range req.Topics {
		t, ok := c.data.tps.gett(rt.Topic)
		if !ok {
			donet(rt.Topic, kerr.UnknownTopicOrPartition.Code)
			continue
		}
		if len(rt.Assignment) > 0 {
			donet(rt.Topic, kerr.InvalidReplicaAssignment.Code)
			continue
		}
		if rt.Count < int32(len(t)) {
			donet(rt.Topic, kerr.InvalidPartitions.Code)
			continue
		}
		// A validate-only request must only report whether the change would
		// be accepted; it must not create any partitions.
		if !req.ValidateOnly {
			for i := int32(len(t)); i < rt.Count; i++ {
				c.data.tps.mkp(rt.Topic, i, c.newPartData)
			}
		}
		donet(rt.Topic, 0)
	}

	return resp, nil
}