Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

network: structured output for kademlia table #1586

Merged
merged 2 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions api/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,16 @@ func NewInspector(api *API, hive *network.Hive, netStore *storage.NetStore) *Ins
}

// Hive prints the kademlia table
func (inspector *Inspector) Hive() string {
return inspector.hive.String()
func (i *Inspector) Hive() string {
return i.hive.String()
}

func (inspector *Inspector) ListKnown() []string {
res := []string{}
for _, v := range inspector.hive.Kademlia.ListKnown() {
res = append(res, fmt.Sprintf("%v", v))
}
return res
// KademliaInfo returns a structured snapshot of the Kademlia state
// (depth, connection counts, and per-bin peer lists) that callers can
// compare for equality, e.g. in tests or monitoring tooling.
func (i *Inspector) KademliaInfo() network.KademliaInfo {
	return i.hive.KademliaInfo()
}

func (inspector *Inspector) IsPullSyncing() bool {
func (i *Inspector) IsPullSyncing() bool {
lastReceivedChunksMsg := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)

// last received chunks msg time
Expand All @@ -63,11 +60,12 @@ func (inspector *Inspector) IsPullSyncing() bool {
return lrct.After(time.Now().Add(-15 * time.Second))
}

func (inspector *Inspector) DeliveriesPerPeer() map[string]int64 {
// DeliveriesPerPeer returns the sum of chunks we received from a given peer
func (i *Inspector) DeliveriesPerPeer() map[string]int64 {
res := map[string]int64{}

// iterate connection in kademlia
inspector.hive.Kademlia.EachConn(nil, 255, func(p *network.Peer, po int) bool {
i.hive.Kademlia.EachConn(nil, 255, func(p *network.Peer, po int) bool {
// get how many chunks we receive for retrieve requests per peer
peermetric := fmt.Sprintf("chunk.delivery.%x", p.Over()[:16])

Expand All @@ -82,10 +80,10 @@ func (inspector *Inspector) DeliveriesPerPeer() map[string]int64 {
// Has checks whether each chunk address is present in the underlying datastore,
// the bool in the returned structs indicates if the underlying datastore has
// the chunk stored with the given address (true), or not (false)
func (inspector *Inspector) Has(chunkAddresses []storage.Address) string {
func (i *Inspector) Has(chunkAddresses []storage.Address) string {
hostChunks := []string{}
for _, addr := range chunkAddresses {
has, err := inspector.netStore.Has(context.Background(), addr)
has, err := i.netStore.Has(context.Background(), addr)
if err != nil {
log.Error(err.Error())
}
Expand Down
71 changes: 59 additions & 12 deletions network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -93,6 +94,14 @@ type Kademlia struct {
nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed
}

// KademliaInfo is a structured, JSON-serializable snapshot of the
// Kademlia routing table, suitable for equality checks in tests and
// for inspection via the debug API.
type KademliaInfo struct {
	Depth            int        `json:"depth"`             // neighbourhood depth as computed by depthForPot
	TotalConnections int        `json:"total_connections"` // number of live peer connections
	TotalKnown       int        `json:"total_known"`       // number of known (not necessarily connected) addresses
	Connections      [][]string `json:"connections"`       // hex peer addresses of live connections, indexed by proximity bin
	Known            [][]string `json:"known"`             // hex addresses of known peers, indexed by proximity bin
}

// NewKademlia creates a Kademlia table for base address addr
// with parameters as in params
// if params is nil, it uses default values
Expand Down Expand Up @@ -422,18 +431,6 @@ func (k *Kademlia) Off(p *Peer) {
}
}

// ListKnown returns the BzzAddr of every address entry currently held
// in the known-address table. The result is never nil, so it encodes
// as an empty JSON array rather than null when the table is empty.
func (k *Kademlia) ListKnown() []*BzzAddr {
	known := make([]*BzzAddr, 0, k.addrs.Size())
	k.addrs.Each(func(val pot.Val) bool {
		known = append(known, val.(*entry).BzzAddr)
		return true
	})
	return known
}

// EachConn is an iterator with args (base, po, f) applies f to each live peer
// that has proximity order po or less as measured from the base
// if base is nil, kademlia base address is used
Expand Down Expand Up @@ -576,6 +573,56 @@ func (k *Kademlia) BaseAddr() []byte {
return k.base
}

// KademliaInfo returns a structured snapshot of the routing table.
// It takes a read lock for the duration of the snapshot, so it is safe
// to call concurrently with table mutations.
func (k *Kademlia) KademliaInfo() KademliaInfo {
	k.lock.RLock()
	defer k.lock.RUnlock()
	return k.kademliaInfo()
}

// kademliaInfo builds a KademliaInfo snapshot of the table. Callers
// must hold k.lock (at least for reading); the exported KademliaInfo
// method does this.
func (k *Kademlia) kademliaInfo() (ki KademliaInfo) {
	ki.Depth = depthForPot(k.conns, k.NeighbourhoodSize, k.base)
	ki.TotalConnections = k.conns.Size()
	ki.TotalKnown = k.addrs.Size()
	ki.Connections = make([][]string, k.MaxProxDisplay)
	ki.Known = make([][]string, k.MaxProxDisplay)

	// collect appends the hex-encoded addresses yielded by one bin
	// iterator into rows[po], clamping po to the last displayable bin.
	// Appending (rather than assigning) means that when several bins
	// are clamped onto MaxProxDisplay-1, later bins accumulate into the
	// row instead of overwriting the earlier bins' entries. The row is
	// re-sorted after each append so output stays deterministic.
	collect := func(rows [][]string, po int, f func(func(val pot.Val) bool) bool, addrOf func(val pot.Val) string) {
		if po >= k.MaxProxDisplay {
			po = k.MaxProxDisplay - 1
		}
		row := rows[po]
		f(func(val pot.Val) bool {
			row = append(row, addrOf(val))
			return true
		})
		sort.Strings(row)
		rows[po] = row
	}

	k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
		collect(ki.Connections, po, f, func(val pot.Val) string {
			return fmt.Sprintf("%x", val.(*Peer).Address())
		})
		return true
	})

	k.addrs.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
		collect(ki.Known, po, f, func(val pot.Val) string {
			return fmt.Sprintf("%x", val.(*entry).Address())
		})
		return true
	})

	return
}

// String returns kademlia table + kaddb table displayed with ascii
func (k *Kademlia) String() string {
k.lock.RLock()
Expand Down