-
Notifications
You must be signed in to change notification settings - Fork 0
/
probe.go
175 lines (156 loc) · 5.05 KB
/
probe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package chained
import (
"context"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sync/atomic"
"time"
"github.com/getlantern/errors"
"github.com/getlantern/flashlight/v7/common"
"github.com/getlantern/flashlight/v7/ops"
"github.com/getlantern/mtime"
)
var (
// Probes determines how many times to probe on each call to Probe() unless
// it's for performance. Should be at least 3 to generate enough packets,
// as the censor may allow the first few packets but block the rest.
Probes = 3
// PerformanceProbes determines how many times to probe for performance on
// each call to Probe().
PerformanceProbes = 5
// BasePerformanceProbeKB is the minimum number of KB to request from the
// ping endpoint when probing for performance. It is the size of the first
// performance probe; each later probe in the same run requests 25 KB more
// than the one before it.
BasePerformanceProbeKB = 50
)
// ProbeStats reports this proxy's cumulative probe counters: the number of
// successful probes and the total KB they requested, followed by the number
// of failed probes and the total KB those requested.
//
// Each counter is loaded atomically, but the four loads are independent of
// each other, so the returned values are not a single consistent snapshot.
func (p *proxy) ProbeStats() (successes uint64, successKBs uint64, failures uint64, failedKBs uint64) {
	successes = atomic.LoadUint64(&p.probeSuccesses)
	successKBs = atomic.LoadUint64(&p.probeSuccessKBs)
	failures = atomic.LoadUint64(&p.probeFailures)
	failedKBs = atomic.LoadUint64(&p.probeFailedKBs)
	return successes, successKBs, failures, failedKBs
}
// Probe actively probes this proxy and reports whether the probe run
// succeeded. When forPerformance is true the run is a performance probe.
//
// The proxy is marked as failed or successful according to the outcome, and
// the outcome is logged at debug level together with how long the run took.
func (p *proxy) Probe(forPerformance bool) bool {
	// The root op itself is not used for anything here; it only exists so
	// that a new beam is created and implicitly inherited by the child ops
	// started further down the call chain.
	rootOp := ops.Begin("probe_root")
	rootOp.Set("for_performance", forPerformance)
	defer rootOp.End()

	var suffix string
	if forPerformance {
		suffix = " for performance"
	}
	log.Debugf("Actively probing %v%v", p.Label(), suffix)
	elapsed := mtime.Stopwatch()

	probeErr := p.doProbe(forPerformance)
	outcome := "succeeded"
	if probeErr != nil {
		p.MarkFailure()
		outcome = "failed: " + probeErr.Error()
	} else {
		p.markSuccess()
	}
	log.Debugf("Actively probing %v%v took %v and %v", p.Label(), suffix, elapsed(), outcome)
	return probeErr == nil
}
// doProbe sends a series of HTTP pings through this proxy and returns the
// first error encountered, or nil if every ping succeeded.
//
// A regular run sends Probes pings of 1 KB each. A performance run sends
// PerformanceProbes pings that start at BasePerformanceProbeKB and grow by
// 25 KB per ping. After the first successful ping of either kind, the
// estimated RTT is updated with the time between the end of dialing and the
// first response byte.
func (p *proxy) doProbe(forPerformance bool) error {
	// Set by the dial hook every time the transport dials. Keep-alives are
	// disabled, so every ping dials a fresh connection.
	var dialFinished time.Time
	transport := &http.Transport{
		DisableKeepAlives: true,
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			conn, _, dialErr := p.DialContext(ctx, network, addr)
			dialFinished = time.Now()
			return conn, dialErr
		},
		ResponseHeaderTimeout: 20 * time.Second,
	}

	count := Probes
	if forPerformance {
		count = PerformanceProbes
	}

	for i := 0; i < count; i++ {
		sizeKB, clearBBR := 1, false
		if forPerformance {
			// Performance probes use several increasingly large pings; the
			// varying request size helps the BBR curve-fitting logic on the
			// server.
			sizeKB = BasePerformanceProbeKB + i*25
			// Only the first ping asks the proxy to reset its BBR stats, so
			// that the estimation is up to date once the probe run is over.
			clearBBR = i == 0
		}

		firstByte, err := p.httpPing(transport, sizeKB, clearBBR)
		if err != nil {
			return err
		}
		if i == 0 {
			p.updateEstRTT(firstByte.Sub(dialFinished))
		}
	}
	return nil
}
// httpPing performs one ping of kb kilobytes through rt, bounded by a 30
// second timeout, and returns whatever doHttpPing returns for it.
//
// Around the ping it updates the proxy's probe success/failure counters
// (count and KB) and records the outcome, the probe size and the elapsed
// time in milliseconds on a "probe" op. The error is also recorded on a
// separate "probe_details" op.
func (p *proxy) httpPing(rt http.RoundTripper, kb int, resetBBR bool) (time.Time, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	probeOp := ops.Begin("probe").ChainedProxy(p.Name(), p.Addr(), p.Protocol(), p.Network(), p.multiplexed)
	defer probeOp.End()

	// A second, sampled op carries details such as the actual error.
	detailsOp := ops.Begin("probe_details")
	defer detailsOp.End()

	began := time.Now()
	firstByte, pingErr := p.doHttpPing(ctx, rt, kb, resetBBR)
	took := time.Since(began)

	if pingErr == nil {
		atomic.AddUint64(&p.probeSuccesses, 1)
		atomic.AddUint64(&p.probeSuccessKBs, uint64(kb))
	} else {
		atomic.AddUint64(&p.probeFailures, 1)
		atomic.AddUint64(&p.probeFailedKBs, uint64(kb))
	}

	detailsOp.FailIf(pingErr)
	probeOp.FailIf(pingErr)
	probeOp.Set("success", pingErr == nil)
	probeOp.Set("probe_kb", kb)
	// A Duration counts nanoseconds, so dividing by time.Millisecond yields
	// fractional milliseconds.
	probeOp.SetMetricAvg("probe_rtt", float64(took)/float64(time.Millisecond))
	return firstByte, pingErr
}
// doHttpPing issues a single GET to the ping endpoint through rt, asking for
// kb kilobytes via the ping header, and drains the response body.
//
// ctx bounds the whole exchange, including reading the body. When resetBBR
// is true the request also carries "X-BBR: clear".
//
// On success it returns the time at which RoundTrip returned (the time of
// first byte, taken before the body is read) and a nil error. On any
// failure it returns ctx's deadline (the zero time if ctx has none) together
// with the error; callers should not treat that time as a measurement.
func (p *proxy) doHttpPing(ctx context.Context, rt http.RoundTripper, kb int, resetBBR bool) (tofb time.Time, err error) {
	deadline, _ := ctx.Deadline()

	// Attach ctx at construction time instead of cloning the request later
	// with WithContext. A nil ctx is then reported as an error, not a panic.
	req, e := http.NewRequestWithContext(ctx, http.MethodGet, "http://ping-chained-server", nil)
	if e != nil {
		// %w keeps the message text unchanged while letting callers inspect
		// the cause with errors.Is / errors.As.
		return deadline, fmt.Errorf("Could not create HTTP request: %w", e)
	}
	req.Header.Set(common.PingHeader, fmt.Sprint(kb))
	p.onRequest(req)
	if resetBBR {
		req.Header.Set("X-BBR", "clear")
	}

	reqTime := time.Now()
	resp, rtErr := rt.RoundTrip(req)
	if rtErr != nil {
		return deadline, errors.New("Error testing dialer %s: %s", p.Addr(), rtErr)
	}
	// Time of first byte. Note that it is taken before reading the body in
	// the hope of measuring a more accurate RTT on the wire.
	tofb = time.Now()
	if resp.Body != nil {
		// Read the whole body so that the transfer is included in the
		// caller's timing.
		defer resp.Body.Close()
		if _, copyErr := io.Copy(ioutil.Discard, resp.Body); copyErr != nil {
			return deadline, errors.New("Unable to read response body: %v", copyErr)
		}
	}

	log.Tracef("PING through chained server at %s, status code %d", p.Addr(), resp.StatusCode)
	// NOTE(review): sameStatusCodeClass presumably accepts any status in the
	// same class as 200 (i.e. 2xx) — confirm against its definition.
	if sameStatusCodeClass(http.StatusOK, resp.StatusCode) {
		p.collectBBRInfo(reqTime, resp)
		return tofb, nil
	}
	return deadline, errors.New("Unexpected HTTP status %v", resp.StatusCode)
}