-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
testserver.go
203 lines (181 loc) · 5.91 KB
/
testserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-2020 Datadog, Inc.
package writer
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
// uid is a counter used by the expectResponses function to create payload IDs
// for the test server. It is incremented atomically (sync/atomic), so every
// payload created by expectResponses receives a distinct ID.
var uid uint64
// expectResponses builds a payload that instructs the test server how to reply.
// The payload body has the form "<id>|<code>,<code>,...", where <id> is a fresh
// value taken from the uid counter. The test server answers successive requests
// carrying this payload with the listed status codes, in order and in rotation.
// When no codes are given, a single http.StatusOK is used.
func expectResponses(codes ...int) *payload {
	if len(codes) == 0 {
		codes = []int{http.StatusOK}
	}
	p := newPayload(nil)
	id := atomic.AddUint64(&uid, 1)
	p.body.WriteString(strconv.FormatUint(id, 10))
	// The first code is preceded by the ID separator, every following one by
	// the list separator.
	sep := "|"
	for _, code := range codes {
		p.body.WriteString(sep)
		p.body.WriteString(strconv.Itoa(code))
		sep = ","
	}
	return p
}
// newTestServerWithLatency starts a test server (see newTestServer) whose
// latency is set to d, making it wait for that duration on each request.
func newTestServerWithLatency(d time.Duration) *testServer {
	srv := newTestServer()
	srv.latency = d
	return srv
}
// newTestServer creates a testServer and starts an httptest.Server that uses it
// as its handler. The address of the running server is exposed through the URL
// field. Responses can be controlled by sending payloads created with
// expectResponses; any other request body is answered with http.StatusOK.
func newTestServer() *testServer {
	ts := new(testServer)
	ts.seen = map[string]*requestStatus{}
	ts.server = httptest.NewServer(ts)
	ts.URL = ts.server.URL
	return ts
}
// testServer is an http.Handler wrapping an httptest.Server which records the
// number of total, failed, retriable and accepted requests. It also allows
// manipulating its HTTP status code response by means of the request's body
// (see expectResponses).
type testServer struct {
	// NOTE(review): t is not referenced by the code visible in this file;
	// confirm whether other files set or use it.
	t *testing.T
	// URL is the URL of the underlying httptest.Server.
	URL string
	// server is the running httptest.Server that dispatches to this handler.
	server *httptest.Server
	// latency, when positive, is how long ServeHTTP sleeps before reading
	// the request.
	latency time.Duration

	mu sync.Mutex // guards seen and payloads
	// seen maps a payload ID to the response rotation state for that payload.
	seen map[string]*requestStatus
	// payloads holds the bodies and headers of requests answered with 2xx.
	payloads []*payload

	// stats; these counters are read and written with sync/atomic rather
	// than under mu.
	total, accepted uint64
	retried, failed uint64
	peak, active    int64
}
// requestStatus keeps track of how many times a custom payload was seen and what
// the next HTTP status code response should be.
type requestStatus struct {
	// count is the number of responses handed out so far.
	count int
	// codes are the status codes to respond with, served in rotation.
	codes []int
}

