Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IP Filter #635

Merged
merged 4 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ const (

type (
contractor struct {
ap *Autopilot
logger *zap.SugaredLogger
ap *Autopilot
resolver *ipResolver
logger *zap.SugaredLogger

maintenanceTxnID types.TransactionID
revisionBroadcastInterval time.Duration
Expand Down Expand Up @@ -127,6 +128,7 @@ type (
func newContractor(ap *Autopilot, revisionSubmissionBuffer uint64, revisionBroadcastInterval time.Duration) *contractor {
return &contractor{
ap: ap,
resolver: newIPResolver(resolverLookupTimeout, ap.logger.Named("resolver")),
logger: ap.logger.Named("contractor"),
revisionBroadcastInterval: revisionBroadcastInterval,
revisionLastBroadcast: make(map[types.FileContractID]time.Time),
Expand Down Expand Up @@ -598,6 +600,9 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts
return nil, nil, nil, nil, nil, err
}

// create new IP filter
ipFilter := c.newIPFilter()

// calculate 'maxKeepLeeway' which defines the amount of contracts we'll be
// lenient towards when we fail to either fetch a valid price table or the
// contract's revision
Expand All @@ -618,9 +623,6 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts
)
}()

// create a new ip filter
f := newIPFilter(c.logger)

// return variables
toArchive = make(map[types.FileContractID]string)
toStopUsing = make(map[types.FileContractID]string)
Expand Down Expand Up @@ -710,6 +712,8 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts
if contract.Revision == nil {
if _, found := inCurrentSet[fcid]; !found || remainingKeepLeeway == 0 {
toStopUsing[fcid] = errContractNoRevision.Error()
} else if !state.cfg.Hosts.AllowRedundantIPs && ipFilter.IsRedundantIP(contract.HostIP, contract.HostKey) {
toStopUsing[fcid] = fmt.Sprintf("%v; %v", errHostRedundantIP, errContractNoRevision)
} else {
toKeep = append(toKeep, fcid)
remainingKeepLeeway-- // we let it slide
Expand All @@ -736,7 +740,7 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts
c.logger.Errorw(fmt.Sprintf("failed to compute renterFunds for contract: %v", err))
}

usable, recoverable, refresh, renew, reasons := isUsableContract(state.cfg, ci, cs.BlockHeight, renterFunds, f)
usable, recoverable, refresh, renew, reasons := c.isUsableContract(state.cfg, ci, cs.BlockHeight, renterFunds, ipFilter)
ci.usable = usable
ci.recoverable = recoverable
if !usable {
Expand Down Expand Up @@ -777,6 +781,7 @@ func (c *contractor) runContractFormations(ctx context.Context, w Worker, hosts

// convenience variables
state := c.ap.State()
shouldFilter := !state.cfg.Hosts.AllowRedundantIPs

c.logger.Debugw(
"run contract formations",
Expand Down Expand Up @@ -811,10 +816,12 @@ func (c *contractor) runContractFormations(ctx context.Context, w Worker, hosts
gc := worker.NewGougingChecker(state.gs, cs, state.fee, state.cfg.Contracts.Period, state.cfg.Contracts.RenewWindow)

// prepare an IP filter that contains all used hosts
f := newIPFilter(c.logger)
for _, h := range hosts {
if _, used := usedHosts[h.PublicKey]; used {
_ = f.isRedundantIP(h.NetAddress, h.PublicKey)
ipFilter := c.newIPFilter()
if shouldFilter {
for _, h := range hosts {
if _, used := usedHosts[h.PublicKey]; used {
_ = ipFilter.IsRedundantIP(h.NetAddress, h.PublicKey)
}
}
}

Expand Down Expand Up @@ -854,7 +861,7 @@ func (c *contractor) runContractFormations(ctx context.Context, w Worker, hosts
}

// check if we already have a contract with a host on that subnet
if !state.cfg.Hosts.AllowRedundantIPs && f.isRedundantIP(host.NetAddress, host.PublicKey) {
if shouldFilter && ipFilter.IsRedundantIP(host.NetAddress, host.PublicKey) {
continue
}

Expand Down
18 changes: 9 additions & 9 deletions autopilot/hostfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,38 +222,38 @@ func isUsableHost(cfg api.AutopilotConfig, rs api.RedundancySettings, gc worker.
// - recoverable -> can be usable in the contract set if it is refreshed/renewed
// - refresh -> should be refreshed
// - renew -> should be renewed
func isUsableContract(cfg api.AutopilotConfig, ci contractInfo, bh uint64, renterFunds types.Currency, f *ipFilter) (usable, recoverable, refresh, renew bool, reasons []string) {
c, s, pt := ci.contract, ci.settings, ci.priceTable
func (c *contractor) isUsableContract(cfg api.AutopilotConfig, ci contractInfo, bh uint64, renterFunds types.Currency, f *ipFilter) (usable, recoverable, refresh, renew bool, reasons []string) {
contract, s, pt := ci.contract, ci.settings, ci.priceTable

usable = true
if bh > c.EndHeight() {
if bh > contract.EndHeight() {
reasons = append(reasons, errContractExpired.Error())
usable = false
recoverable = false
refresh = false
renew = false
} else if c.Revision.RevisionNumber == math.MaxUint64 {
} else if contract.Revision.RevisionNumber == math.MaxUint64 {
reasons = append(reasons, errContractMaxRevisionNumber.Error())
usable = false
recoverable = false
refresh = false
renew = false
} else {
if isOutOfCollateral(c, s, pt, renterFunds, cfg.Contracts.Period, bh) {
if isOutOfCollateral(contract, s, pt, renterFunds, cfg.Contracts.Period, bh) {
reasons = append(reasons, errContractOutOfCollateral.Error())
usable = false
recoverable = true
refresh = true
renew = false
}
if isOutOfFunds(cfg, s, c) {
if isOutOfFunds(cfg, s, contract) {
reasons = append(reasons, errContractOutOfFunds.Error())
usable = false
recoverable = true
refresh = true
renew = false
}
if shouldRenew, secondHalf := isUpForRenewal(cfg, *c.Revision, bh); shouldRenew {
if shouldRenew, secondHalf := isUpForRenewal(cfg, *contract.Revision, bh); shouldRenew {
reasons = append(reasons, fmt.Errorf("%w; second half: %t", errContractUpForRenewal, secondHalf).Error())
usable = usable && !secondHalf // only unusable if in second half of renew window
recoverable = true
Expand All @@ -263,13 +263,13 @@ func isUsableContract(cfg api.AutopilotConfig, ci contractInfo, bh uint64, rente
}

// IP check should be last since it modifies the filter
if !cfg.Hosts.AllowRedundantIPs && (usable || recoverable) && f.isRedundantIP(c.HostIP, c.HostKey) {
shouldFilter := !cfg.Hosts.AllowRedundantIPs && (usable || recoverable)
if shouldFilter && f.IsRedundantIP(contract.HostIP, contract.HostKey) {
reasons = append(reasons, errHostRedundantIP.Error())
usable = false
recoverable = false // do not use in the contract set, but keep it around for downloads
renew = false // do not renew, but allow refreshes so the contracts stays funded
}

return
}

Expand Down
167 changes: 125 additions & 42 deletions autopilot/ipfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autopilot

import (
"context"
"errors"
"fmt"
"net"
"strings"
Expand All @@ -16,75 +17,150 @@ const (
ipv4FilterRange = 24
ipv6FilterRange = 32

// ipCacheEntryValidity defines the amount of time the IP filter uses a
// cached entry when it encounters an error while trying to resolve a host's
// IP address
ipCacheEntryValidity = 24 * time.Hour

// resolverLookupTimeout is the timeout we apply when resolving a host's IP address
resolverLookupTimeout = 5 * time.Second
)

type resolver interface {
LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error)
var (
errIOTimeout = errors.New("i/o timeout")
errNoSuchHost = errors.New("no such host")
errServerMisbehaving = errors.New("server misbehaving")
errTooManyAddresses = errors.New("host has more than two addresses, or two of the same type")
errUnparsableAddress = errors.New("host address could not be parsed to a subnet")
)

type (
ipFilter struct {
subnetToHostKey map[string]string

resolver *ipResolver
logger *zap.SugaredLogger
}
)

func (c *contractor) newIPFilter() *ipFilter {
c.resolver.pruneCache()
return &ipFilter{
subnetToHostKey: make(map[string]string),

resolver: c.resolver,
logger: c.logger,
}
}

type ipFilter struct {
subnets map[string]string
resolver resolver
timeout time.Duration
func (f *ipFilter) IsRedundantIP(hostIP string, hostKey types.PublicKey) bool {
// perform lookup
subnets, err := f.resolver.lookup(hostIP)
if err != nil {
if !strings.Contains(err.Error(), errNoSuchHost.Error()) {
f.logger.Errorf("failed to check for redundant IP, treating host %v with IP %v as redundant, err: %v", hostKey, hostIP, err)
}
return true
}

// return early if we couldn't resolve to a subnet
if len(subnets) == 0 {
f.logger.Errorf("failed to resolve IP to a subnet, treating host %v with IP %v as redundant, err: %v", hostKey, hostIP, errUnparsableAddress)
return true
}

logger *zap.SugaredLogger
// check if we know about this subnet, if not register all the subnets
host, found := f.subnetToHostKey[subnets[0]]
if !found {
for _, subnet := range subnets {
f.subnetToHostKey[subnet] = hostKey.String()
}
return false
}

// otherwise compare host keys
sameHost := host == hostKey.String()
return !sameHost
}

func newIPFilter(logger *zap.SugaredLogger) *ipFilter {
return &ipFilter{
subnets: make(map[string]string),
type (
resolver interface {
LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error)
}

ipResolver struct {
resolver resolver
cache map[string]ipCacheEntry
timeout time.Duration

logger *zap.SugaredLogger
}

ipCacheEntry struct {
created time.Time
subnets []string
}
)

func newIPResolver(timeout time.Duration, logger *zap.SugaredLogger) *ipResolver {
if timeout == 0 {
panic("timeout must be greater than zero") // developer error
}
return &ipResolver{
resolver: &net.Resolver{},
cache: make(map[string]ipCacheEntry),
timeout: resolverLookupTimeout,

logger: logger,
logger: logger,
}
}

func (f *ipFilter) isRedundantIP(hostIP string, hostKey types.PublicKey) bool {
// create a context
ctx := context.Background()
if f.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), f.timeout)
defer cancel()
func (r *ipResolver) pruneCache() {
for hostIP, entry := range r.cache {
if time.Since(entry.created) > ipCacheEntryValidity {
delete(r.cache, hostIP)
}
}
}

// lookup all IP addresses for the given host
func (r *ipResolver) lookup(hostIP string) ([]string, error) {
// split off host
host, _, err := net.SplitHostPort(hostIP)
if err != nil {
return true
return nil, err
}
addresses, err := f.resolver.LookupIPAddr(ctx, host)
if err != nil {
if !strings.Contains(err.Error(), "no such host") {
f.logger.Debugf("failed to lookup IP for host %v, err: %v", hostKey, err)

// make sure we don't hang
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
defer cancel()

// lookup IP addresses
addrs, err := r.resolver.LookupIPAddr(ctx, host)
if err != nil && (isErr(err, errIOTimeout) || isErr(err, errServerMisbehaving)) {
if entry, found := r.cache[hostIP]; found && time.Since(entry.created) < ipCacheEntryValidity {
r.logger.Debugf("using cached IP addresses for %v, err: %v", hostIP, err)
return entry.subnets, nil
}
return true
return nil, err
}

// filter hosts associated with more than two addresses or two of the same type
if len(addresses) > 2 || (len(addresses) == 2) && (len(addresses[0].IP) == len(addresses[1].IP)) {
return true
// filter out hosts associated with more than two addresses or two of the same type
if len(addrs) > 2 || (len(addrs) == 2) && (len(addrs[0].IP) == len(addrs[1].IP)) {
return nil, errTooManyAddresses
}

// check whether the host's subnet was already in the list, if it is in the
// list we compare the cached net address with the one from the host being
// filtered as it might be the same host
var filter bool
for _, subnet := range subnets(addresses) {
original, exists := f.subnets[subnet]
if exists && hostKey.String() != original {
filter = true
} else if !exists {
f.subnets[subnet] = hostKey.String()
}
// parse out subnets
subnets := parseSubnets(addrs)

// add to cache
r.cache[hostIP] = ipCacheEntry{
created: time.Now(),
subnets: subnets,
}
return filter

return subnets, nil
}

func subnets(addresses []net.IPAddr) []string {
func parseSubnets(addresses []net.IPAddr) []string {
subnets := make([]string, 0, len(addresses))

for _, address := range addresses {
Expand All @@ -107,3 +183,10 @@ func subnets(addresses []net.IPAddr) []string {

return subnets
}

func isErr(err error, target error) bool {
if errors.Is(err, target) {
return true
}
return err != nil && target != nil && strings.Contains(err.Error(), target.Error())
}