Skip to content

Commit

Permalink
Merge pull request ethereum#379 from ethersphere/kademlia-fixes
Browse files Browse the repository at this point in the history
Kademlia fixes
  • Loading branch information
zelig committed Apr 12, 2018
2 parents 5b7c6da + 88b2e3f commit 6f8f818
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 33 deletions.
33 changes: 24 additions & 9 deletions swarm/network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/pot"
)

Expand Down Expand Up @@ -263,11 +263,10 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) {
if po >= depth {
return false
}
f(func(val pot.Val, _ int) bool {
return f(func(val pot.Val, _ int) bool {
a = k.callable(val)
return a == nil
})
return false
})
// found a candidate
if a != nil {
Expand Down Expand Up @@ -616,16 +615,17 @@ type PeerPot struct {
EmptyBins []int
}

// NewPeerPot just creates a new pot record OverlayAddr
func NewPeerPot(kadMinProxSize int, ids []discover.NodeID, addrs [][]byte) map[discover.NodeID]*PeerPot {
// NewPeerPotMap creates a map of pot record of OverlayAddr with keys
// as hexadecimal representations of the address.
func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot {
// create a table of all nodes for health check
np := pot.NewPot(nil, 0)
for _, addr := range addrs {
np, _, _ = pot.Add(np, addr, pof)
}
ppmap := make(map[discover.NodeID]*PeerPot)
ppmap := make(map[string]*PeerPot)

for i, id := range ids {
for i, a := range addrs {
pl := 256
prev := 256
var emptyBins []int
Expand Down Expand Up @@ -654,7 +654,7 @@ func NewPeerPot(kadMinProxSize int, ids []discover.NodeID, addrs [][]byte) map[d
emptyBins = append(emptyBins, j)
}
log.Trace(fmt.Sprintf("%x NNS: %s", addrs[i][:4], logNNS(nns)))
ppmap[id] = &PeerPot{nns, emptyBins}
ppmap[common.Bytes2Hex(a)] = &PeerPot{nns, emptyBins}
}
return ppmap
}
Expand All @@ -674,23 +674,38 @@ func (k *Kademlia) saturation(n int) int {
return prev
}

// full returns true if all required bins have connected peers.
// It is used in Healthy function.
func (k *Kademlia) full(emptyBins []int) (full bool) {
prev := 0
e := len(emptyBins)
ok := true
depth := k.neighbourhoodDepth()
k.conns.EachBin(k.base, pof, 0, func(po, _ int, _ func(func(val pot.Val, i int) bool) bool) bool {
for i := prev; e > 0 && i < po; i++ {
if prev == depth+1 {
return true
}
for i := prev; i < po; i++ {
e--
if e < 0 {
ok = false
return false
}
if emptyBins[e] != i {
log.Trace(fmt.Sprintf("%08x po: %d, i: %d, e: %d, emptybins: %v", k.BaseAddr()[:4], po, i, e, logEmptyBins(emptyBins)))
if emptyBins[e] < i {
panic("incorrect peerpot")
}
ok = false
return false
}
}
prev = po + 1
return true
})
if !ok {
return false
}
return e == 0
}

Expand Down
274 changes: 269 additions & 5 deletions swarm/network/kademlia_test.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions swarm/network/simulations/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -207,7 +208,7 @@ func discoverySimulation(nodes, conns int, adapter adapters.NodeAdapter) (*simul
wg.Wait()
log.Debug(fmt.Sprintf("nodes: %v", len(addrs)))
// construct the peer pot, so that kademlia health can be checked
ppmap := network.NewPeerPot(testMinProxBinSize, ids, addrs)
ppmap := network.NewPeerPotMap(testMinProxBinSize, addrs)
check := func(ctx context.Context, id discover.NodeID) (bool, error) {
select {
case <-ctx.Done():
Expand All @@ -224,7 +225,8 @@ func discoverySimulation(nodes, conns int, adapter adapters.NodeAdapter) (*simul
return false, fmt.Errorf("error getting node client: %s", err)
}
healthy := &network.Health{}
if err := client.Call(&healthy, "hive_healthy", ppmap[id]); err != nil {
addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
if err := client.Call(&healthy, "hive_healthy", ppmap[addr]); err != nil {
return false, fmt.Errorf("error getting node health: %s", err)
}
log.Debug(fmt.Sprintf("node %4s healthy: got nearest neighbours: %v, know nearest neighbours: %v, saturated: %v\n%v", id, healthy.GotNN, healthy.KnowNN, healthy.Full, healthy.Hive))
Expand Down
22 changes: 10 additions & 12 deletions swarm/network/stream/snapshot_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
Expand All @@ -51,7 +52,7 @@ var (
startTime time.Time
ids []discover.NodeID
datadirs map[discover.NodeID]string
ppmap map[discover.NodeID]*network.PeerPot
ppmap map[string]*network.PeerPot

globalWg sync.WaitGroup

Expand Down Expand Up @@ -235,7 +236,7 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
log.Info("Test config successfully initialized")

//only needed for healthy call when debugging
ppmap = network.NewPeerPot(testMinProxBinSize, ids, conf.addrs)
ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs)

//define the action to be performed before the test checks: start syncing
action := func(ctx context.Context) error {
Expand All @@ -246,7 +247,8 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
for _, id := range ids {
r := registries[id]
//PeerPot for this node
pp := ppmap[id]
addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
pp := ppmap[addr]
//call Healthy RPC
h := r.delivery.overlay.Healthy(pp)
//print info
Expand Down Expand Up @@ -323,11 +325,9 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
errc <- nil
}()

select {
case err := <-errc:
if err != nil {
return err
}
err := <-errc
if err != nil {
return err
}
log.Info("Stream subscriptions successfully requested")
if live {
Expand Down Expand Up @@ -428,10 +428,9 @@ func (r *TestRegistry) StartSyncing(ctx context.Context) error {
var err error

if log.Lvl(*loglevel) == log.LvlDebug {
//address of registry
add := r.addr.ID()
//PeerPot for this node
pp := ppmap[add]
addr := common.Bytes2Hex(r.addr.OAddr)
pp := ppmap[addr]
//call Healthy RPC
h := r.delivery.overlay.Healthy(pp)
//print info
Expand Down Expand Up @@ -618,7 +617,6 @@ func watchSubscriptionEvents(ctx context.Context, id discover.NodeID, client *rp
}
}
}()
return
}

//create a local store for the given node
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/stream/testing/snapshot_128.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion swarm/network/stream/testing/snapshot_16.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion swarm/network/stream/testing/snapshot_256.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion swarm/network/stream/testing/snapshot_32.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion swarm/network/stream/testing/snapshot_64.json

Large diffs are not rendered by default.

0 comments on commit 6f8f818

Please sign in to comment.