-
Notifications
You must be signed in to change notification settings - Fork 0
/
relay.go
126 lines (108 loc) · 3.06 KB
/
relay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package rdv
import (
"context"
"errors"
"io"
"math"
"sync"
"time"
)
// A relayer handles a pair of rdv conns. The zero-value can be used.
type Relayer struct {
DialTap, AcceptTap io.Writer
// At least this much inactivity is allowed on both peers before terminating the connection.
// Recommended at least 30s to account for network conditions and
// application level heartbeats. Zero means no timeout.
// As relays may serve a lot of traffic, activity is checked at an interval.
IdleTimeout time.Duration
}
func (r *Relayer) Reject(dc, ac *Conn, statusCode int, reason string) error {
return errors.Join(
writeResponseErr(dc, statusCode, reason),
writeResponseErr(ac, statusCode, reason))
}
// Runs the relay service. Return actual data transferred and the first error that occurred.
// In case one end closed the connection in a normal manner, the error is io.EOF.
func (r *Relayer) Run(ctx context.Context, dc, ac *Conn) (dn int64, an int64, err error) {
ctx, cancel := context.WithCancelCause(ctx)
// Causes all IO to return timeout errors immediately
timeoutFn := sync.OnceFunc(func() {
dc.SetDeadline(past())
ac.SetDeadline(past())
})
stop := context.AfterFunc(ctx, timeoutFn)
defer stop()
it := newIdleTimer(r.idleTimeout(), timeoutFn)
defer it.Stop()
dTap, aTap := r.taps()
// Start only one extra goroutine to save resources
done := make(chan struct{})
go func() {
dn = copyRelay(ac, dc, dTap, it, cancel)
close(done)
}()
an = copyRelay(dc, ac, aTap, it, cancel)
<-done
err = context.Cause(ctx)
return
}
func copyRelay(to, from *Conn, tap io.Writer, it *idleTimer, cancel context.CancelCauseFunc) (n int64) {
defer to.Close()
err := initiateRelay(to, from)
if err != nil {
return
}
n, err = copyRelayInner(to, from, tap, it)
cancel(err)
return
}
// Sends response header containing addresses from the other conn,
// reads the rdv header line and relays it. Returns EOF if the rdv header line
// wasn't received, which typically indicates that p2p was established out-of-bounds.
func initiateRelay(to, from *Conn) error {
to.meta.setPeerAddrsFrom(from.meta)
resp := to.meta.toResp()
err := resp.Write(to)
if err != nil {
return err
}
// Read expected rdv header line
selfHeader, _ := from.headers()
err = expectStr(from, selfHeader)
if err != nil {
return err
}
// Write rdv header line to the other peer
_, err = io.WriteString(to, selfHeader)
return err
}
// Copies data with the configured tap
func copyRelayInner(to io.WriteCloser, from io.Reader, tap io.Writer, it *idleTimer) (n int64, err error) {
w := io.MultiWriter(it, tap, to)
n, err = io.Copy(w, from)
if err == nil {
err = io.EOF
}
return
}
func (r *Relayer) idleTimeout() time.Duration {
if r.IdleTimeout > 0 {
return r.IdleTimeout
}
return math.MaxInt64
}
// Utility to get non-nil taps
func (r *Relayer) taps() (dTap, aTap io.Writer) {
dTap, aTap = r.DialTap, r.AcceptTap
if dTap == nil {
dTap = noopTap{}
}
if aTap == nil {
aTap = noopTap{}
}
return
}
type noopTap struct{}
func (noopTap) Write(p []byte) (n int, err error) {
return len(p), nil
}