-
Notifications
You must be signed in to change notification settings - Fork 0
/
gen.go
275 lines (252 loc) · 7.15 KB
/
gen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
package LoadGenerator
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"sync/atomic"
"time"
"LoadGenerator/lib"
"LoadGenerator/helper/log"
)
// 日志记录器。
var logger = log.DLogger()
// myGenerator 代表载荷发生器的实现类型。
type myGenerator struct {
caller lib.Caller // 调用器。
timeoutNS time.Duration // 处理超时时间,单位:纳秒。
lps uint32 // 每秒载荷量。
durationNS time.Duration // 负载持续时间,单位:纳秒。
concurrency uint32 // 载荷并发量。
tickets lib.GoTickets // Goroutine票池。
ctx context.Context // 上下文。
cancelFunc context.CancelFunc // 取消函数。
callCount int64 // 调用计数。
status uint32 // 状态。
resultCh chan *lib.CallResult // 调用结果通道。
}
// NewGenerator 会新建一个载荷发生器。
func NewGenerator(pset ParamSet) (lib.Generator, error) {
logger.Infoln("New a load generator...")
if err := pset.Check(); err != nil {
return nil, err
}
gen := &myGenerator{
caller: pset.Caller,
timeoutNS: pset.TimeoutNS,
lps: pset.LPS,
durationNS: pset.DurationNS,
status: lib.STATUS_ORIGINAL,
resultCh: pset.ResultCh,
}
if err := gen.init(); err != nil {
return nil, err
}
return gen, nil
}
// 初始化载荷发生器。
func (gen *myGenerator) init() error {
var buf bytes.Buffer
buf.WriteString("Initializing the load generator...")
// 载荷的并发量 ≈ 载荷的响应超时时间 / 载荷的发送间隔时间
var total64 = int64(gen.timeoutNS)/int64(1e9/gen.lps) + 1
if total64 > math.MaxInt32 {
total64 = math.MaxInt32
}
gen.concurrency = uint32(total64)
tickets, err := lib.NewGoTickets(gen.concurrency)
if err != nil {
return err
}
gen.tickets = tickets
buf.WriteString(fmt.Sprintf("Done. (concurrency=%d)", gen.concurrency))
logger.Infoln(buf.String())
return nil
}
// callOne 会向载荷承受方发起一次调用。
func (gen *myGenerator) callOne(rawReq *lib.RawReq) *lib.RawResp {
atomic.AddInt64(&gen.callCount, 1)
if rawReq == nil {
return &lib.RawResp{ID: -1, Err: errors.New("Invalid raw request.")}
}
start := time.Now().UnixNano()
resp, err := gen.caller.Call(rawReq.Req, gen.timeoutNS)
end := time.Now().UnixNano()
elapsedTime := time.Duration(end - start)
var rawResp lib.RawResp
if err != nil {
errMsg := fmt.Sprintf("Sync Call Error: %s.", err)
rawResp = lib.RawResp{
ID: rawReq.ID,
Err: errors.New(errMsg),
Elapse: elapsedTime}
} else {
rawResp = lib.RawResp{
ID: rawReq.ID,
Resp: resp,
Elapse: elapsedTime}
}
return &rawResp
}
// asyncSend 会异步地调用承受方接口。
func (gen *myGenerator) asyncCall() {
gen.tickets.Take()
go func() {
defer func() {
if p := recover(); p != nil {
err, ok := interface{}(p).(error)
var errMsg string
if ok {
errMsg = fmt.Sprintf("Async Call Panic! (error: %s)", err)
} else {
errMsg = fmt.Sprintf("Async Call Panic! (clue: %#v)", p)
}
logger.Errorln(errMsg)
result := &lib.CallResult{
ID: -1,
Code: lib.RET_CODE_FATAL_CALL,
Msg: errMsg}
gen.sendResult(result)
}
gen.tickets.Return()
}()
rawReq := gen.caller.BuildReq()
// 调用状态:0-未调用或调用中;1-调用完成;2-调用超时。
var callStatus uint32
timer := time.AfterFunc(gen.timeoutNS, func() {
if !atomic.CompareAndSwapUint32(&callStatus, 0, 2) {
return
}
result := &lib.CallResult{
ID: rawReq.ID,
Req: rawReq,
Code: lib.RET_CODE_WARNING_CALL_TIMEOUT,
Msg: fmt.Sprintf("Timeout! (expected: < %v)", gen.timeoutNS),
Elapse: gen.timeoutNS,
}
gen.sendResult(result)
})
rawResp := gen.callOne(&rawReq)
if !atomic.CompareAndSwapUint32(&callStatus, 0, 1) {
return
}
timer.Stop()
var result *lib.CallResult
if rawResp.Err != nil {
result = &lib.CallResult{
ID: rawResp.ID,
Req: rawReq,
Code: lib.RET_CODE_ERROR_CALL,
Msg: rawResp.Err.Error(),
Elapse: rawResp.Elapse}
} else {
result = gen.caller.CheckResp(rawReq, *rawResp)
result.Elapse = rawResp.Elapse
}
gen.sendResult(result)
}()
}
// sendResult 用于发送调用结果。
func (gen *myGenerator) sendResult(result *lib.CallResult) bool {
if atomic.LoadUint32(&gen.status) != lib.STATUS_STARTED {
gen.printIgnoredResult(result, "stopped load generator")
return false
}
select {
case gen.resultCh <- result:
return true
default:
gen.printIgnoredResult(result, "full result channel")
return false
}
}
// printIgnoredResult 打印被忽略的结果。
func (gen *myGenerator) printIgnoredResult(result *lib.CallResult, cause string) {
resultMsg := fmt.Sprintf(
"ID=%d, Code=%d, Msg=%s, Elapse=%v",
result.ID, result.Code, result.Msg, result.Elapse)
logger.Warnf("Ignored result: %s. (cause: %s)\n", resultMsg, cause)
}
// prepareStop 用于为停止载荷发生器做准备。
func (gen *myGenerator) prepareToStop(ctxError error) {
logger.Infof("Prepare to stop load generator (cause: %s)...", ctxError)
atomic.CompareAndSwapUint32(
&gen.status, lib.STATUS_STARTED, lib.STATUS_STOPPING)
logger.Infof("Closing result channel...")
close(gen.resultCh)
atomic.StoreUint32(&gen.status, lib.STATUS_STOPPED)
}
// genLoad 会产生载荷并向承受方发送。
func (gen *myGenerator) genLoad(throttle <-chan time.Time) {
for {
select {
case <-gen.ctx.Done():
gen.prepareToStop(gen.ctx.Err())
return
default:
}
gen.asyncCall()
if gen.lps > 0 {
select {
case <-throttle:
case <-gen.ctx.Done():
gen.prepareToStop(gen.ctx.Err())
return
}
}
}
}
// Start 会启动载荷发生器。
func (gen *myGenerator) Start() bool {
logger.Infoln("Starting load generator...")
// 检查是否具备可启动的状态,顺便设置状态为正在启动
if !atomic.CompareAndSwapUint32(
&gen.status, lib.STATUS_ORIGINAL, lib.STATUS_STARTING) {
if !atomic.CompareAndSwapUint32(
&gen.status, lib.STATUS_STOPPED, lib.STATUS_STARTING) {
return false
}
}
// 设定节流阀。
var throttle <-chan time.Time
if gen.lps > 0 {
interval := time.Duration(1e9 / gen.lps)
logger.Infof("Setting throttle (%v)...", interval)
throttle = time.Tick(interval)
}
// 初始化上下文和取消函数。
gen.ctx, gen.cancelFunc = context.WithTimeout(
context.Background(), gen.durationNS)
// 初始化调用计数。
gen.callCount = 0
// 设置状态为已启动。
atomic.StoreUint32(&gen.status, lib.STATUS_STARTED)
go func() {
// 生成并发送载荷。
logger.Infoln("Generating loads...")
gen.genLoad(throttle)
logger.Infof("Stopped. (call count: %d)", gen.callCount)
}()
return true
}
func (gen *myGenerator) Stop() bool {
if !atomic.CompareAndSwapUint32(
&gen.status, lib.STATUS_STARTED, lib.STATUS_STOPPING) {
return false
}
gen.cancelFunc()
for {
if atomic.LoadUint32(&gen.status) == lib.STATUS_STOPPED {
break
}
time.Sleep(time.Microsecond)
}
return true
}
func (gen *myGenerator) Status() uint32 {
return atomic.LoadUint32(&gen.status)
}
func (gen *myGenerator) CallCount() int64 {
return atomic.LoadInt64(&gen.callCount)
}