This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

swarm/network: debug trace code
nonsense committed Apr 16, 2019
1 parent acf6431 commit d8f9a43
Showing 6 changed files with 111 additions and 1 deletion.
20 changes: 20 additions & 0 deletions cmd/swarm/swarm-smoke/upload_and_sync.go
@@ -88,6 +88,9 @@ func trackChunks(testData []byte, submitMetrics bool) error {
var wg sync.WaitGroup
wg.Add(len(hosts))

var allHostChunksMu sync.Mutex
var allHostChunks []string

for _, host := range hosts {
host := host
go func() {
@@ -114,6 +117,10 @@ func trackChunks(testData []byte, submitMetrics bool) error {
return
}

allHostChunksMu.Lock()
allHostChunks = append(allHostChunks, hostChunks)
allHostChunksMu.Unlock()

yes, no := 0, 0
for _, val := range hostChunks {
if val == '1' {
@@ -140,6 +147,19 @@ func trackChunks(testData []byte, submitMetrics bool) error {

wg.Wait()

for i := range addrs {
var foundAt int
for j := range allHostChunks {
if allHostChunks[j][i] == '1' {
foundAt++
}
}
// a chunk is considered under-replicated if fewer than two hosts have it
if foundAt < 2 {
log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i])
}
}

if !hasErr && submitMetrics {
// remove the chunks stored on the uploader node
globalYes -= len(addrs)
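
Each goroutine above fills hostChunks with one byte per chunk: '1' if the queried host reports the chunk, '0' otherwise, and the new loop after wg.Wait() counts, per chunk, how many hosts hold it. A minimal, self-contained sketch of that aggregation logic (hypothetical bitstrings, not the smoke-test wiring):

package main

import "fmt"

func main() {
	// one bitstring per host; byte i is '1' if that host stores chunk i
	allHostChunks := []string{"110", "011", "010"}
	refs := []string{"chunk-a", "chunk-b", "chunk-c"}

	for i := range refs {
		foundAt := 0
		for j := range allHostChunks {
			if allHostChunks[j][i] == '1' {
				foundAt++
			}
		}
		// flag chunks replicated on fewer than two hosts
		if foundAt < 2 {
			fmt.Printf("chunk found at less than two hosts: foundAt=%d ref=%s\n", foundAt, refs[i])
		}
	}
}
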
18 changes: 18 additions & 0 deletions swarm/api/inspector.go
@@ -51,6 +51,24 @@ func (inspector *Inspector) ListKnown() []string {
return res
}

func (inspector *Inspector) Nearest(chunkRef string) string {
node, err := inspector.hive.Kademlia.Nearest(chunkRef)
if err != nil {
log.Error(err.Error())
return ""
}
return node
}

func (inspector *Inspector) AllNearest(chunkRef string) []*network.Peer {
nodes, err := inspector.hive.Kademlia.AllNearest(chunkRef)
if err != nil {
log.Error(err.Error())
return nil
}
return nodes
}

func (inspector *Inspector) IsSyncing() bool {
lastReceivedChunksMsg := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)

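
The two new methods surface Kademlia's nearest-peer view for a given chunk reference. Assuming the Inspector is registered under an "inspect" RPC namespace and the node exposes a websocket endpoint (both assumptions for illustration, not shown in this diff), a debugging client might query it roughly like this:

package main

import (
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// endpoint and the "inspect" namespace are assumptions for illustration;
	// adjust both to however the node actually exposes the Inspector API
	client, err := rpc.Dial("ws://localhost:8546")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	chunkRef := "<hex-encoded chunk address>" // placeholder

	var nearest string
	if err := client.Call(&nearest, "inspect_nearest", chunkRef); err != nil {
		log.Fatal(err)
	}
	fmt.Println("nearest peer:", nearest)
}
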
44 changes: 44 additions & 0 deletions swarm/network/kademlia.go
@@ -18,6 +18,7 @@ package network

import (
"bytes"
"encoding/hex"
"fmt"
"math/rand"
"strings"
@@ -184,6 +185,49 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
return nil
}

func (k *Kademlia) Nearest(chunkRef string) (string, error) {
b, err := hex.DecodeString(chunkRef)
if err != nil {
return "", err
}

var sp *Peer

// EachConn visits connected peers in order of proximity to b, so the first
// full node encountered is the nearest one
k.EachConn(b, 255, func(p *Peer, po int) bool {
// skip light nodes
if p.LightNode {
return true
}

sp = p

return false
})

if sp == nil {
return "", fmt.Errorf("no suitable peer found for chunk %s", chunkRef)
}

return sp.ID().String(), nil
}

func (k *Kademlia) AllNearest(chunkRef string) ([]*Peer, error) {
b, err := hex.DecodeString(chunkRef)
if err != nil {
return nil, err
}

var peers []*Peer

// collect every connected full node; EachConn yields peers in order of
// proximity to b, so the result is sorted nearest-first
k.EachConn(b, 255, func(p *Peer, po int) bool {
// skip light nodes
if p.LightNode {
return true
}

peers = append(peers, p)
return true
})

return peers, nil
}

// SuggestPeer returns an unconnected peer address as a peer suggestion for connection
func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) {
k.lock.Lock()
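
Both helpers rely on the same EachConn contract: connected peers are visited in order of proximity to the target address, and the callback's return value decides whether iteration continues. The toy model below (a stand-in, not the real Kademlia internals) shows why Nearest can stop after the first full node while AllNearest keeps collecting:

package main

import "fmt"

// eachConn is a toy stand-in for Kademlia.EachConn: it visits peers in
// proximity order and stops as soon as the callback returns false.
func eachConn(peersByProximity []string, visit func(p string) bool) {
	for _, p := range peersByProximity {
		if !visit(p) {
			return
		}
	}
}

func main() {
	peers := []string{"nearest", "second", "third"}

	// Nearest-style: take the first peer, then stop iterating
	var first string
	eachConn(peers, func(p string) bool {
		first = p
		return false
	})
	fmt.Println("nearest:", first)

	// AllNearest-style: collect every peer, still in proximity order
	var all []string
	eachConn(peers, func(p string) bool {
		all = append(all, p)
		return true
	})
	fmt.Println("all:", all)
}
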
15 changes: 15 additions & 0 deletions swarm/network/stream/peer.go
@@ -137,6 +137,21 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
}
}
s.currentBatch = hashes

///// TODO: remove debug code before merge

lenHashes := len(hashes)
if lenHashes%HashSize != 0 {
panic(fmt.Sprintf("invalid hashes batch: length %d is not a multiple of HashSize", lenHashes))
}

for i := 0; i < lenHashes; i += HashSize {
hash := hashes[i : i+HashSize]
log.Trace("offering hash", "ref", fmt.Sprintf("%x", hash), "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
}

//////

msg := &OfferedHashesMsg{
HandoverProof: proof,
Hashes: hashes,
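
The temporary guard above checks an invariant of the offered-hashes format: the batch is a flat concatenation of fixed-size refs, so its length must be an exact multiple of HashSize. A standalone sketch of the same check and split (HashSize hard-coded to 32 here as an assumption):

package main

import "fmt"

const HashSize = 32 // assumed ref size; the real constant comes from swarm

// splitRefs validates and slices a flat batch of concatenated refs.
func splitRefs(batch []byte) ([][]byte, error) {
	if len(batch)%HashSize != 0 {
		return nil, fmt.Errorf("batch length %d is not a multiple of %d", len(batch), HashSize)
	}
	refs := make([][]byte, 0, len(batch)/HashSize)
	for i := 0; i < len(batch); i += HashSize {
		refs = append(refs, batch[i:i+HashSize])
	}
	return refs, nil
}

func main() {
	batch := make([]byte, 3*HashSize)
	refs, err := splitRefs(batch)
	if err != nil {
		panic(err)
	}
	fmt.Println("refs in batch:", len(refs))
}
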
6 changes: 6 additions & 0 deletions swarm/network/stream/syncer.go
@@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network/timeouts"
"github.com/ethereum/go-ethereum/swarm/storage"
)
@@ -102,6 +103,10 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
// are added in batchTimeout period, the batch will be returned. This function
// will block until new chunks are received from localstore pull subscription.
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
// step the window back by one so the chunk sitting exactly on the previous
// batch boundary is not missed by the pull subscription
if from > 0 {
from--
}

descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
defer stop()

@@ -128,6 +133,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
iterate = false
break
}
log.Trace("syncer add chunk", "ref", d.Address, "po", s.po, "from", from, "to", to)
batch = append(batch, d.Address[:]...)
// This is the most naive approach to label the chunk as synced
// allowing it to be garbage collected. A proper way requires
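
The new from-- guard steps the requested window back by one. The sketch below illustrates the off-by-one it appears to compensate for, under the assumption that the pull subscription treats its lower bound as exclusive (an illustration, not the actual SubscribePull semantics):

package main

import "fmt"

// pull is a toy stand-in for a subscription with an exclusive lower bound:
// it yields BinIDs in the interval (from, to].
func pull(from, to uint64) []uint64 {
	var ids []uint64
	for id := from + 1; id <= to; id++ {
		ids = append(ids, id)
	}
	return ids
}

func main() {
	from, to := uint64(17), uint64(20)
	fmt.Println("without guard:", pull(from, to)) // (17,20] = [18 19 20]

	// stepping back by one re-includes BinID 17 at the window boundary
	if from > 0 {
		from--
	}
	fmt.Println("with guard:   ", pull(from, to)) // (16,20] = [17 18 19 20]
}
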
9 changes: 8 additions & 1 deletion swarm/storage/localstore/mode_put.go
@@ -18,8 +18,10 @@ package localstore

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -107,12 +109,13 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (exists bool, err error) {
// put to indexes: retrieve, push, pull

exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return false, err
}
// the chunk's proximity-order bin is needed both for the BinID counter
// and for the trace logs below
po := db.po(item.Address)
if !exists {
item.StoreTimestamp = now()
item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
item.BinID, err = db.binIDs.IncInBatch(batch, uint64(po))
if err != nil {
return false, err
}
@@ -121,6 +124,10 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (exists bool, err error) {
triggerPullFeed = true
db.pushIndex.PutInBatch(batch, item)
triggerPushFeed = true

log.Trace("item pullindex-new", "ref", fmt.Sprintf("%x", item.Address), "kabin", po, "bin", item.BinID, "ts", item.StoreTimestamp)
} else {
log.Trace("item pullindex-exists", "ref", fmt.Sprintf("%x", item.Address), "kabin", po, "bin", item.BinID, "ts", item.StoreTimestamp)
}

case chunk.ModePutSync:
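
The hoisted po := db.po(item.Address) maps the chunk address to its Kademlia bin: the proximity order, i.e. the number of leading bits the chunk shares with the node's base address. A simplified sketch of that calculation (the real one lives in swarm/pot):

package main

import (
	"fmt"
	"math/bits"
)

// po returns the proximity order of a and b: the count of leading bits the
// two addresses share. Simplified from the proximity calculation in swarm/pot.
func po(a, b []byte) int {
	for i := range a {
		if x := a[i] ^ b[i]; x != 0 {
			return i*8 + bits.LeadingZeros8(x)
		}
	}
	return len(a) * 8
}

func main() {
	base := []byte{0xA0} // 1010 0000
	addr := []byte{0xB0} // 1011 0000, first differing bit is bit 3
	fmt.Println("po:", po(base, addr)) // prints: po: 3
}
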
