/
client.go
executable file
·354 lines (303 loc) · 7.65 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
package rc_go
import (
"fmt"
"log"
"os"
"time"
"bufio"
"io"
"encoding/binary"
"encoding/json"
"sync"
)
func unix_seconds(t time.Time) float64 {
return float64(t.UnixNano()) / 1e9
}
type Client interface {
Put(string, string) (string, error)
Get(string) (string, string, error)
Close ()
}
type jsonmap = map[string]interface{}
type Operation struct {
Payload map[string]string `json:"payload"`
Time float64 `json:"time"`
}
func decode_operation(m jsonmap) Operation {
var res Operation
payload := make(map[string]string)
for key, value := range m {
switch key {
case "payload":
for k,v := range value.(map[string]interface{}) {
payload[k] = v.(string)
}
res.Payload = payload
case "time":
res.Time = value.(float64)
default:
log.Fatal("Cannot parse operation: %v", m)
panic("Failed to parse packet")
}
}
return res
}
type Preload struct {
Prereq bool `json:"prereq"`
Operation Operation `json:"operation"`
}
func decode_preload(m jsonmap) Preload {
var res Preload
for key, value := range m {
switch key {
case "prereq":
res.Prereq = value.(bool)
case "operation":
res.Operation = decode_operation(value.(jsonmap))
case "kind":
default:
log.Fatal("Cannot parse preload: %v", m)
panic("Failed to parse packet")
}
}
return res
}
func response(
t_submitted float64,
t_result float64,
result string,
kind string,
clientid string,
other jsonmap) jsonmap {
return map[string]interface{}{
"kind": "result",
"t_submitted": t_submitted,
"t_result": t_result,
"result": result,
"op_kind": kind,
"clientid": clientid,
"other": other,
}
}
func put(cli Client, key string, value string, clientid string, expected_start float64) jsonmap{
st := unix_seconds(time.Now())
target, err := cli.Put(key, value)
end := unix_seconds(time.Now())
err_msg := "Success"
if err != nil {
err_msg = err.Error()
}
resp := response(st, end, err_msg, "write", clientid, map[string]interface{}{
"target": target,
"expected_start": expected_start,
})
return resp
}
func get(cli Client, key string, clientid string, expected_start float64) jsonmap{
st := unix_seconds(time.Now())
_, target, err := cli.Get(key)
end := unix_seconds(time.Now())
err_msg := "Success"
if err != nil {
err_msg = err.Error()
}
resp := response(st, end, err_msg, "read", clientid, map[string]interface{}{
"target": target,
"expected_start": expected_start,
})
return resp
}
func check(e error, s string) {
if e != nil {
log.Fatal("Fatal error: %s: %v",s, e)
}
}
func perform(op Operation, cli Client, clientid string, test_start_time float64, new_client_per_request bool, client_gen func () (Client, error)) jsonmap {
//Create a new client if desired
func_cli := &cli
if new_client_per_request {
cli, err := client_gen()
check(err, "generating client")
defer cli.Close()
func_cli = &cli
}
expected_start := op.Time + test_start_time
switch op.Payload["kind"] {
case "write":
return put(*func_cli, op.Payload["key"], op.Payload["value"], clientid, expected_start)
case "read":
return get(*func_cli, op.Payload["key"], clientid, expected_start)
default:
return response(
-1.0,
-1.0,
fmt.Sprintf("Error operation (%v) was not found or supported", op),
op.Payload["kind"],
clientid,
nil)
}
}
func recv(reader *bufio.Reader) jsonmap {
var size int32
if err := binary.Read(reader, binary.LittleEndian, &size); err != nil {
log.Fatal(err)
}
payload := make([]byte, size)
if _, err := io.ReadFull(reader, payload); err != nil {
log.Fatal(err)
}
var msg map[string]interface{}
json.Unmarshal(payload, &msg)
return msg
}
func send(msg jsonmap) {
payload, err := json.Marshal(msg)
check(err, "marshalling json")
var size int32
size = int32(len(payload))
size_part := make([]byte, 4)
binary.LittleEndian.PutUint32(size_part, uint32(size))
output := append(size_part[:], payload[:]...)
_, err = os.Stdout.Write(output)
check(err, "writing packet")
}
func init() {
log.SetOutput(os.Stderr)
}
func result_loop(res_ch chan jsonmap, done chan struct{}) {
var results []jsonmap
log.Print("Starting result loop")
for res := range res_ch {
results = append(results, res)
}
log.Print("Got all results, writing to load generator")
for _, res := range results {
send(res)
}
close(done)
}
func waitGroupChannel(wg *sync.WaitGroup) (<-chan struct{}) {
complete := make(chan struct{})
go func() {
wg.Wait()
close(complete)
} ()
return complete
}
// Buffering channel, to force delayed clients to quit
func make_messenger(input <-chan jsonmap, output chan jsonmap) chan struct{}{
close_this := make(chan struct{})
go func(messenger_close chan struct {}, input <-chan jsonmap, output chan jsonmap) {
for {
select {
case <- close_this:
close(output)
return
case result := <-input:
output <- result
}
}
}(close_this, input, output)
return close_this
}
func Run(client_gen func() (Client, error), clientid string, new_client_per_request bool) {
log.Print("Client: Starting run")
cli, err := client_gen()
defer cli.Close()
check(err, "create client")
reader := bufio.NewReader(os.Stdin)
//Phase 1: preload
log.Print("Phase 1: preload")
var ops []Operation
got_finalise := false
for !got_finalise {
op := recv(reader)
switch op["kind"] {
case "preload":
preload := decode_preload(op)
if preload.Prereq {
perform(preload.Operation, cli, clientid, unix_seconds(time.Now()), false, client_gen)
} else {
ops = append(ops, preload.Operation)
}
case "finalise":
got_finalise = true
default:
log.Print("Got unrecognised message...")
log.Print(op)
log.Fatal("Quitting due to unrecognised message")
}
}
//Phase 2: Readying
log.Print("Phase 2: Readying")
//Dispatch results loop
final_res_ch := make(chan jsonmap)
results_complete := make(chan struct{})
go result_loop(final_res_ch, results_complete)
res_ch := make(chan jsonmap, 50000)
messenger_complete := make_messenger(res_ch, final_res_ch)
//signal ready
log.Print("Sending ready")
send(map[string]interface{}{"kind": "ready"})
log.Print("Sent ready")
got_start := false
for !got_start{
op := recv(reader)
switch op["kind"]{
case "start":
log.Print("Got start_request")
got_start = true
default:
log.Fatal("Got a message which wasn't a start!")
}
}
//Phase 3: Execute
log.Print("Phase 3: Execute")
var wg_perform sync.WaitGroup
stopCh := make(chan struct{})
op_ch := make(chan Operation)
log.Print("Starting to perform ops")
start_time := time.Now()
for _, op := range ops {
end_time := start_time.Add(time.Duration(op.Time * float64(time.Second)))
t := time.Now()
time.Sleep(end_time.Sub(t))
select {
case op_ch <- op:
continue
default:
wg_perform.Add(1)
//If can't start op currently create a new worker to do so
go func(op_ch <-chan Operation, wg *sync.WaitGroup) {
defer wg.Done()
for op := range op_ch {
resp := perform(op, cli, clientid, unix_seconds(start_time), new_client_per_request, client_gen)
select {
case <- stopCh:
continue
case res_ch <- resp:
}
}
} (op_ch, &wg_perform)
op_ch <- op
}
}
log.Print("Finished sending ops")
log.Print("Phase 4: Collate")
// Tell threads that there are no more ops
close(op_ch)
select {
case <- waitGroupChannel(&wg_perform):
case <- time.After(30 * time.Second):
}
// Cut off trailing results
close(stopCh)
log.Print("Closing result pipe")
//Signal end of results and force any remaining clients to not write to it
close(messenger_complete)
log.Print("Waiting for results to be sent")
//Wait for results to be returned to generator
<-results_complete
send(map[string]interface{}{"kind":"finished"})
log.Print("Results sent, exiting")
}