// cachingresolver.go
// Forked from esiqveland/balancer.

package balancer

import (
	"context"
	"net/netip"
	"sync"
	"sync/atomic"
	"time"
)

// CachingResolver caches responses from the wrapped DNS resolver for the specified
// amount of time.
//
// CachingResolver does not implement a timeout for DNS queries: for that you can
// use a TimeoutResolver. Similarly, it does not implement concurrent request
// deduplication: for that you can use a SingleflightResolver.
// See ExampleAdvanced for the recommended way of composing these additional
// resolvers.
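//
// A minimal usage sketch (assuming, as the LookupNetIP signature below
// suggests, that *net.Resolver satisfies this package's Resolver interface;
// the TTL values are illustrative only). A hedged composition sketch appears
// at the end of this file.
//
//	r := &CachingResolver{
//		Resolver: net.DefaultResolver,
//		TTL:      time.Minute,     // cache successful lookups for one minute
//		NegTTL:   5 * time.Second, // cache failures briefly
//	}
//	addrs, err := r.LookupNetIP(ctx, "ip", "example.com")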
type CachingResolver struct {
	Resolver Resolver      // Wrapped DNS resolver.
	TTL      time.Duration // How long to cache positive results for. 0 disables caching for positive results.
	NegTTL   time.Duration // How long to cache negative results for. 0 disables caching for negative results.

	mu    sync.RWMutex
	m     map[key]result
	count uint64
}

type key struct {
	af   string
	host string
}

type result struct {
	ips []netip.Addr
	err error
	exp time.Time
}

// Interface guard: CachingResolver must implement Resolver.
var _ Resolver = &CachingResolver{}

func (c *CachingResolver) LookupNetIP(ctx context.Context, af, host string) ([]netip.Addr, error) {
	if c.TTL > 0 || c.NegTTL > 0 {
		c.mu.RLock()
		r, ok := c.m[key{af, host}]
		c.mu.RUnlock()
		if ok && r.exp.After(time.Now()) {
			c.sampledCleanupAsync(asyncSamples)
			return r.ips, r.err
		}
	}
	// Capture the expiry base before the lookup, so the TTL is measured
	// conservatively (from before the query started).
	exp := time.Now()
	ips, err := c.Resolver.LookupNetIP(ctx, af, host)
	// Do not cache the result if the context was cancelled, or if the
	// corresponding TTL (positive or negative) is 0.
	if (err != nil && ctx.Err() != nil) || (err != nil && c.NegTTL == 0) || (err == nil && c.TTL == 0) {
		c.sampledCleanupAsync(asyncSamples)
		return ips, err
	}
	if err == nil {
		exp = exp.Add(c.TTL)
	} else {
		exp = exp.Add(c.NegTTL)
	}
	c.mu.Lock()
	if c.m == nil {
		c.m = map[key]result{}
	}
	if r, ok := c.m[key{af, host}]; !ok || r.exp.Before(exp) {
		c.m[key{af, host}] = result{ips, err, exp}
	}
	// Whenever we lock the map to add or update an entry, we also check a
	// small number of random entries and remove any that have expired. This
	// is meant to prevent the map from growing without bound.
	c.sampledCleanupLocked(lockedSamples)
	c.mu.Unlock()
	return ips, err
}

const (
	asyncInterval = 1024 // Spawn an asynchronous cleanup once every asyncInterval lookups served outside the write path.
	asyncSamples  = 10   // Map entries examined by each asynchronous cleanup.
	lockedSamples = 3    // Map entries examined by each cleanup performed under the write lock.
)

// sampledCleanupAsync spawns, once every asyncInterval calls, a goroutine
// that takes the write lock and removes up to samples expired entries. It is
// used on paths that return without otherwise locking the map for writing.
func (c *CachingResolver) sampledCleanupAsync(samples int) {
	if atomic.AddUint64(&c.count, 1)%asyncInterval == 0 {
		go func() {
			c.mu.Lock()
			defer c.mu.Unlock()
			c.sampledCleanupLocked(samples)
		}()
	}
}

// sampledCleanupLocked examines up to samples entries, in Go's randomized
// map iteration order, and deletes any that have expired. The caller must
// hold c.mu for writing.
func (c *CachingResolver) sampledCleanupLocked(samples int) {
	now := time.Now()
	for k, r := range c.m {
		if r.exp.Before(now) {
			delete(c.m, k)
		}
		samples--
		if samples == 0 {
			break
		}
	}
}
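
// The doc comment above points to ExampleAdvanced for the recommended way of
// composing resolvers. As a rough, hedged sketch of that composition (the
// field names of SingleflightResolver and TimeoutResolver below are
// assumptions for illustration, not necessarily their actual API):
//
//	r := &CachingResolver{
//		TTL:    time.Minute,
//		NegTTL: 5 * time.Second,
//		Resolver: &SingleflightResolver{ // deduplicate concurrent lookups (field name assumed)
//			Resolver: &TimeoutResolver{ // bound each DNS query (field names assumed)
//				Resolver: net.DefaultResolver,
//				Timeout:  time.Second,
//			},
//		},
//	}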