Skip to content

Commit

Permalink
Pull request 1925: 6006-client-processor
Browse files Browse the repository at this point in the history
Updates #6006.

Squashed commit of the following:

commit c72d637
Merge: 02d64b1 0cd441f
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Tue Jul 18 13:56:26 2023 +0300

    Merge branch 'master' into 6006-client-processor

commit 02d64b1
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Mon Jul 17 19:42:07 2023 +0300

    client: imp code, tests

commit b161346
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Mon Jul 17 18:42:19 2023 +0300

    client: imp code, docs, tests

commit f71a179
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Fri Jul 14 21:53:47 2023 +0300

    all: add new client processor; imp code
  • Loading branch information
ainar-g committed Jul 18, 2023
1 parent 0cd441f commit dead10e
Show file tree
Hide file tree
Showing 10 changed files with 664 additions and 33 deletions.
57 changes: 57 additions & 0 deletions internal/aghtest/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import (
"context"
"io"
"io/fs"
"net/netip"

"github.com/AdguardTeam/AdGuardHome/internal/aghos"
"github.com/AdguardTeam/AdGuardHome/internal/client"
"github.com/AdguardTeam/AdGuardHome/internal/next/agh"
"github.com/AdguardTeam/AdGuardHome/internal/rdns"
"github.com/AdguardTeam/AdGuardHome/internal/whois"
"github.com/AdguardTeam/dnsproxy/upstream"
"github.com/miekg/dns"
)
Expand Down Expand Up @@ -135,6 +139,59 @@ func (s *ServiceWithConfig[ConfigType]) Config() (c ConfigType) {
return s.OnConfig()
}

// Package client

// AddressProcessor is a fake [client.AddressProcessor] implementation for
// tests.
type AddressProcessor struct {
OnProcess func(ip netip.Addr)
OnClose func() (err error)
}

// type check
var _ client.AddressProcessor = (*AddressProcessor)(nil)

// Process implements the [client.AddressProcessor] interface for
// *AddressProcessor.
func (p *AddressProcessor) Process(ip netip.Addr) {
p.OnProcess(ip)
}

// Close implements the [client.AddressProcessor] interface for
// *AddressProcessor.
func (p *AddressProcessor) Close() (err error) {
return p.OnClose()
}

// AddressUpdater is a fake [client.AddressUpdater] implementation for tests.
type AddressUpdater struct {
OnUpdateAddress func(ip netip.Addr, host string, info *whois.Info)
}

// type check
var _ client.AddressUpdater = (*AddressUpdater)(nil)

// UpdateAddress implements the [client.AddressUpdater] interface for
// *AddressUpdater.
func (p *AddressUpdater) UpdateAddress(ip netip.Addr, host string, info *whois.Info) {
p.OnUpdateAddress(ip, host, info)
}

// Package rdns

// Exchanger is a fake [rdns.Exchanger] implementation for tests.
type Exchanger struct {
OnExchange func(ip netip.Addr) (host string, err error)
}

// type check
var _ rdns.Exchanger = (*Exchanger)(nil)

// Exchange implements [rdns.Exchanger] interface for *Exchanger.
func (e *Exchanger) Exchange(ip netip.Addr) (host string, err error) {
return e.OnExchange(ip)
}

// Module dnsproxy

// Package upstream
Expand Down
293 changes: 293 additions & 0 deletions internal/client/addrproc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
package client

import (
"context"
"net/netip"
"sync"
"time"

"github.com/AdguardTeam/AdGuardHome/internal/rdns"
"github.com/AdguardTeam/AdGuardHome/internal/whois"
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/netutil"
)

// ErrClosed is returned from [AddressProcessor.Close] if it's closed more than
// once.
const ErrClosed errors.Error = "use of closed address processor"

// AddressProcessor is the interface for types that can process clients.
type AddressProcessor interface {
Process(ip netip.Addr)
Close() (err error)
}

// EmptyAddrProc is an [AddressProcessor] that does nothing.
type EmptyAddrProc struct{}

// type check
var _ AddressProcessor = EmptyAddrProc{}