// nextResponse returns the next HTTP response code and advances the count.
// Codes are served in order, wrapping around to the first one after the last.
// If no codes are configured, it returns http.StatusOK instead of panicking.
func (rs *requestStatus) nextResponse() int {
	if len(rs.codes) == 0 {
		// The modulo below would divide by zero; fall back to the test
		// server's default response.
		rs.count++
		return http.StatusOK
	}
	statusCode := rs.codes[rs.count%len(rs.codes)]
	rs.count++
	return statusCode
}
// Peak reports the highest value recorded in the server's peak counter, which
// tracks the maximum number of simultaneously active connections.
func (ts *testServer) Peak() int {
	peak := atomic.LoadInt64(&ts.peak)
	return int(peak)
}
// Failed reports the number of connections to which the server responded with
// an HTTP status code that is neither 2xx nor 5xx.
func (ts *testServer) Failed() int {
	failed := atomic.LoadUint64(&ts.failed)
	return int(failed)
}
// Retried reports the number of connections to which the server responded with
// a 5xx HTTP status code.
func (ts *testServer) Retried() int {
	retried := atomic.LoadUint64(&ts.retried)
	return int(retried)
}
// Total reports the total number of connections which reached the server.
func (ts *testServer) Total() int {
	total := atomic.LoadUint64(&ts.total)
	return int(total)
}
// Accepted reports the number of connections to which the server responded
// with a 2xx HTTP status code.
func (ts *testServer) Accepted() int {
	accepted := atomic.LoadUint64(&ts.accepted)
	return int(accepted)
}
// Payloads returns the payloads that were accepted by the server, as received.
// The slice is read while holding the server's mutex.
func (ts *testServer) Payloads() []*payload {
	ts.mu.Lock()
	stored := ts.payloads
	ts.mu.Unlock()
	return stored
}
// ServeHTTP responds based on the request body. It counts the request, tracks
// the number of concurrently active requests (and the peak thereof), sleeps for
// the configured latency, reads the whole body and replies with the status code
// selected by getNextCode. Depending on the class of the status code it then
// increments the retried (5xx), accepted (2xx) or failed (anything else)
// counter. For 2xx responses the request body and headers are also recorded and
// can be retrieved with Payloads.
//
// It panics if the request body cannot be read.
func (ts *testServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	atomic.AddUint64(&ts.total, 1)

	active := atomic.AddInt64(&ts.active, 1)
	defer atomic.AddInt64(&ts.active, -1)
	// Raise the recorded peak with a compare-and-swap loop. A plain
	// load-then-store would let a concurrent handler holding a smaller value
	// overwrite a larger peak stored in between.
	for {
		peak := atomic.LoadInt64(&ts.peak)
		if active <= peak || atomic.CompareAndSwapInt64(&ts.peak, peak, active) {
			break
		}
	}

	if ts.latency > 0 {
		time.Sleep(ts.latency)
	}

	// Deferred before reading so the body is closed on the panic path too.
	defer req.Body.Close()
	slurp, err := ioutil.ReadAll(req.Body)
	if err != nil {
		panic(fmt.Sprintf("error reading request body: %v", err))
	}

	statusCode := ts.getNextCode(slurp)
	w.WriteHeader(statusCode)

	switch statusCode / 100 {
	case 5: // 5xx
		atomic.AddUint64(&ts.retried, 1)
	case 2: // 2xx
		atomic.AddUint64(&ts.accepted, 1)
		// for 2xx, we store the payload contents too; when a header has
		// several values only the last one is kept.
		headers := make(map[string]string, len(req.Header))
		for k, vs := range req.Header {
			if len(vs) > 0 {
				headers[k] = vs[len(vs)-1]
			}
		}
		ts.mu.Lock()
		ts.payloads = append(ts.payloads, &payload{
			body:    bytes.NewBuffer(slurp),
			headers: headers,
		})
		ts.mu.Unlock()
	default:
		atomic.AddUint64(&ts.failed, 1)
	}
}
// getNextCode returns the next HTTP status code that should be responded with
// to the given request body. A body of the form "<id>|<code>,<code>,..." (as
// created by expectResponses) is answered with its codes in rotation, tracked
// per ID. Any body that does not consist of exactly two "|"-separated parts, or
// whose code list contains a non-numeric entry, is answered with http.StatusOK.
//
// It panics if a listed code is a number that http.StatusText does not know.
func (ts *testServer) getNextCode(reqBody []byte) int {
	fields := strings.Split(string(reqBody), "|")
	if len(fields) != 2 {
		// not a special body
		return http.StatusOK
	}
	key, list := fields[0], fields[1]

	ts.mu.Lock()
	defer ts.mu.Unlock()

	if status, found := ts.seen[key]; found {
		return status.nextResponse()
	}

	// First time this ID is seen: parse its code list and remember it.
	raw := strings.Split(list, ",")
	codes := make([]int, 0, len(raw))
	for _, s := range raw {
		code, err := strconv.Atoi(s)
		if err != nil {
			// this is likely a real proto request or something else; nevertheless,
			// let's ensure the user knows, just in case it wasn't meant to be.
			log.Println("testServer: warning: possibly malformed request body")
			return http.StatusOK
		}
		if http.StatusText(code) == "" {
			panic(fmt.Sprintf("testServer: invalid status code: %d", code))
		}
		codes = append(codes, code)
	}
	status := &requestStatus{codes: codes}
	ts.seen[key] = status
	return status.nextResponse()
}
// Close shuts down the underlying httptest.Server.
func (ts *testServer) Close() {
	ts.server.Close()
}