forked from coredns/coredns
-
Notifications
You must be signed in to change notification settings - Fork 0
/
persistent.go
140 lines (114 loc) · 3.45 KB
/
persistent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package forward
import (
"crypto/tls"
"net"
"time"
"github.com/miekg/dns"
)
// a persistConn hold the dns.Conn and the last used time.
type persistConn struct {
c *dns.Conn
used time.Time
}
// transport hold the persistent cache.
type transport struct {
conns map[string][]*persistConn // Buckets for udp, tcp and tcp-tls.
expire time.Duration // After this duration a connection is expired.
addr string
tlsConfig *tls.Config
dial chan string
yield chan *dns.Conn
ret chan *dns.Conn
stop chan bool
}
func newTransport(addr string, tlsConfig *tls.Config) *transport {
t := &transport{
conns: make(map[string][]*persistConn),
expire: defaultExpire,
addr: addr,
dial: make(chan string),
yield: make(chan *dns.Conn),
ret: make(chan *dns.Conn),
stop: make(chan bool),
}
go func() { t.connManager() }()
return t
}
// len returns the number of connection, used for metrics. Can only be safely
// used inside connManager() because of data races.
func (t *transport) len() int {
l := 0
for _, conns := range t.conns {
l += len(conns)
}
return l
}
// connManagers manages the persistent connection cache for UDP and TCP.
func (t *transport) connManager() {
Wait:
for {
select {
case proto := <-t.dial:
// Yes O(n), shouldn't put millions in here. We walk all connection until we find the first
// one that is usuable.
i := 0
for i = 0; i < len(t.conns[proto]); i++ {
pc := t.conns[proto][i]
if time.Since(pc.used) < t.expire {
// Found one, remove from pool and return this conn.
t.conns[proto] = t.conns[proto][i+1:]
t.ret <- pc.c
continue Wait
}
// This conn has expired. Close it.
pc.c.Close()
}
// Not conns were found. Connect to the upstream to create one.
t.conns[proto] = t.conns[proto][i:]
SocketGauge.WithLabelValues(t.addr).Set(float64(t.len()))
t.ret <- nil
case conn := <-t.yield:
SocketGauge.WithLabelValues(t.addr).Set(float64(t.len() + 1))
// no proto here, infer from config and conn
if _, ok := conn.Conn.(*net.UDPConn); ok {
t.conns["udp"] = append(t.conns["udp"], &persistConn{conn, time.Now()})
continue Wait
}
if t.tlsConfig == nil {
t.conns["tcp"] = append(t.conns["tcp"], &persistConn{conn, time.Now()})
continue Wait
}
t.conns["tcp-tls"] = append(t.conns["tcp-tls"], &persistConn{conn, time.Now()})
case <-t.stop:
close(t.ret)
return
}
}
}
// Dial dials the address configured in transport, potentially reusing a connection or creating a new one.
func (t *transport) Dial(proto string) (*dns.Conn, bool, error) {
// If tls has been configured; use it.
if t.tlsConfig != nil {
proto = "tcp-tls"
}
t.dial <- proto
c := <-t.ret
if c != nil {
return c, true, nil
}
if proto == "tcp-tls" {
conn, err := dns.DialTimeoutWithTLS("tcp", t.addr, t.tlsConfig, dialTimeout)
return conn, false, err
}
conn, err := dns.DialTimeout(proto, t.addr, dialTimeout)
return conn, false, err
}
// Yield return the connection to transport for reuse.
func (t *transport) Yield(c *dns.Conn) { t.yield <- c }
// Stop stops the transport's connection manager.
func (t *transport) Stop() { close(t.stop) }
// SetExpire sets the connection expire time in transport.
func (t *transport) SetExpire(expire time.Duration) { t.expire = expire }
// SetTLSConfig sets the TLS config in transport.
func (t *transport) SetTLSConfig(cfg *tls.Config) { t.tlsConfig = cfg }
const defaultExpire = 10 * time.Second