// Process implements the [AddressProcessor] interface for EmptyAddrProc.
func (EmptyAddrProc) Process(_ netip.Addr) {}

// Close implements the [AddressProcessor] interface for EmptyAddrProc.
func (EmptyAddrProc) Close() (_ error) { return nil }

// DefaultAddrProcConfig is the configuration structure for address processors.
type DefaultAddrProcConfig struct {
// DialContext is used to create TCP connections to WHOIS servers.
// DialContext must not be nil if [DefaultAddrProcConfig.UseWHOIS] is true.
DialContext whois.DialContextFunc

// Exchanger is used to perform rDNS queries. Exchanger must not be nil if
// [DefaultAddrProcConfig.UseRDNS] is true.
Exchanger rdns.Exchanger

// PrivateSubnets are used to determine if an incoming IP address is
// private. It must not be nil.
PrivateSubnets netutil.SubnetSet

// AddressUpdater is used to update the information about a client's IP
// address. It must not be nil.
AddressUpdater AddressUpdater

// InitialAddresses are the addresses that are queued for processing
// immediately by [NewDefaultAddrProc].
InitialAddresses []netip.Addr

// UseRDNS, if true, enables resolving of client IP addresses using reverse
// DNS.
UseRDNS bool

// UsePrivateRDNS, if true, enables resolving of private client IP addresses
// using reverse DNS. See [DefaultAddrProcConfig.PrivateSubnets].
UsePrivateRDNS bool

// UseWHOIS, if true, enables resolving of client IP addresses using WHOIS.
UseWHOIS bool
}

// AddressUpdater is the interface for storages of DNS clients that can update
// information about them.
//
// TODO(a.garipov): Consider using the actual client storage once it is moved
// into this package.
type AddressUpdater interface {
// UpdateAddress updates information about an IP address, setting host (if
// not empty) and WHOIS information (if not nil).
UpdateAddress(ip netip.Addr, host string, info *whois.Info)
}

// DefaultAddrProc processes incoming client addresses with rDNS and WHOIS, if
// configured, and updates that information in a client storage.
type DefaultAddrProc struct {
// clientIPsMu serializes closure of clientIPs and access to isClosed.
clientIPsMu *sync.Mutex

// clientIPs is the channel queueing client processing tasks.
clientIPs chan netip.Addr

// rdns is used to perform rDNS lookups of clients' IP addresses.
rdns rdns.Interface

// whois is used to perform WHOIS lookups of clients' IP addresses.
whois whois.Interface

// addrUpdater is used to update the information about a client's IP
// address.
addrUpdater AddressUpdater

// privateSubnets are used to determine if an incoming IP address is
// private.
privateSubnets netutil.SubnetSet

// isClosed is set to true once the address processor is closed.
isClosed bool

// usePrivateRDNS, if true, enables resolving of private client IP addresses
// using reverse DNS.
usePrivateRDNS bool
}

const (
// defaultQueueSize is the size of queue of IPs for rDNS and WHOIS
// processing.
defaultQueueSize = 255

// defaultCacheSize is the maximum size of the cache for rDNS and WHOIS
// processing. It must be greater than zero.
defaultCacheSize = 10_000

// defaultIPTTL is the Time to Live duration for IP addresses cached by
// rDNS and WHOIS.
defaultIPTTL = 1 * time.Hour
)

// NewDefaultAddrProc returns a new running client address processor. c must
// not be nil.
func NewDefaultAddrProc(c *DefaultAddrProcConfig) (p *DefaultAddrProc) {
p = &DefaultAddrProc{
clientIPsMu: &sync.Mutex{},
clientIPs: make(chan netip.Addr, defaultQueueSize),
rdns: &rdns.Empty{},
addrUpdater: c.AddressUpdater,
whois: &whois.Empty{},
privateSubnets: c.PrivateSubnets,
usePrivateRDNS: c.UsePrivateRDNS,
}

if c.UseRDNS {
p.rdns = rdns.New(&rdns.Config{
Exchanger: c.Exchanger,
CacheSize: defaultCacheSize,
CacheTTL: defaultIPTTL,
})
}

if c.UseWHOIS {
p.whois = newWHOIS(c.DialContext)
}

go p.process()

for _, ip := range c.InitialAddresses {
p.Process(ip)
}

return p
}

