Skip to content

Commit

Permalink
fastip: mk ping timeout configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
EugeneOne1 committed Aug 25, 2021
1 parent 1b7a9bf commit bbd9640
Show file tree
Hide file tree
Showing 163 changed files with 3,477 additions and 1,710 deletions.
36 changes: 22 additions & 14 deletions fastip/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (
"time"
)

// TODO(e.burkov): Rewrite the cache using zero-values instead of storing
// useless boolean as an integer.

const (
fastestAddrCacheTTLSec = 10 * 60 // cache TTL for IP addresses
)

type cacheEntry struct {
status int //0:ok; 1:timed out
status int // 0:ok; 1:timed out
latencyMsec uint
}

Expand All @@ -22,8 +25,8 @@ type cacheEntry struct {
// latency_msec [2]byte
func packCacheEntry(ent *cacheEntry, ttl uint32) []byte {
expire := uint32(time.Now().Unix()) + ttl
var d []byte
d = make([]byte, 4+1+2)

d := make([]byte, 4+1+2)
binary.BigEndian.PutUint32(d, expire)
i := 4

Expand Down Expand Up @@ -60,7 +63,7 @@ func unpackCacheEntry(data []byte) *cacheEntry {
// returns null if nothing found or if the record for this ip is expired
func (f *FastestAddr) cacheFind(ip net.IP) *cacheEntry {
k := getCacheKey(ip)
val := f.cache.Get(k)
val := f.ipCache.Get(k)
if val == nil {
return nil
}
Expand All @@ -73,34 +76,39 @@ func (f *FastestAddr) cacheFind(ip net.IP) *cacheEntry {

// cacheAddFailure - store unsuccessful attempt in cache
func (f *FastestAddr) cacheAddFailure(addr net.IP) {
ent := cacheEntry{}
ent.status = 1
f.cacheLock.Lock()
ent := cacheEntry{
status: 1,
}

f.ipCacheLock.Lock()
defer f.ipCacheLock.Unlock()

if f.cacheFind(addr) == nil {
f.cacheAdd(&ent, addr, fastestAddrCacheTTLSec)
}
f.cacheLock.Unlock()
}

// store a successful ping result in cache
// replace previous result if our latency is lower
func (f *FastestAddr) cacheAddSuccessful(addr net.IP, latency uint) {
ent := cacheEntry{}
ent.status = 0
ent.latencyMsec = latency
f.cacheLock.Lock()
ent := cacheEntry{
latencyMsec: latency,
}

f.ipCacheLock.Lock()
defer f.ipCacheLock.Unlock()

entCached := f.cacheFind(addr)
if entCached == nil || entCached.status != 0 || entCached.latencyMsec > latency {
f.cacheAdd(&ent, addr, fastestAddrCacheTTLSec)
}
f.cacheLock.Unlock()
}

// cacheAdd -- adds a new entry to the cache
func (f *FastestAddr) cacheAdd(ent *cacheEntry, addr net.IP, ttl uint32) {
ip := getCacheKey(addr)
val := packCacheEntry(ent, ttl)
f.cache.Set(ip, val)
f.ipCache.Set(ip, val)
}

// getCacheKey - gets cache key (compresses ipv4 to 4 bytes)
Expand Down
2 changes: 1 addition & 1 deletion fastip/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestCache(t *testing.T) {
}
// f.cacheAdd(&ent, net.ParseIP("1.1.1.1"), fastestAddrCacheMinTTLSec)
val := packCacheEntry(&ent, 1) // ttl=1
f.cache.Set(net.ParseIP("1.1.1.1").To4(), val)
f.ipCache.Set(net.ParseIP("1.1.1.1").To4(), val)
ent = cacheEntry{
status: 0,
latencyMsec: 222,
Expand Down
112 changes: 68 additions & 44 deletions fastip/fastest_addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"strings"
"sync"
"time"

"github.com/AdguardTeam/golibs/log"

Expand All @@ -13,24 +14,33 @@ import (
"github.com/miekg/dns"
)

// FastestAddr - object data
// FastestAddr provides methods to determine the fastest network addresses.
type FastestAddr struct {
cache glcache.Cache // cache of the fastest IP addresses
cacheLock sync.Mutex // for atomic find-and-store cache operation
allowTCP bool // connect via TCP
tcpPorts []uint // TCP ports we're using to check connection speed
// ipCacheLock protects ipCache.
ipCacheLock sync.Mutex
// ipCache caches fastest IP addresses.
ipCache glcache.Cache

// tcpPorts are the ports to ping on.
tcpPorts []uint

// PingWaitTimeout is the timeout for waiting all the resolved addresses
// are pinged. Any ping results received after it are cached but not
// used at the moment. It should be configured right after the
// FastestAddr initialization since it isn't protected for concurrent
// usage.
PingWaitTimeout time.Duration
}

// NewFastestAddr initializes a new instance of the FastestAddr
func NewFastestAddr() *FastestAddr {
conf := glcache.Config{
MaxSize: 64 * 1024,
EnableLRU: true,
}
// NewFastestAddr initializes a new instance of the *FastestAddr.
func NewFastestAddr() (f *FastestAddr) {
return &FastestAddr{
cache: glcache.New(conf),
allowTCP: true,
tcpPorts: []uint{80, 443},
ipCache: glcache.New(glcache.Config{
MaxSize: 64 * 1024,
EnableLRU: true,
}),
tcpPorts: []uint{80, 443},
PingWaitTimeout: defaultPingWaitTimeout,
}
}

Expand All @@ -49,22 +59,26 @@ func NewFastestAddr() *FastestAddr {
// . Receive TCP connection status. The first connected address - the fastest IP address.
// . Choose the fastest address between this and the one previously found in cache
// . Return DNS packet containing the chosen IP address (remove all other IP addresses from the packet)
func (f *FastestAddr) ExchangeFastest(req *dns.Msg, upstreams []upstream.Upstream) (*dns.Msg, upstream.Upstream, error) {
replies, err := upstream.ExchangeAll(upstreams, req)
if err != nil || len(replies) == 0 {
func (f *FastestAddr) ExchangeFastest(req *dns.Msg, ups []upstream.Upstream) (
resp *dns.Msg,
u upstream.Upstream,
err error,
) {
replies, err := upstream.ExchangeAll(ups, req)
if err != nil {
return nil, nil, err
}

host := strings.ToLower(req.Question[0].Name)
ips := f.getIPAddresses(replies)
found, pingRes := f.pingAll(host, ips)
ips := f.extractIPs(replies)

if !found {
log.Debug("%s: no fastest IP found, using the first response", host)
return replies[0].Resp, replies[0].Upstream, nil
if pingRes := f.pingAll(host, ips); pingRes != nil {
return f.prepareReply(pingRes, replies)
}

return f.prepareReply(pingRes, replies)
log.Debug("%s: no fastest IP found, using the first response", host)

return replies[0].Resp, replies[0].Upstream, nil
}

// prepareReply - prepares the DNS response that will be sent back to the client
Expand All @@ -74,30 +88,29 @@ func (f *FastestAddr) ExchangeFastest(req *dns.Msg, upstreams []upstream.Upstrea
// 2. Find the one that contains the fastest IP
// 3. Remove all other IP addresses from that response
// 4. Return it
func (f *FastestAddr) prepareReply(pingRes *pingResult, replies []upstream.ExchangeAllResult) (*dns.Msg, upstream.Upstream, error) {
var m *dns.Msg
var u upstream.Upstream

func (f *FastestAddr) prepareReply(pingRes *pingResult, replies []upstream.ExchangeAllResult) (
m *dns.Msg,
u upstream.Upstream,
err error,
) {
for _, r := range replies {
for _, rr := range r.Resp.Answer {
ip := proxyutil.GetIPFromDNSRecord(rr)
if ip != nil && ip.Equal(pingRes.ip) {
// Found it!
m = r.Resp
u = r.Upstream
break
}
if hasAns(r.Resp, pingRes.ip) {
m = r.Resp
u = r.Upstream

break
}
}

if m == nil {
// Something definitely went wrong
// Something definitely went wrong.
log.Error("Found no replies with IP %s, most likely this is a bug", pingRes.ip)

return replies[0].Resp, replies[0].Upstream, nil
}

// Now modify that message and keep only those A/AAAA records
// that contain our fastest IP address
// Modify the message and keep only A and AAAA records containing the
// fastest IP address.
ans := []dns.RR{}
for _, rr := range m.Answer {
switch addr := rr.(type) {
Expand All @@ -116,15 +129,26 @@ func (f *FastestAddr) prepareReply(pingRes *pingResult, replies []upstream.Excha
}
}

// Set new answer
// Set new answer.
m.Answer = ans

return m, u, nil
}

// getIPAddresses -- extracts all IP addresses from the list of upstream.ExchangeAllResult
func (f *FastestAddr) getIPAddresses(results []upstream.ExchangeAllResult) []net.IP {
var ips []net.IP
// hasAns returns true if m contains ip in its answer section.
func hasAns(m *dns.Msg, ip net.IP) (ok bool) {
for _, rr := range m.Answer {
respIP := proxyutil.GetIPFromDNSRecord(rr)
if respIP != nil && respIP.Equal(ip) {
return true
}
}

return false
}

// extractIPs extracts all IP addresses from results.
func (f *FastestAddr) extractIPs(results []upstream.ExchangeAllResult) (ips []net.IP) {
for _, r := range results {
for _, rr := range r.Resp.Answer {
ip := proxyutil.GetIPFromDNSRecord(rr)
Expand All @@ -137,8 +161,8 @@ func (f *FastestAddr) getIPAddresses(results []upstream.ExchangeAllResult) []net
return ips
}

// containsIP - checks if IP address is in the list of IPs
func containsIP(ips []net.IP, ip net.IP) bool {
// containsIP returns true if ips contains the ip.
func containsIP(ips []net.IP, ip net.IP) (ok bool) {
if len(ips) == 0 {
return false
}
Expand Down
Loading

0 comments on commit bbd9640

Please sign in to comment.