-
Notifications
You must be signed in to change notification settings - Fork 10
/
common_traffic.go
290 lines (247 loc) · 8.42 KB
/
common_traffic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package framework
import (
"bufio"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"github.com/fatih/color"
. "github.com/onsi/ginkgo"
)
// StatusCodeWord is the label printed by the curl commands in front of the REST status
// code, and the key under which that status code is looked up when parsing curl output.
const StatusCodeWord = "StatusCode"
// HTTPRequestDef describes an HTTP request that is to be issued remotely,
// from inside a given pod container.
type HTTPRequestDef struct {
	// Namespace, pod name and container name identifying where the HTTP request is run from
	SourceNs, SourcePod, SourceContainer string

	// Destination is the complete URL handed to curl: the host name and,
	// optionally, protocol, port, and path
	Destination string
}
// TCPRequestDef describes a TCP request that is to be issued remotely,
// from inside a given pod container.
type TCPRequestDef struct {
	// Namespace, pod name and container name identifying where the TCP request is run from
	SourceNs, SourcePod, SourceContainer string

	// DestinationHost is the server host (FQDN or IP address) the request is directed to
	DestinationHost string
	// DestinationPort is the server port the request is directed to
	DestinationPort int

	// Message is the payload sent as a part of the request
	Message string
}
// GRPCRequestDef describes a gRPC request that is to be issued remotely,
// from inside a given pod container.
type GRPCRequestDef struct {
	// Namespace, pod name and container name identifying where the gRPC request is run from
	SourceNs, SourcePod, SourceContainer string

	// Destination is the server address handed to grpcurl
	Destination string

	// JSONRequest is the JSON request body
	JSONRequest string

	// Symbol is the fully qualified grpc service name, ex. hello.HelloService/SayHello
	Symbol string

	// UseTLS indicates if the request should be encrypted with TLS
	UseTLS bool
}
// HTTPRequestResult represents results of an HTTPRequest call
type HTTPRequestResult struct {
// StatusCode is the HTTP status code parsed from the curl output; HTTPRequest leaves it 0 when Err is set
StatusCode int
// Headers holds the "<name>: <value>" lines curl printed on stdout; HTTPRequest leaves it nil when Err is set
Headers map[string]string
// Err is non-nil when the remote execution failed or the status code could not be read as an integer
Err error
}
// TCPRequestResult represents the result of a TCPRequest call
type TCPRequestResult struct {
// Response is the stdout of the remote command; TCPRequest fills it in even when Err is set
Response string
// Err is non-nil when the remote execution returned an error
Err error
}
// GRPCRequestResult represents the result of a GRPCRequest call
type GRPCRequestResult struct {
// Response is the stdout of the remote command; GRPCRequest fills it in even when Err is set
Response string
// Err is non-nil when the remote execution returned an error
Err error
}
// HTTPRequest synchronously runs curl inside the source pod/container described by ht and
// turns curl's output into an HTTPRequestResult.
//
// On success, StatusCode carries the code reported by curl and Headers carries the remaining
// "<name>: <value>" lines found on stdout. When the remote execution fails, or when no integer
// status code can be read from the output, StatusCode is 0, Headers is nil and Err is set.
func (td *FsmTestData) HTTPRequest(ht HTTPRequestDef) HTTPRequestResult {
	// curl flags:
	//   -s            silent, no progress output
	//   -o /dev/null  discard the body
	//   -D -          dump the response headers to "-" (stdout)
	//   -I            skip the body download
	//   -w <fmt>      print "StatusCode:<http_code>", label-like for easy parsing
	//   -L            follow redirects
	curlArgs := []string{
		"/usr/bin/curl", "-s", "-o", "/dev/null", "-D", "-", "-I",
		"-w", StatusCodeWord + ":%{http_code}", "-L",
	}
	// The destination is split on whitespace, so it may contribute zero or several arguments.
	curlArgs = append(curlArgs, strings.Fields(ht.Destination)...)

	out, errOut, runErr := td.RunRemote(ht.SourceNs, ht.SourcePod, ht.SourceContainer, curlArgs)
	if runErr != nil {
		// Error codes from the execution come through the returned error,
		// e.g. curl 'Connection refused' is exit code 7.
		return HTTPRequestResult{
			Err: fmt.Errorf("Remote exec err: %w | stderr: %s", runErr, errOut),
		}
	}
	if len(errOut) > 0 {
		// The command succeeded with a proper exit code but still wrote to stderr.
		td.T.Logf("[warn] Stderr: %v", errOut)
	}

	// From here on the output of the curl command above is expected to be predictable.
	parsed := mapCurlOuput(out)
	code, convErr := strconv.Atoi(parsed[StatusCodeWord])
	if convErr != nil {
		return HTTPRequestResult{
			Err: fmt.Errorf("Could not read status code as integer: %w", convErr),
		}
	}

	// The status code entry is not a header; drop it before handing the map back.
	delete(parsed, StatusCodeWord)
	return HTTPRequestResult{StatusCode: code, Headers: parsed}
}
// TCPRequest synchronously sends req.Message to req.DestinationHost:req.DestinationPort by
// piping `echo` into `nc` through `sh -c` inside the source pod/container.
//
// The returned Response is the remote stdout, which is also filled in when Err is set.
// Note that Message and DestinationHost are interpolated into the shell command line as-is.
func (td *FsmTestData) TCPRequest(req TCPRequestDef) TCPRequestResult {
	shellLine := fmt.Sprintf("echo \"%s\" | nc %s %d", req.Message, req.DestinationHost, req.DestinationPort)
	ncCmd := []string{"sh", "-c", shellLine}

	out, errOut, runErr := td.RunRemote(req.SourceNs, req.SourcePod, req.SourceContainer, ncCmd)
	if runErr != nil {
		// Error codes from the execution come through the returned error.
		return TCPRequestResult{
			Response: out,
			Err:      fmt.Errorf("Remote exec err: %w | stderr: %s | cmd: %s", runErr, errOut, ncCmd),
		}
	}
	if len(errOut) > 0 {
		// The command succeeded with a proper exit code but still wrote to stderr.
		td.T.Logf("[warn] Stderr: %v", errOut)
	}

	return TCPRequestResult{Response: out}
}
// GRPCRequest synchronously runs /grpcurl inside the source pod/container described by req
// and returns its stdout as the Response.
//
// The returned Response is also filled in when Err is set.
func (td *FsmTestData) GRPCRequest(req GRPCRequestDef) GRPCRequestResult {
	// '-plaintext' tells grpcurl to send plaintext requests, i.e. not encrypted with TLS.
	transportFlag := "-plaintext"
	if req.UseTLS {
		// '-insecure' tells grpcurl not to validate the server certificate. This is suitable
		// for testing purposes and does not mean the channel is not encrypted using TLS.
		transportFlag = "-insecure"
	}
	grpcurlCmd := []string{"/grpcurl", "-d", req.JSONRequest, transportFlag, req.Destination, req.Symbol}

	out, errOut, runErr := td.RunRemote(req.SourceNs, req.SourcePod, req.SourceContainer, grpcurlCmd)
	if runErr != nil {
		// Error codes from the execution come through the returned error.
		return GRPCRequestResult{
			Response: out,
			Err:      fmt.Errorf("Remote exec err: %w | stderr: %s | cmd: %s", runErr, errOut, grpcurlCmd),
		}
	}
	if len(errOut) > 0 {
		// The command succeeded with a proper exit code but still wrote to stderr.
		td.T.Logf("[warn] Stderr: %v", errOut)
	}

	return GRPCRequestResult{Response: out}
}
// mapCurlOuput maps the stdout of the curl invocation used in this file to a
// name -> value map. It expects header-like lines on stdout of the form
// "<name>: <value...>".
//
// Each line is split on its first colon only, so values may themselves contain colons.
// Names and values are trimmed of surrounding whitespace. Lines without a colon (for
// example the HTTP status line or blank separators) are ignored. When the same name
// appears more than once, the last occurrence wins.
func mapCurlOuput(curlOut string) map[string]string {
	headers := make(map[string]string)

	scanner := bufio.NewScanner(strings.NewReader(curlOut))
	// By default a Scanner stops at the first line longer than 64 KiB, which would silently
	// drop that line and every line after it. Let the buffer grow up to the size of the
	// whole input so that no line can ever be too long.
	scanner.Buffer(nil, len(curlOut)+1)

	for scanner.Scan() {
		// Expect at most 2 substrings, separating by only the first colon
		parts := strings.SplitN(scanner.Text(), ":", 2)
		if len(parts) != 2 {
			// other non-header data
			continue
		}
		headers[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
	}
	return headers
}
// HTTPMultipleRequest takes multiple HTTP request defs to issue them concurrently
type HTTPMultipleRequest struct {
// Sources lists the HTTP requests to issue; MultipleHTTPRequest runs each accepted entry in its own goroutine
Sources []HTTPRequestDef
}
// HTTPMultipleResults represents results from a multiple HTTP request call.
// Results come back as map["srcNs/srcPod"][destination] -> HTTPRequestResult, where
// destination is the Destination string of the HTTPRequestDef that produced the result.
type HTTPMultipleResults map[string]map[string]HTTPRequestResult
// MultipleHTTPRequest issues every request in requests.Sources concurrently and returns
// once all of them have completed.
//
// Results are keyed first by "<SourceNs>/<SourcePod>" and then by the request's Destination.
// Only the first request for a given source/destination pair is executed; any repeat of the
// same pair is logged and ignored.
func (td *FsmTestData) MultipleHTTPRequest(requests *HTTPMultipleRequest) HTTPMultipleResults {
	results := HTTPMultipleResults{}
	var (
		mtx sync.Mutex
		wg  sync.WaitGroup
	)

	for _, req := range requests.Sources {
		srcKey := fmt.Sprintf("%s/%s", req.SourceNs, req.SourcePod)
		dstKey := req.Destination

		// Goroutines started by earlier iterations may already be storing their results in
		// these maps, so the bookkeeping below has to hold the same lock they use. Touching
		// the maps without it would be a concurrent map read/write.
		mtx.Lock()
		perSrc, known := results[srcKey]
		if !known {
			perSrc = map[string]HTTPRequestResult{}
			results[srcKey] = perSrc
		}
		_, duplicate := perSrc[dstKey]
		if !duplicate {
			// Reserve the slot so a later identical request is detected as a duplicate.
			perSrc[dstKey] = HTTPRequestResult{}
		}
		mtx.Unlock()

		if duplicate {
			td.T.Logf("No support for more than one request from src to dst. (%s to %s).Ignoring.",
				srcKey, dstKey)
			continue
		}

		wg.Add(1)
		// The request and its keys are passed as arguments so each goroutine works on its own copies.
		go func(srcKey string, dstKey string, htReq HTTPRequestDef) {
			defer GinkgoRecover()
			defer wg.Done()

			res := td.HTTPRequest(htReq)

			// Need lock to avoid concurrent map writes
			mtx.Lock()
			results[srcKey][dstKey] = res
			mtx.Unlock()
		}(srcKey, dstKey, req)
	}

	wg.Wait()
	return results
}
// PrettyPrintHTTPResults logs one line per top-level key of results. Each line starts with
// the key (in cyan) and is followed by every inner key together with its colored status.
// Both levels of keys are walked in sorted order so the output is deterministic.
func (td *FsmTestData) PrettyPrintHTTPResults(results *HTTPMultipleResults) {
	all := *results

	sources := make([]string, 0, len(all))
	for src := range all {
		sources = append(sources, src)
	}
	sort.Strings(sources)

	for _, src := range sources {
		perDst := all[src]

		dests := make([]string, 0, len(perDst))
		for dst := range perDst {
			dests = append(dests, dst)
		}
		sort.Strings(dests)

		var line strings.Builder
		fmt.Fprintf(&line, "%s - ", color.CyanString(src))
		for _, dst := range dests {
			fmt.Fprintf(&line, "%s: %s -", dst, getColoredStatusCode(perDst[dst]))
		}
		td.T.Log(line.String())
	}
}
// getColoredStatusCode renders the outcome of a single request as a colored string:
// a red "ERR" when the result carries an error, the status code in bright green when
// it is 200, and the status code in yellow for any other value.
func getColoredStatusCode(res HTTPRequestResult) string {
	switch {
	case res.Err != nil:
		return color.RedString("ERR")
	case res.StatusCode == 200:
		return color.HiGreenString("%d ", res.StatusCode)
	default:
		return color.YellowString("%d ", res.StatusCode)
	}
}