-
Notifications
You must be signed in to change notification settings - Fork 0
/
ratetracking.go
70 lines (61 loc) · 2.28 KB
/
ratetracking.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package chained
import (
"context"
"net"
"sync/atomic"
"time"
"github.com/dustin/go-humanize"
borda "github.com/getlantern/borda/client"
"github.com/getlantern/flashlight/v7/ops"
"github.com/getlantern/measured"
)
const (
rateInterval = 1 * time.Second
)
var totalReceived = uint64(0)
func (p *proxy) withRateTracking(wrapped net.Conn, origin string, ctx context.Context) net.Conn {
if wrapped == nil {
return nil
}
return measured.Wrap(wrapped, rateInterval, func(conn measured.Conn) {
stats := conn.Stats()
rwError := conn.FirstError()
if rwError == nil {
if stats.RecvTotal > 0 {
p.consecReadSuccesses.Inc()
}
} else {
// Decrease for whatever error as subsequent reads will fail anyway
// in this case.
p.consecReadSuccesses.Dec()
}
// record simple traffic without origin
op := ops.Begin("traffic").ChainedProxy(p.Name(), p.Addr(), p.Protocol(), p.Network(), p.multiplexed)
op.SetMetric("client_bytes_sent", borda.Sum(stats.SentTotal)).
SetMetric("client_bytes_recv", borda.Sum(stats.RecvTotal))
op.FailIf(rwError)
p.onFinish(op)
op.End()
// record xfer data with origin
op = ops.Begin("xfer").OriginPort(origin, "")
op.SetMetric("client_bytes_sent", borda.Sum(stats.SentTotal)).
SetMetric("client_bps_sent_min", borda.Min(stats.SentMin)).
SetMetric("client_bps_sent_max", borda.Max(stats.SentMax)).
SetMetric("client_bps_sent_avg", borda.WeightedAvg(stats.SentAvg, float64(stats.SentTotal))).
SetMetric("client_bytes_recv", borda.Sum(stats.RecvTotal)).
SetMetric("client_bps_recv_min", borda.Min(stats.RecvMin)).
SetMetric("client_bps_recv_max", borda.Max(stats.RecvMax)).
SetMetric("client_bps_recv_avg", borda.WeightedAvg(stats.RecvAvg, float64(stats.RecvTotal)))
op.FailIf(rwError)
p.onFinish(op)
atomic.AddUint64(&p.dataSent, uint64(stats.SentTotal))
atomic.AddUint64(&p.dataRecv, uint64(stats.RecvTotal))
// The below is a little verbose, but it allows us to see the transfer rates
// right within a user's logs, which is useful when someone submits their logs
// together with a complaint of Lantern being slow.
log.Debugf("Finished xfer, received %v, total received %v",
humanize.Bytes(uint64(stats.RecvTotal)),
humanize.Bytes(atomic.AddUint64(&totalReceived, uint64(stats.RecvTotal))))
op.End()
})
}