forked from fortio/fortio
/
tcprunner.go
295 lines (279 loc) · 9.25 KB
/
tcprunner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
// Copyright 2020 Fortio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tcprunner
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"sort"
"syscall"
"time"
"fortio.org/fortio/fhttp"
"fortio.org/fortio/fnet"
"fortio.org/fortio/periodic"
"fortio.org/log"
)
// TCPResultMap counts exchange outcomes by key: TCPStatusOK for a successful
// Fetch, otherwise the text of the error that Fetch returned.
type TCPResultMap map[string]int64
// RunnerResults is the aggregated result of a tcp run (see RunTCPTest).
// Also is the internal type used per thread/goroutine: each thread's instance
// is registered as that thread's periodic Runner and holds its client and counters.
type RunnerResults struct {
// Base results; on the aggregate they come from the periodic runner's Run.
periodic.RunnerResults
// On the aggregate, only Destination is filled in by RunTCPTest.
TCPOptions
// RetCodes counts outcomes: TCPStatusOK or the error text of a failed Fetch.
RetCodes TCPResultMap
// SocketCount is the number of sockets used, summed from each TCPClient.Close.
SocketCount int
// BytesSent and BytesReceived are summed from each thread's client.
BytesSent int64
BytesReceived int64
// client is this thread's TCPClient; it is not set on the aggregate.
client *TCPClient
// aborter is copied from the periodic runner's Stop option.
aborter *periodic.Aborter
}
// Run performs a single tcp echo exchange for thread t; it is the function the
// periodic runner calls at the target QPS (set as the Function in RunnerOptions).
// The outcome key, TCPStatusOK or the error text, is tallied in RetCodes and
// returned together with whether the exchange succeeded.
func (tcpstate *RunnerResults) Run(_ context.Context, t periodic.ThreadID) (bool, string) {
	log.Debugf("Calling in %d", t)
	status, ok := TCPStatusOK, true
	if _, err := tcpstate.client.Fetch(); err != nil {
		status, ok = err.Error(), false
	}
	tcpstate.RetCodes[status]++
	return ok, status
}
// TCPOptions are options to the TCPClient.
// NewTCPClient reads them when creating each client.
type TCPOptions struct {
// Destination is resolved with fnet.ResolveDestination when a client is created.
Destination string
// Payload is written on every exchange and must be echoed back byte for byte.
// When empty (or nil), a unique payload is generated per message instead
// (see GeneratePayload).
Payload []byte // what to send (and check)
// NOTE(review): UnixDomainSocket is not read by NewTCPClient in this file;
// confirm whether tcp runs honor it anywhere.
UnixDomainSocket string // Path of unix domain socket to use instead of host:port from URL
// ReqTimeout is the per-exchange timeout applied as a connection deadline in
// Fetch. 0 selects fhttp.HTTPReqTimeOutDefaultValue; a negative value is also
// replaced by that default, with a warning.
ReqTimeout time.Duration
}
// RunnerOptions includes the base RunnerOptions plus tcp specific
// options. It is the input to RunTCPTest.
type RunnerOptions struct {
periodic.RunnerOptions
// NOTE(review): no Init() method for TCPOptions is visible in this file;
// NewTCPClient applies the payload and timeout defaults itself. Confirm
// whether the trailing comment below is stale.
TCPOptions // Need to call Init() to initialize
}
// TCPClient is the client used for tcp echo testing.
// Its methods update its fields without any synchronization; RunTCPTest
// creates one client per thread (presumably each is used by one goroutine only).
type TCPClient struct {
// buffer receives the echo; NewTCPClient sizes it to len(req).
buffer []byte
// req is the payload to send; regenerated per message when doGenerate is set.
req []byte
// dest is the resolved address that connect dials.
dest net.Addr
// socket is the connection kept from the last successful Fetch for reuse, nil otherwise.
socket net.Conn
// connID is the thread index assigned by RunTCPTest; it is embedded in generated payloads.
connID int // 0-9999
// messageCount is incremented on every Fetch call (retries included) and
// embedded in generated payloads.
messageCount int64
// bytesSent and bytesReceived are running byte totals for this client.
bytesSent int64
bytesReceived int64
// socketCount is incremented on every connect attempt, including failed dials.
socketCount int
// destination is the unresolved destination string, used for logging.
destination string
// doGenerate is true when no payload was configured, so each message uses GeneratePayload.
doGenerate bool
// reqTimeout is the per-request timeout, defaulted by NewTCPClient when unset or negative.
reqTimeout time.Duration
}
var (
	// TCPURLPrefix is the URL prefix for triggering tcp load.
	TCPURLPrefix = "tcp://"
	// TCPStatusOK is the map key on success.
	TCPStatusOK = "OK"

	// errShortRead is returned by Fetch when the connection is closed or reset
	// before the whole echo was received.
	errShortRead = errors.New("short read")
	// errLongRead flags reads that total more bytes than were sent; it should never happen.
	errLongRead = errors.New("bug: long read")
	// errMismatch is returned by Fetch when the bytes read back differ from those sent.
	errMismatch = errors.New("read not echoing writes")
)
// GeneratePayload returns the default payload used when no explicit one is
// configured: a message unique to runner thread t and message number i.
// It is 24 bytes ("Fortio\n", t as 4 digits, "\n", i as 12 digits) for
// 0 <= t <= 9999 and 0 <= i <= 999999999999; larger values widen their field
// and therefore lengthen the payload.
func GeneratePayload(t int, i int64) []byte {
	payload := fmt.Sprintf("Fortio\n%04d\n%012d", t, i) // 7 + 4 + 1 + 12 = 24 bytes
	return []byte(payload)
}
// NewTCPClient returns a TCPClient configured from o.
// The destination is resolved up front; when resolution yields no address the
// client is nil and the resolver's error is returned. An empty o.Payload
// switches the client to generated per-message payloads (GeneratePayload), and
// the receive buffer is sized to the payload. A zero ReqTimeout falls back to
// fhttp.HTTPReqTimeOutDefaultValue, as does a negative one (with a warning).
func NewTCPClient(o *TCPOptions) (*TCPClient, error) {
	addr, err := fnet.ResolveDestination(context.Background(), o.Destination)
	if addr == nil {
		return nil, err
	}
	client := &TCPClient{
		destination: o.Destination,
		dest:        addr,
		req:         o.Payload,
		reqTimeout:  o.ReqTimeout,
	}
	if len(client.req) == 0 { // a nil Payload has length 0 as well
		client.doGenerate = true
		client.req = GeneratePayload(0, 0)
	}
	client.buffer = make([]byte, len(client.req))
	switch {
	case client.reqTimeout == 0:
		log.Debugf("Request timeout not set, using default %v", fhttp.HTTPReqTimeOutDefaultValue)
		client.reqTimeout = fhttp.HTTPReqTimeOutDefaultValue
	case client.reqTimeout < 0:
		log.Warnf("Invalid timeout %v, setting to %v", client.reqTimeout, fhttp.HTTPReqTimeOutDefaultValue)
		client.reqTimeout = fhttp.HTTPReqTimeOutDefaultValue
	}
	return client, nil
}
// connect dials c.dest and returns the new connection.
// The dial is bounded by the request timeout, so an unreachable target cannot
// stall a runner thread for the operating system's (much longer) connect timeout.
// socketCount is incremented for every attempt, including failed ones. On
// success the socket is passed to fnet.SetSocketBuffers with the read-buffer and
// payload sizes (presumably to size the kernel buffers; see fnet).
func (c *TCPClient) connect() (net.Conn, error) {
	c.socketCount++
	socket, err := net.DialTimeout(c.dest.Network(), c.dest.String(), c.reqTimeout)
	if err != nil {
		log.Errf("Unable to connect to %v : %v", c.dest, err)
		return nil, err
	}
	fnet.SetSocketBuffers(socket, len(c.buffer), len(c.req))
	return socket, nil
}
// Fetch performs one echo exchange. It sends the request payload (a freshly
// generated one when no payload was configured) and reads back exactly as many
// bytes, which must equal what was sent.
//
// The connection kept from the previous successful call is reused when present;
// otherwise a new one is dialed. If setting the deadline or writing fails on a
// reused connection, that connection is closed and the exchange is retried once
// on a new connection. On success the connection is kept for the next call, and
// the full echo is returned; it aliases the client's buffer and is overwritten
// by the next call. On any error the connection is closed, and whatever was read
// so far (if anything) is returned with the error.
func (c *TCPClient) Fetch() ([]byte, error) {
	// Connect or reuse existing socket:
	conn := c.socket
	c.messageCount++
	reuse := conn != nil
	if reuse {
		log.Debugf("[%d] Reusing socket %+v", c.connID, conn)
	} else {
		var err error
		conn, err = c.connect()
		if conn == nil {
			return nil, err
		}
	}
	c.socket = nil // because of error returns and single retry
	// toClose is closed on return unless cleared, i.e. unless the connection was
	// kept for reuse or has already been closed explicitly. This prevents leaking
	// the socket on error paths.
	toClose := conn
	defer func() {
		if toClose != nil {
			_ = toClose.Close()
		}
	}()
	// One deadline for both directions, so neither the write nor the read can block forever.
	conErr := conn.SetDeadline(time.Now().Add(c.reqTimeout))
	// Send the request:
	if c.doGenerate {
		c.req = GeneratePayload(c.connID, c.messageCount) // TODO write directly in buffer to avoid generating garbage for GC to clean
	}
	expectedLen := len(c.req)
	if len(c.buffer) != expectedLen {
		// Generated payloads outgrow the initial 24 bytes once connID or
		// messageCount exceed their zero-padded widths. A too-short buffer would
		// make the read loop below spin forever on zero-length reads.
		c.buffer = make([]byte, expectedLen)
	}
	n, err := conn.Write(c.req)
	c.bytesSent += int64(n)
	if log.LogDebug() {
		log.Debugf("[%d] wrote %d (%s): %v", c.connID, n, fnet.DebugSummary(c.req, 256), err)
	}
	if err == nil {
		// Without this, a deadline failure after a successful write would be returned as (nil, nil).
		err = conErr
	}
	if err != nil {
		if reuse {
			// it's ok for the (idle) socket to die once, auto reconnect:
			log.Infof("Closing dead socket %v (%v)", conn, err)
			_ = conn.Close()
			toClose = nil
			return c.Fetch() // recurse once: c.socket is nil so the retry dials a new connection
		}
		log.Errf("[%d] Unable to write to %v: %v", c.connID, c.dest, err)
		return nil, err
	}
	if n != expectedLen {
		log.Errf("[%d] Short write to %v: %d instead of %d", c.connID, c.dest, n, expectedLen)
		return nil, io.ErrShortWrite
	}
	// len(c.buffer) == expectedLen is guaranteed above.
	totalRead := 0
	for {
		n, err = conn.Read(c.buffer[totalRead:])
		if log.LogDebug() {
			log.Debugf("[%d] read %d (%s): %v", c.connID, n, fnet.DebugSummary(c.buffer[totalRead:totalRead+n], 256), err)
		}
		c.bytesReceived += int64(n)
		totalRead += n
		if totalRead == expectedLen { // break first, assuming no err, so we don't test that for EOF case
			break
		}
		if err != nil {
			log.Errf("[%d] Unable to read: %v", c.connID, err)
			if errors.Is(err, io.EOF) || errors.Is(err, syscall.ECONNRESET) {
				return c.buffer[:totalRead], errShortRead
			}
			return c.buffer[:totalRead], err
		}
		if totalRead > expectedLen {
			log.Errf("[%d] BUG: read more than possible +%d to %d vs %d", c.connID, n, totalRead, expectedLen)
			return c.buffer[:totalRead], errLongRead
		}
	}
	if !bytes.Equal(c.buffer, c.req) {
		log.Infof("Mismatch between sent %q and received %q", string(c.req), string(c.buffer))
		return c.buffer, errMismatch
	}
	c.socket = conn // reuse on success
	toClose = nil
	// Return the whole echo (totalRead == expectedLen), not just the last read chunk.
	return c.buffer, nil
}
// Close releases the connection kept from the last successful Fetch, if any,
// and reports how many sockets this client has used during the run.
// A failure to close is logged as a warning, not returned.
func (c *TCPClient) Close() int {
	log.Debugf("Closing %p: %s socket count %d", c, c.destination, c.socketCount)
	if sock := c.socket; sock != nil {
		c.socket = nil
		if err := sock.Close(); err != nil {
			log.Warnf("Error closing tcp client's socket: %v", err)
		}
	}
	return c.socketCount
}
// RunTCPTest runs a tcp test and returns the aggregated stats.
//
// It creates one TCPClient per thread. Each client does a warm-up Fetch unless
// o.Exactly is set. It then runs the periodic runner, closes every client, and
// sums the sockets used, bytes sent/received and per-outcome counts. A summary
// is printed to the runner's output. If a client cannot be created, the clients
// created so far are closed and an error is returned.
// Some refactoring to avoid copy-pasta between the now 3 runners would be good.
func RunTCPTest(o *RunnerOptions) (*RunnerResults, error) {
	o.RunType = "TCP"
	log.Infof("Starting tcp test for %s with %d threads at %.1f qps", o.Destination, o.NumThreads, o.QPS)
	r := periodic.NewPeriodicRunner(&o.RunnerOptions)
	defer r.Options().Abort()
	numThreads := r.Options().NumThreads
	o.TCPOptions.Destination = o.Destination
	out := r.Options().Out // Important as the default value is set from nil to stdout inside NewPeriodicRunner
	total := RunnerResults{
		aborter:  r.Options().Stop,
		RetCodes: make(TCPResultMap),
	}
	total.Destination = o.Destination
	tcpstate := make([]RunnerResults, numThreads)
	for i := 0; i < numThreads; i++ {
		r.Options().Runners[i] = &tcpstate[i]
		// Create a client (and transport) and connect once for each 'thread'
		client, err := NewTCPClient(&o.TCPOptions)
		if client == nil {
			// Don't leak the sockets opened by the warm-up fetches of earlier clients.
			for j := 0; j < i; j++ {
				tcpstate[j].client.Close()
			}
			return nil, fmt.Errorf("unable to create client %d for %s: %w", i, o.Destination, err)
		}
		client.connID = i
		tcpstate[i].client = client
		if o.Exactly <= 0 {
			data, fetchErr := client.Fetch()
			if i == 0 && log.LogVerbose() {
				log.LogVf("first hit of %s: err %v, received %d: %q", o.Destination, fetchErr, len(data), data)
			}
		}
		// Setup the stats for each 'thread'
		tcpstate[i].aborter = total.aborter
		tcpstate[i].RetCodes = make(TCPResultMap)
	}
	total.RunnerResults = r.Run()
	// Numthreads may have reduced but it should be ok to accumulate 0s from
	// unused ones. We also must cleanup all the created clients.
	keys := []string{}
	for i := 0; i < numThreads; i++ {
		total.SocketCount += tcpstate[i].client.Close()
		total.BytesReceived += tcpstate[i].client.bytesReceived
		total.BytesSent += tcpstate[i].client.bytesSent
		for k := range tcpstate[i].RetCodes {
			if _, exists := total.RetCodes[k]; !exists {
				keys = append(keys, k)
			}
			total.RetCodes[k] += tcpstate[i].RetCodes[k]
		}
	}
	// Cleanup state:
	r.Options().ReleaseRunners()
	totalCount := float64(total.DurationHistogram.Count)
	_, _ = fmt.Fprintf(out, "Sockets used: %d (for perfect no error run, would be %d)\n", total.SocketCount, r.Options().NumThreads)
	_, _ = fmt.Fprintf(out, "Total Bytes sent: %d, received: %d\n", total.BytesSent, total.BytesReceived)
	sort.Strings(keys)
	for _, k := range keys {
		_, _ = fmt.Fprintf(out, "tcp %s : %d (%.1f %%)\n", k, total.RetCodes[k], 100.*float64(total.RetCodes[k])/totalCount)
	}
	return &total, nil
}