Skip to content

Commit

Permalink
Routing Table Refresh manager (ipfs#601)
Browse files Browse the repository at this point in the history
* rt refresh refactor
  • Loading branch information
aarshkshah1992 committed May 19, 2020
1 parent 08ab423 commit 2851c88
Show file tree
Hide file tree
Showing 8 changed files with 506 additions and 644 deletions.
104 changes: 62 additions & 42 deletions dht.go
Expand Up @@ -16,13 +16,11 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
"go.uber.org/zap"

"go.opencensus.io/tag"

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
Expand All @@ -35,6 +33,8 @@ import (
"github.com/multiformats/go-base32"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"go.opencensus.io/tag"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -71,6 +71,9 @@ type IpfsDHT struct {
// ProviderManager stores & manages the provider records for this Dht peer.
ProviderManager *providers.ProviderManager

// manages Routing Table refresh
rtRefreshManager *rtrefresh.RtRefreshManager

birth time.Time // When this peer started up

Validator record.Validator
Expand Down Expand Up @@ -104,11 +107,7 @@ type IpfsDHT struct {
queryPeerFilter QueryFilterFunc
routingTablePeerFilter RouteTableFilterFunc

autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshInterval time.Duration
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error
autoRefresh bool

// A set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
Expand All @@ -122,11 +121,6 @@ type IpfsDHT struct {
// networks).
enableProviders, enableValues bool

// successfulOutboundQueryGracePeriod is the maximum grace period we will give to a peer
// to between two successful query responses from it, failing which,
// we will ping it to see if it's alive.
successfulOutboundQueryGracePeriod time.Duration

fixLowPeersChan chan struct{}
}

Expand Down Expand Up @@ -156,14 +150,13 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
if err := cfg.validate(); err != nil {
return nil, err
}

dht, err := makeDHT(ctx, h, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
}

dht.autoRefresh = cfg.routingTable.autoRefresh
dht.rtRefreshInterval = cfg.routingTable.refreshInterval
dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout

dht.maxRecordAge = cfg.maxRecordAge
dht.enableProviders = cfg.enableProviders
Expand Down Expand Up @@ -196,8 +189,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())

dht.startSelfLookup()
dht.startRefreshing()
if err := dht.rtRefreshManager.Start(); err != nil {
return nil, err
}

// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
Expand Down Expand Up @@ -266,23 +260,44 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
beta: cfg.resiliency,
triggerRtRefresh: make(chan chan<- error),
triggerSelfLookup: make(chan chan<- error),
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
fixLowPeersChan: make(chan struct{}, 1),
}

var maxLastSuccessfulOutboundThreshold time.Duration

// The threshold is calculated based on the expected amount of time that should pass before we
// query a peer as part of our refresh cycle.
// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
// be published soon.
if cfg.concurrency < cfg.bucketSize { // (alpha < K)
l1 := math.Log(float64(1) / float64(cfg.bucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(cfg.bucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
} else {
maxLastSuccessfulOutboundThreshold = cfg.routingTable.refreshInterval
}

// construct routing table
rt, err := makeRoutingTable(dht, cfg)
rt, err := makeRoutingTable(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
}
dht.routingTable = rt
dht.bootstrapPeers = cfg.bootstrapPeers

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
}
dht.rtRefreshManager = rtRefresh

// create a DHT proc with the given context
dht.proc = goprocessctx.WithContext(ctx)
dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
return rtRefresh.Close()
})

// create a tagged context derived from the original context
ctxTags := dht.newContextWithLocalTags(ctx)
Expand All @@ -298,19 +313,32 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return dht, nil
}

func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
// The threshold is calculated based on the expected amount of time that should pass before we
// query a peer as part of our refresh cycle.
// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
// be published soon.
l1 := math.Log(float64(1) / float64(defaultBucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold := time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
return string(p), err
}

queryFnc := func(ctx context.Context, key string) error {
_, err := dht.GetClosestPeers(ctx, key)
return err
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
keyGenFnc,
queryFnc,
cfg.routingTable.refreshQueryTimeout,
cfg.routingTable.refreshInterval,
maxLastSuccessfulOutboundThreshold)

return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
self := kb.ConvertPeerID(dht.host.ID())

rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
dht.successfulOutboundQueryGracePeriod = maxLastSuccessfulOutboundThreshold
cmgr := dht.host.ConnManager()

rt.PeerAdded = func(p peer.ID) {
Expand Down Expand Up @@ -397,10 +425,7 @@ func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
}

if dht.autoRefresh {
select {
case dht.triggerRtRefresh <- nil:
default:
}
dht.rtRefreshManager.RefreshNoWait()
}
}

Expand Down Expand Up @@ -520,8 +545,8 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
// LastUsefulAt=N/A
// If we connect to a peer and exchange a query RPC ->
// LastUsefulAt=0
// If we connect to a peer and then exchange a query RPC ->
// LastQueriedAt=time.Now (same reason as above)
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
Expand All @@ -542,12 +567,7 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
// peer not added.
return
}

// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
// value that must have been set in the Routing Table for this peer when it was first added during a connection.
if newlyAdded && queryPeer {
dht.routingTable.UpdateLastUsefulAt(p, time.Now())
} else if queryPeer {
if !newlyAdded && queryPeer {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
Expand Down

0 comments on commit 2851c88

Please sign in to comment.