// newWHOIS returns a whois.Interface instance using the given function for
// dialing.
func newWHOIS(dialFunc whois.DialContextFunc) (w whois.Interface) {
// TODO(s.chzhen): Consider making configurable.
const (
// defaultTimeout is the timeout for WHOIS requests.
defaultTimeout = 5 * time.Second

// defaultMaxConnReadSize is an upper limit in bytes for reading from a
// net.Conn.
defaultMaxConnReadSize = 64 * 1024

// defaultMaxRedirects is the maximum redirects count.
defaultMaxRedirects = 5

// defaultMaxInfoLen is the maximum length of whois.Info fields.
defaultMaxInfoLen = 250
)

return whois.New(&whois.Config{
DialContext: dialFunc,
ServerAddr: whois.DefaultServer,
Port: whois.DefaultPort,
Timeout: defaultTimeout,
CacheSize: defaultCacheSize,
MaxConnReadSize: defaultMaxConnReadSize,
MaxRedirects: defaultMaxRedirects,
MaxInfoLen: defaultMaxInfoLen,
CacheTTL: defaultIPTTL,
})
}

// type check
var _ AddressProcessor = (*DefaultAddrProc)(nil)

// Process implements the [AddressProcessor] interface for *DefaultAddrProc.
func (p *DefaultAddrProc) Process(ip netip.Addr) {
p.clientIPsMu.Lock()
defer p.clientIPsMu.Unlock()

if p.isClosed {
return
}

select {
case p.clientIPs <- ip:
// Go on.
default:
log.Debug("clients: ip channel is full; len: %d", len(p.clientIPs))
}
}

// process processes the incoming client IP-address information. It is intended
// to be used as a goroutine. Once clientIPs is closed, process exits.
func (p *DefaultAddrProc) process() {
defer log.OnPanic("addrProcessor.process")

log.Info("clients: processing addresses")

for ip := range p.clientIPs {
host := p.processRDNS(ip)
info := p.processWHOIS(ip)

p.addrUpdater.UpdateAddress(ip, host, info)
}

log.Info("clients: finished processing addresses")
}

// processRDNS resolves the clients' IP addresses using reverse DNS. host is
// empty if there were errors or if the information hasn't changed.
func (p *DefaultAddrProc) processRDNS(ip netip.Addr) (host string) {
start := time.Now()
log.Debug("clients: processing %s with rdns", ip)
defer func() {
log.Debug("clients: finished processing %s with rdns in %s", ip, time.Since(start))
}()

ok := p.shouldResolve(ip)
if !ok {
return
}

host, changed := p.rdns.Process(ip)
if !changed {
host = ""
}

return host
}

// shouldResolve returns false if ip is a loopback address, or ip is private and
// resolving of private addresses is disabled.
func (p *DefaultAddrProc) shouldResolve(ip netip.Addr) (ok bool) {
return !ip.IsLoopback() &&
(p.usePrivateRDNS || !p.privateSubnets.Contains(ip.AsSlice()))
}

// processWHOIS looks up the information about clients' IP addresses in the
// WHOIS databases. info is nil if there were errors or if the information
// hasn't changed.
func (p *DefaultAddrProc) processWHOIS(ip netip.Addr) (info *whois.Info) {
start := time.Now()
log.Debug("clients: processing %s with whois", ip)
defer func() {
log.Debug("clients: finished processing %s with whois in %s", ip, time.Since(start))
}()

// TODO(s.chzhen): Move the timeout logic from WHOIS configuration to the
// context.
info, changed := p.whois.Process(context.Background(), ip)
if !changed {
info = nil
}

return info
}

// Close implements the [AddressProcessor] interface for *DefaultAddrProc.
func (p *DefaultAddrProc) Close() (err error) {
p.clientIPsMu.Lock()
defer p.clientIPsMu.Unlock()

if p.isClosed {
return ErrClosed
}

close(p.clientIPs)
p.isClosed = true

return nil
}

0 comments on commit dead10e

Please sign in to comment.