-
Notifications
You must be signed in to change notification settings - Fork 21
/
job.go
100 lines (86 loc) · 1.88 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
"flag"
"fmt"
"log"
"math/rand"
"strings"
"sync/atomic"
"time"
"github.com/funkygao/gafka/cmd/kateway/api/v1"
"github.com/funkygao/gafka/ctx"
)
// Package-level settings for the job publisher. Apart from n, every variable
// here is bound to a command-line flag in init.
var (
	// Publish target and identity: -h, -appid, -t, -key, -tag and -id.
	addr     string
	appid    string
	topic    string
	key      string
	tag      string
	workerId string

	// Load shape: -c (client concurrency), -sz (msg size) and -sleep
	// (pause between publishes).
	c       int
	msgSize int
	sleep   time.Duration

	// Output control: -debug and -step (display progress step).
	debug bool
	step  int64

	// n is the message counter shared by all publisher goroutines; it is
	// only modified through sync/atomic.
	n int64
)
// init binds every command-line flag to its package-level variable and parses
// the command line before main runs.
//
// The default for -h is built from the address returned by ctx.LocalIP with
// port 9191; the second value returned by ctx.LocalIP is discarded.
func init() {
	localIP, _ := ctx.LocalIP()
	defaultAddr := fmt.Sprintf("%s:9191", localIP.String())

	// Publish target and identity.
	flag.StringVar(&addr, "h", defaultAddr, "pub http addr")
	flag.StringVar(&appid, "appid", "app1", "app id")
	flag.StringVar(&topic, "t", "foobar", "topic to pub")
	flag.StringVar(&key, "key", "", "job key")
	flag.StringVar(&tag, "tag", "", "add tag to each job")
	flag.StringVar(&workerId, "id", "1", "worker id")

	// Load shape.
	flag.IntVar(&c, "c", 10, "client concurrency")
	flag.IntVar(&msgSize, "sz", 100, "msg size")
	flag.DurationVar(&sleep, "sleep", 0, "sleep between pub")

	// Output control.
	flag.BoolVar(&debug, "debug", false, "debug")
	flag.Int64Var(&step, "step", 1, "display progress step")

	flag.Parse()
}
// main configures the standard logger, starts c publisher goroutines and then
// blocks forever; the process runs until it is terminated externally.
//
// It exits with a fatal log message when c is not positive.
func main() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)

	// Without at least one worker nothing would ever be published and the
	// empty select below would leave the process hung (or aborted by the
	// runtime's deadlock detector). Report the bad flag value instead.
	if c <= 0 {
		log.Fatalf("client concurrency (-c) must be positive, got %d", c)
	}

	for i := 0; i < c; i++ {
		go pubJobGatewayLoop(i)
	}

	// The workers never return, so park the main goroutine indefinitely.
	select {}
}
// pubJobGatewayLoop publishes jobs through the gateway client in an endless
// loop and never returns.
//
// seq identifies the calling worker goroutine and is embedded in every
// message. Each iteration builds a message carrying the current time, the
// worker id, seq, the shared message number and an "X" payload, submits it
// with client.AddJob, and then sleeps for the configured interval if one was
// given.
//
// When AddJob fails the error is printed and the shared counter is rolled
// back by one. When it succeeds, the job id and message are logged for every
// message number that is a multiple of step; a step of 0 disables this
// progress output.
func pubJobGatewayLoop(seq int) {
	cf := api.DefaultConfig(appid, "mysecret")
	cf.Pub.Endpoint = addr
	cf.Debug = debug
	client := api.NewClient(cf)

	var opt api.PubOption
	opt.Topic = topic
	opt.Ver = "v1"
	// tag is fixed once flags are parsed, so set it once rather than on
	// every iteration.
	if tag != "" {
		opt.Tag = tag
	}

	for {
		// The payload is msgSize plus a random extra in [0, msgSize).
		// rand.Intn panics for a non-positive argument and strings.Repeat
		// panics for a negative count, so a non-positive msgSize yields an
		// empty payload instead.
		sz := 0
		if msgSize > 0 {
			sz = msgSize + rand.Intn(msgSize)
		}

		no := atomic.AddInt64(&n, 1)
		msg := fmt.Sprintf("%s w:%s seq:%-2d no:%-10d payload:%s",
			time.Now(),
			workerId, seq, no, strings.Repeat("X", sz))

		// NOTE(review): "1m" is presumably the job delay — confirm against
		// the api package.
		jobId, err := client.AddJob([]byte(msg), "1m", opt)
		switch {
		case err != nil:
			fmt.Println(err)
			// Give back the number claimed above for the failed publish.
			atomic.AddInt64(&n, -1)
		case step != 0 && no%step == 0:
			// The step != 0 guard avoids an integer divide-by-zero panic
			// when -step 0 is given.
			log.Println(jobId, msg)
		}

		if sleep > 0 {
			time.Sleep(sleep)
		}
	}
}