-
Notifications
You must be signed in to change notification settings - Fork 21
/
ack.go
87 lines (74 loc) · 1.71 KB
/
ack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main
import (
"flag"
"fmt"
"log"
"strconv"
"time"
"github.com/funkygao/gafka/cmd/kateway/api/v1"
"github.com/funkygao/gafka/ctx"
)
// Command-line settings. Each variable is bound to a flag in init and holds
// its final value once flag.Parse has run.
var (
	// addr is the kateway sub endpoint (-addr); appid (-appid), topic (-t)
	// and group (-g) identify what is consumed and by which consumer group.
	addr, appid, group, topic string

	// n backs the -n flag and step backs the -step flag.
	n, step int

	// sleep backs the -sleep flag.
	sleep time.Duration
)
// init registers the command-line flags and parses them before main runs.
//
// The default value of -addr is the result of ctx.LocalIP() on port 9192.
// A failed lookup is logged instead of being silently discarded, because the
// default address may not be usable in that case; execution continues so an
// explicitly supplied -addr still works.
func init() {
	ip, err := ctx.LocalIP()
	if err != nil {
		// Best effort: do not abort, only make the problem visible.
		log.Printf("local ip lookup failed, pass -addr explicitly: %v", err)
	}

	flag.StringVar(&addr, "addr", fmt.Sprintf("%s:9192", ip), "sub kateway addr")
	flag.StringVar(&group, "g", "group1", "consumer group name")
	flag.StringVar(&appid, "appid", "app1", "consume whose topic")
	flag.IntVar(&step, "step", 1, "display progress step")
	flag.StringVar(&topic, "t", "foobar", "topic to sub")
	flag.DurationVar(&sleep, "sleep", 0, "sleep between pub")
	flag.IntVar(&n, "n", 1000000, "run sub how many times")
	flag.Parse()
}
// main subscribes to the configured topic through the kateway sub endpoint
// and rewrites the partition/offset carried in the handler's result on every
// invocation, as the "try error" log lines describe:
//
//   - invocations 0-2 set both partition and offset to "-1";
//   - invocation 4 sets the offset to the received offset minus 10;
//   - invocations 3 and 5 set it to the received offset plus 10;
//   - after the sixth invocation the handler returns api.ErrSubStop.
//
// NOTE(review): presumably SubX uses r.Partition/r.Offset to acknowledge the
// message after the handler returns — confirm against the api package.
//
// When SubX returns, the number of handled messages, the elapsed time and
// the resulting throughput are logged.
func main() {
	cf := api.DefaultConfig("app2", "mysecret")
	cf.Debug = true
	cf.Sub.Endpoint = addr
	c := api.NewClient(cf)

	opt := api.SubOption{
		AppId: appid,
		Topic: topic,
		Ver:   "v1",
		Group: group,
	}

	i := 0 // number of handler invocations so far
	t0 := time.Now()
	err := c.SubX(opt, func(statusCode int, msg []byte, r *api.SubXResult) error {
		log.Printf("i=%d, status:%d, r:%+v msg:%s", i, statusCode, *r, string(msg))

		if i < 3 {
			i++
			r.Partition = "-1"
			r.Offset = "-1"
			return nil
		}

		// The received offset is only needed from here on. A value that
		// does not parse is reported and treated as 0 so the run continues.
		offset, perr := strconv.Atoi(r.Offset)
		if perr != nil {
			log.Printf("invalid offset %q, treating it as 0: %v", r.Offset, perr)
		}

		// Log only the case that is actually applied to r.Offset.
		if i == 4 {
			r.Offset = strconv.Itoa(offset - 10)
			log.Println("try error: commit too small offset")
		} else {
			r.Offset = strconv.Itoa(offset + 10)
			log.Println("try error: commit too large offset")
		}

		i++
		if i > 5 {
			return api.ErrSubStop
		}
		return nil
	})
	if err != nil {
		log.Println(err)
	}

	// Report what was really handled (i), not the -n flag value, which this
	// program never uses to bound the run.
	elapsed := time.Since(t0)
	log.Printf("%d msgs in %s, tps: %.2f\n", i, elapsed, float64(i)/elapsed.Seconds())
}