Skip to content

Commit

Permalink
Merge pull request #635 from SiaFoundation/pj/ip-filter
Browse files Browse the repository at this point in the history
IP Filter
  • Loading branch information
ChrisSchinnerl committed Oct 5, 2023
2 parents c2a263d + 102393e commit e7cafa3
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 62 deletions.
29 changes: 18 additions & 11 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ const (

type (
contractor struct {
ap *Autopilot
logger *zap.SugaredLogger
ap *Autopilot
resolver *ipResolver
logger *zap.SugaredLogger

maintenanceTxnID types.TransactionID
revisionBroadcastInterval time.Duration
Expand Down Expand Up @@ -127,6 +128,7 @@ type (
func newContractor(ap *Autopilot, revisionSubmissionBuffer uint64, revisionBroadcastInterval time.Duration) *contractor {
return &contractor{
ap: ap,
resolver: newIPResolver(resolverLookupTimeout, ap.logger.Named("resolver")),
logger: ap.logger.Named("contractor"),
revisionBroadcastInterval: revisionBroadcastInterval,
revisionLastBroadcast: make(map[types.FileContractID]time.Time),
Expand Down Expand Up @@ -598,6 +600,9 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts
return nil, nil, nil, nil, nil, err
}

// create new IP filter
ipFilter := c.newIPFilter()

// calculate 'maxKeepLeeway' which defines the amount of contracts we'll be
// lenient towards when we fail to either fetch a valid price table or the
// contract's revision
Expand All @@ -618,9 +623,6 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts
)
}()

// create a new ip filter
f := newIPFilter(c.logger)

// return variables
toArchive = make(map[types.FileContractID]string)
toStopUsing = make(map[types.FileContractID]string)
Expand Down Expand Up @@ -710,6 +712,8 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts
if contract.Revision == nil {
if _, found := inCurrentSet[fcid]; !found || remainingKeepLeeway == 0 {
toStopUsing[fcid] = errContractNoRevision.Error()
} else if !state.cfg.Hosts.AllowRedundantIPs && ipFilter.IsRedundantIP(contract.HostIP, contract.HostKey) {
toStopUsing[fcid] = fmt.Sprintf("%v; %v", errHostRedundantIP, errContractNoRevision)
} else {
toKeep = append(toKeep, fcid)
remainingKeepLeeway-- // we let it slide
Expand All @@ -736,7 +740,7 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts
c.logger.Errorw(fmt.Sprintf("failed to compute renterFunds for contract: %v", err))
}

usable, recoverable, refresh, renew, reasons := isUsableContract(state.cfg, ci, cs.BlockHeight, renterFunds, f)
usable, recoverable, refresh, renew, reasons := c.isUsableContract(state.cfg, ci, cs.BlockHeight, renterFunds, ipFilter)
ci.usable = usable
ci.recoverable = recoverable
if !usable {
Expand Down Expand Up @@ -777,6 +781,7 @@ func (c *contractor) runContractFormations(ctx context.Context, w Worker, hosts

// convenience variables
state := c.ap.State()
shouldFilter := !state.cfg.Hosts.AllowRedundantIPs

c.logger.Debugw(
"run contract formations",
Expand Down Expand Up @@ -811,10 +816,12 @@ func (c *contractor) runContractFormations(ctx context.Context, w Worker, hosts
gc := worker.NewGougingChecker(state.gs, cs, state.fee, state.cfg.Contracts.Period, state.cfg.Contracts.RenewWindow)

// prepare an IP filter that contains all used hosts
f := newIPFilter(c.logger)
for _, h := range hosts {
if _, used := usedHosts[h.PublicKey]; used {
_ = f.isRedundantIP(h.NetAddress, h.PublicKey)
ipFilter := c.newIPFilter()
if shouldFilter {
for _, h := range hosts {
if _, used := usedHosts[h.PublicKey]; used {
_ = ipFilter.IsRedundantIP(h.NetAddress, h.PublicKey)
}
}
}

Expand Down Expand Up @@ -854,7 +861,7 @@ func (c *contractor) runContractFormations(ctx context.Context, w Worker, hosts
}

// check if we already have a contract with a host on that subnet
if !state.cfg.Hosts.AllowRedundantIPs && f.isRedundantIP(host.NetAddress, host.PublicKey) {
if shouldFilter && ipFilter.IsRedundantIP(host.NetAddress, host.PublicKey) {
continue
}

Expand Down
18 changes: 9 additions & 9 deletions autopilot/hostfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,38 +222,38 @@ func isUsableHost(cfg api.AutopilotConfig, rs api.RedundancySettings, gc worker.
// - recoverable -> can be usable in the contract set if it is refreshed/renewed
// - refresh -> should be refreshed
// - renew -> should be renewed
func isUsableContract(cfg api.AutopilotConfig, ci contractInfo, bh uint64, renterFunds types.Currency, f *ipFilter) (usable, recoverable, refresh, renew bool, reasons []string) {
c, s, pt := ci.contract, ci.settings, ci.priceTable
func (c *contractor) isUsableContract(cfg api.AutopilotConfig, ci contractInfo, bh uint64, renterFunds types.Currency, f *ipFilter) (usable, recoverable, refresh, renew bool, reasons []string) {
contract, s, pt := ci.contract, ci.settings, ci.priceTable

usable = true
if bh > c.EndHeight() {
if bh > contract.EndHeight() {
reasons = append(reasons, errContractExpired.Error())
usable = false
recoverable = false
refresh = false
renew = false
} else if c.Revision.RevisionNumber == math.MaxUint64 {
} else if contract.Revision.RevisionNumber == math.MaxUint64 {
reasons = append(reasons, errContractMaxRevisionNumber.Error())
usable = false
recoverable = false
refresh = false
renew = false
} else {
if isOutOfCollateral(c, s, pt, renterFunds, cfg.Contracts.Period, bh) {
if isOutOfCollateral(contract, s, pt, renterFunds, cfg.Contracts.Period, bh) {
reasons = append(reasons, errContractOutOfCollateral.Error())
usable = false
recoverable = true
refresh = true
renew = false
}
if isOutOfFunds(cfg, s, c) {
if isOutOfFunds(cfg, s, contract) {
reasons = append(reasons, errContractOutOfFunds.Error())
usable = false
recoverable = true
refresh = true
renew = false
}
if shouldRenew, secondHalf := isUpForRenewal(cfg, *c.Revision, bh); shouldRenew {
if shouldRenew, secondHalf := isUpForRenewal(cfg, *contract.Revision, bh); shouldRenew {
reasons = append(reasons, fmt.Errorf("%w; second half: %t", errContractUpForRenewal, secondHalf).Error())
usable = usable && !secondHalf // only unusable if in second half of renew window
recoverable = true
Expand All @@ -263,13 +263,13 @@ func isUsableContract(cfg api.AutopilotConfig, ci contractInfo, bh uint64, rente
}

// IP check should be last since it modifies the filter
if !cfg.Hosts.AllowRedundantIPs && (usable || recoverable) && f.isRedundantIP(c.HostIP, c.HostKey) {
shouldFilter := !cfg.Hosts.AllowRedundantIPs && (usable || recoverable)
if shouldFilter && f.IsRedundantIP(contract.HostIP, contract.HostKey) {
reasons = append(reasons, errHostRedundantIP.Error())
usable = false
recoverable = false // do not use in the contract set, but keep it around for downloads
renew = false // do not renew, but allow refreshes so the contracts stays funded
}

return
}

Expand Down
167 changes: 125 additions & 42 deletions autopilot/ipfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autopilot

import (
"context"
"errors"
"fmt"
"net"
"strings"
Expand All @@ -16,75 +17,150 @@ const (
ipv4FilterRange = 24
ipv6FilterRange = 32

// ipCacheEntryValidity defines the amount of time the IP filter uses a
// cached entry when it encounters an error while trying to resolve a host's
// IP address
ipCacheEntryValidity = 24 * time.Hour

// resolverLookupTimeout is the timeout we apply when resolving a host's IP address
resolverLookupTimeout = 5 * time.Second
)

type resolver interface {
LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error)
var (
errIOTimeout = errors.New("i/o timeout")
errNoSuchHost = errors.New("no such host")
errServerMisbehaving = errors.New("server misbehaving")
errTooManyAddresses = errors.New("host has more than two addresses, or two of the same type")
errUnparsableAddress = errors.New("host address could not be parsed to a subnet")
)

type (
ipFilter struct {
subnetToHostKey map[string]string

resolver *ipResolver
logger *zap.SugaredLogger
}
)

func (c *contractor) newIPFilter() *ipFilter {
c.resolver.pruneCache()
return &ipFilter{
subnetToHostKey: make(map[string]string),

resolver: c.resolver,
logger: c.logger,
}
}

type ipFilter struct {
subnets map[string]string
resolver resolver
timeout time.Duration
func (f *ipFilter) IsRedundantIP(hostIP string, hostKey types.PublicKey) bool {
// perform lookup
subnets, err := f.resolver.lookup(hostIP)
if err != nil {
if !strings.Contains(err.Error(), errNoSuchHost.Error()) {
f.logger.Errorf("failed to check for redundant IP, treating host %v with IP %v as redundant, err: %v", hostKey, hostIP, err)
}
return true
}

// return early if we couldn't resolve to a subnet
if len(subnets) == 0 {
f.logger.Errorf("failed to resolve IP to a subnet, treating host %v with IP %v as redundant, err: %v", hostKey, hostIP, errUnparsableAddress)
return true
}

logger *zap.SugaredLogger
// check if we know about this subnet, if not register all the subnets
host, found := f.subnetToHostKey[subnets[0]]
if !found {
for _, subnet := range subnets {
f.subnetToHostKey[subnet] = hostKey.String()
}
return false
}

// otherwise compare host keys
sameHost := host == hostKey.String()
return !sameHost
}

func newIPFilter(logger *zap.SugaredLogger) *ipFilter {
return &ipFilter{
subnets: make(map[string]string),
type (
resolver interface {
LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error)
}

ipResolver struct {
resolver resolver
cache map[string]ipCacheEntry
timeout time.Duration

logger *zap.SugaredLogger
}

ipCacheEntry struct {
created time.Time
subnets []string
}
)

func newIPResolver(timeout time.Duration, logger *zap.SugaredLogger) *ipResolver {
if timeout == 0 {
panic("timeout must be greater than zero") // developer error
}
return &ipResolver{
resolver: &net.Resolver{},
cache: make(map[string]ipCacheEntry),
timeout: resolverLookupTimeout,

logger: logger,
logger: logger,
}
}

func (f *ipFilter) isRedundantIP(hostIP string, hostKey types.PublicKey) bool {
// create a context
ctx := context.Background()
if f.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), f.timeout)
defer cancel()
func (r *ipResolver) pruneCache() {
for hostIP, entry := range r.cache {
if time.Since(entry.created) > ipCacheEntryValidity {
delete(r.cache, hostIP)
}
}
}

// lookup all IP addresses for the given host
func (r *ipResolver) lookup(hostIP string) ([]string, error) {
// split off host
host, _, err := net.SplitHostPort(hostIP)
if err != nil {
return true
return nil, err
}
addresses, err := f.resolver.LookupIPAddr(ctx, host)
if err != nil {
if !strings.Contains(err.Error(), "no such host") {
f.logger.Debugf("failed to lookup IP for host %v, err: %v", hostKey, err)

// make sure we don't hang
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
defer cancel()

// lookup IP addresses
addrs, err := r.resolver.LookupIPAddr(ctx, host)
if err != nil && (isErr(err, errIOTimeout) || isErr(err, errServerMisbehaving)) {
if entry, found := r.cache[hostIP]; found && time.Since(entry.created) < ipCacheEntryValidity {
r.logger.Debugf("using cached IP addresses for %v, err: %v", hostIP, err)
return entry.subnets, nil
}
return true
return nil, err
}

// filter hosts associated with more than two addresses or two of the same type
if len(addresses) > 2 || (len(addresses) == 2) && (len(addresses[0].IP) == len(addresses[1].IP)) {
return true
// filter out hosts associated with more than two addresses or two of the same type
if len(addrs) > 2 || (len(addrs) == 2) && (len(addrs[0].IP) == len(addrs[1].IP)) {
return nil, errTooManyAddresses
}

// check whether the host's subnet was already in the list, if it is in the
// list we compare the cached net address with the one from the host being
// filtered as it might be the same host
var filter bool
for _, subnet := range subnets(addresses) {
original, exists := f.subnets[subnet]
if exists && hostKey.String() != original {
filter = true
} else if !exists {
f.subnets[subnet] = hostKey.String()
}
// parse out subnets
subnets := parseSubnets(addrs)

// add to cache
r.cache[hostIP] = ipCacheEntry{
created: time.Now(),
subnets: subnets,
}
return filter

return subnets, nil
}

func subnets(addresses []net.IPAddr) []string {
func parseSubnets(addresses []net.IPAddr) []string {
subnets := make([]string, 0, len(addresses))

for _, address := range addresses {
Expand All @@ -107,3 +183,10 @@ func subnets(addresses []net.IPAddr) []string {

return subnets
}

func isErr(err error, target error) bool {
if errors.Is(err, target) {
return true
}
return err != nil && target != nil && strings.Contains(err.Error(), target.Error())
}

0 comments on commit e7cafa3

Please sign in to comment.