/
consumer.go
66 lines (61 loc) · 1.36 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package behavior
import (
"encoding/json"
"time"
"github.com/cupen/game-anti-addiction/auth"
"go.uber.org/ratelimit"
)
func DecodeLoginOutRequest(msgList [][]byte, batchSize int) ([]*LoginOutRequest, error) {
var rsList = []*LoginOutRequest{}
var events = []LoginOutEvent{}
var i = 0
var lastErr error = nil
for _, msg := range msgList {
obj := LoginOutEvent{}
if err := json.Unmarshal(msg, &obj); err != nil {
lastErr = err
continue
}
i %= batchSize
i++
obj.Num = i
events = append(events, obj)
if len(events) >= batchSize {
rsList = append(rsList, &LoginOutRequest{Collections: events})
events = []LoginOutEvent{}
}
}
if len(events) > 0 {
rsList = append(rsList, &LoginOutRequest{Collections: events})
}
return rsList, lastErr
}
func ConsumerFunc(c *auth.Client, batchSize, rate int) func([][]byte) error {
limiter := ratelimit.New(rate)
return func(msgList [][]byte) error {
if len(msgList) <= 0 {
return nil
}
reqList, err := DecodeLoginOutRequest(msgList, batchSize)
if reqList == nil {
return err
}
for _, req := range reqList {
for i := 0; i < 3; i++ {
if i > 0 {
time.Sleep(time.Duration(i) * time.Second)
}
_ = limiter.Take()
resp, err := req.Do(c)
if err != nil {
continue
}
if resp.ErrCode != 0 && resp.CanRetry() {
continue
}
break
}
}
return nil
}
}