/
flow.go
345 lines (317 loc) · 9.76 KB
/
flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
// This file focuses on a structure called ProxiedFlow. It's useful for us to
// run multiple roundtrippers in parallel and have a "preference" for one or
// more of them.
//
// To breakdown this concept further, let's go with an example: Assume we want
// to run a request through our Lantern proxies (called the "chained
// roundtripper") **and** through domain fronting (called "fronted"
// roundtripper) where the fastest response is taken.
//
// Let's also assume we want to have a preference for "chained roundtripper",
// meaning that if running a request through the "chained roundtripper" was the
// fastest roundtripper we've found (as opposed to the "fronted roundtripper"
// in this example), the **next** request you run will automatically go through
// "chained", and we wouldn't bother "fronted" roundtripper, unless it fails.
//
// The code for this example will look like this:
//
// chainedRoundTripper, err := proxied.ChainedNonPersistent("")
// require.NoError(t, err)
//
// req, err := http.NewRequest("GET", "http://example.com", nil)
// require.NoError(t, err)
// flow := NewProxiedFlow(
// &ProxiedFlowInput{
// AddDebugHeaders: true,
// },
// )
//
// flow.
// Add(proxied.FlowComponentID_Chained, chained, true).
// Add(proxied.FlowComponentID_Fronted, proxied.Fronted(masqueradeTimeout), false)
// resp, err := flow.RoundTrip(req)
// require.Error(t, err)
package proxied
import (
"bytes"
"fmt"
"io"
"net/http"
"sync"
)
const roundTripperHeaderKey = "Roundtripper"
type FlowComponentID string
// Enum of most used roundtrippers
var (
FlowComponentID_P2P FlowComponentID = "p2p"
FlowComponentID_Fronted FlowComponentID = "fronted"
FlowComponentID_Chained FlowComponentID = "chained"
)
func (id FlowComponentID) String() string {
return string(id)
}
// ProxiedFlowComponent is basically a wrapper around an http.RoundTripper that
// includes an ID and some additional flags
type ProxiedFlowComponent struct {
http.RoundTripper
id FlowComponentID
addDebugHeaders bool
shouldPrefer bool
}
// ProxiedFlowResponse is a wrapper around an http.Response and an error, both
// coming from ProxiedFlowComponent.RoundTrip()
type ProxiedFlowResponse struct {
id FlowComponentID
resp *http.Response
err error
}
// OnStartRoundTrip is called by the flow when it starts a new roundtrip.
type OnStartRoundTrip func(FlowComponentID, *http.Request)
// OnCompleteRoundTrip is called by the flow when it completes a roundtrip.
type OnCompleteRoundTrip func(FlowComponentID)
// ProxiedFlowInput is the input to NewProxiedFlow()
type ProxiedFlowInput struct {
// Can be set to true to add the value of
// "roundTripperHeaderKey" to the response headers (not request). It's purely
// used for assertions during unit tests.
AddDebugHeaders bool
// Runs when a flow component is about to start roundtripping
OnStartRoundTripFunc OnStartRoundTrip
// Runs when a flow component is is done roundtripping
OnCompleteRoundTripFunc OnCompleteRoundTrip
}
type ProxiedFlow struct {
// List of components in the flow
components []*ProxiedFlowComponent
input *ProxiedFlowInput
// Most preferred component. Can be nil, which means either that no
// component wants to be preferred or that this flow was never run
// successfully before
preferredComponent *ProxiedFlowComponent
}
// NewProxiedFlow returns a new ProxiedFlow
func NewProxiedFlow(input *ProxiedFlowInput) *ProxiedFlow {
return &ProxiedFlow{input: input}
}
// Add adds new roundtrippers to the flow.
// The highest priority components should be added first (i.e., 0 is the
// highest priority)
func (f *ProxiedFlow) Add(
id FlowComponentID,
rt http.RoundTripper,
shouldPrefer bool,
) *ProxiedFlow {
f.components = append(f.components, &ProxiedFlowComponent{
RoundTripper: rt,
id: id,
shouldPrefer: shouldPrefer,
addDebugHeaders: f.input.AddDebugHeaders,
})
// Returning f so function calls can be chained nicely in a builder pattern
return f
}
// SetPreferredComponent sets the component with "id" as the preferred
// component.
// This function doesn't fail if the component doesn't exist.
//
// Return *ProxiedFlow to chain function calls in a builder pattern.
func (f *ProxiedFlow) SetPreferredComponent(id FlowComponentID) *ProxiedFlow {
for _, c := range f.components {
if c.id == id {
f.preferredComponent = c
break
}
}
return f
}
// RoundTrip makes ProxiedFlow implement the http.RoundTripper interface.
// This function works in two ways:
// - the "runner" code occurs in "f.runAllComponents()" and it's responsible
// for running all the roundtrippers in the flow (or just the preferred one, if
// one exists) and send the responses through "recvFlowRespCh"
// - the "reciever" code occurs in the "looper" block below and it's
// responsible for handling responses and errors
//
// This function respects the request's original context
func (f *ProxiedFlow) RoundTrip(
originalReq *http.Request,
) (*http.Response, error) {
recvFlowRespCh := make(chan *ProxiedFlowResponse, len(f.components))
go f.runAllComponents(originalReq, recvFlowRespCh)
collectedErrors := []error{}
looper:
select {
case flowResp := <-recvFlowRespCh:
// fmt.Printf("flowResp = %+v\n", flowResp)
if flowResp.err != nil {
var url string
if originalReq.URL != nil {
url = originalReq.URL.String()
} else {
url = "nil"
}
log.Errorf(
"Error from FlowComponent %s during request: %v: %v",
flowResp.id, url, flowResp.err)
collectedErrors = append(collectedErrors, flowResp.err)
}
if flowResp.resp != nil {
// fmt.Printf("flowResp.resp = %+v\n", flowResp.resp)
// Set this component as a preferredComponent, only if the
// component wants to (i.e., shouldPrefer is true)
for _, c := range f.components {
if c.id == flowResp.id && c.shouldPrefer {
f.preferredComponent = c
}
}
return flowResp.resp, nil
}
case <-originalReq.Context().Done():
// If we're done, we need to exit now.
// Try to return the highest priority response we've seen.
// Else, try to return the latest response we've seen.
// Else, return an error that all roundtrippers failed.
collectedErrors = append(collectedErrors, originalReq.Context().Err())
return nil, fmt.Errorf(
"flow.go:RoundTrip: All roundtrippers failed with errs: %+v",
collectedErrors,
)
}
goto looper
}
// Run runs component "comp" by basically cloning the request and
// then roundtripping
func (comp *ProxiedFlowComponent) Run(
originalReq *http.Request,
originalReqMu *sync.Mutex,
onStartRoundTripFunc OnStartRoundTrip,
onCompleteRoundTripFunc OnCompleteRoundTrip,
) *ProxiedFlowResponse {
// Copy original request
originalReqMu.Lock()
_, copiedReq, err := copyRequest(originalReq)
originalReqMu.Unlock()
if err != nil {
return &ProxiedFlowResponse{
resp: nil,
err: fmt.Errorf(
"flow.go:runAllComponents while copying request [%+v]: %w",
originalReq, err,
), id: comp.id}
}
// Setup the onStart and onComplete callbacks
if onStartRoundTripFunc != nil {
onStartRoundTripFunc(comp.id, copiedReq)
}
defer func() {
if onCompleteRoundTripFunc != nil {
onCompleteRoundTripFunc(comp.id)
}
}()
// Get the URL (useful for logs and debugging)
var url string
if copiedReq.URL != nil {
url = copiedReq.URL.String()
} else {
url = "nil"
}
// Run the roundtripper
resp, err := comp.RoundTripper.RoundTrip(copiedReq)
// Handle errors and whatnots
if err != nil {
return &ProxiedFlowResponse{
resp: nil,
err: fmt.Errorf(
"with roundtripper [%v] during FlowRoundTrip towards [%v]: %v",
comp.id,
url,
err,
),
id: comp.id}
}
if resp == nil {
return &ProxiedFlowResponse{
resp: nil,
err: fmt.Errorf(
"with roundtripper [%v] during FlowRoundTrip towards [%v]: no response",
comp.id,
url,
),
id: comp.id}
}
if resp.StatusCode >= 400 {
body := "nil"
if copiedReq.Body != nil {
b, err := io.ReadAll(resp.Body)
if err == nil {
body = string(b)
resp.Body.Close()
}
}
return &ProxiedFlowResponse{
resp: nil,
err: fmt.Errorf(
"with roundtripper [%v] during FlowRoundTrip towards [%v]: status code [%v]: body: %v",
comp.id,
url,
resp.StatusCode,
body,
),
id: comp.id}
}
// Add a header mentioning the used roundtripper.
// Only useful for tests.
if comp.addDebugHeaders {
resp.Header.Set(roundTripperHeaderKey, comp.id.String())
}
return &ProxiedFlowResponse{
resp: resp,
err: nil,
id: comp.id}
}
func copyRequest(req *http.Request) (*http.Request, *http.Request, error) {
req2 := req.Clone(req.Context())
if req.Body != nil {
b, err := io.ReadAll(req.Body)
if err != nil {
return nil, nil, fmt.Errorf("while reading request body %v", err)
}
req.Body = io.NopCloser(bytes.NewReader(b))
req2.Body = io.NopCloser(bytes.NewReader(b))
}
return req, req2, nil
}
// runAllComponents runs the components in parallel, while favoring the
// "preferred" component
func (f *ProxiedFlow) runAllComponents(
originalReq *http.Request,
recvFlowRespCh chan<- *ProxiedFlowResponse,
) {
// If there's a preferred component, run it first
var originalReqMu sync.Mutex
if f.preferredComponent != nil {
flowResp := f.preferredComponent.Run(
originalReq, &originalReqMu,
f.input.OnStartRoundTripFunc,
f.input.OnCompleteRoundTripFunc,
)
recvFlowRespCh <- flowResp
if flowResp.err != nil {
// If it failed, remove it as our preferred component
f.preferredComponent = nil
} else if flowResp.resp != nil {
// If it succeeded, just go with it
return
}
}
// Else, run the rest of the components asynchronously
for _, _comp := range f.components {
comp := _comp
go func() {
recvFlowRespCh <- comp.Run(
originalReq, &originalReqMu,
f.input.OnStartRoundTripFunc,
f.input.OnCompleteRoundTripFunc)
}()
}
}