
swarm/network: fix and document tests, fix shouldNOTRequestAgain
nonsense committed Apr 17, 2019
1 parent d8f9a43 commit 60145da
Showing 14 changed files with 94 additions and 136 deletions.
4 changes: 2 additions & 2 deletions cmd/swarm/swarm-smoke/upload_and_sync.go
@@ -147,9 +147,9 @@ func trackChunks(testData []byte, submitMetrics bool) error {

wg.Wait()

-	for i, _ := range addrs {
+	for i := range addrs {
 		var foundAt int
-		for j, _ := range allHostChunks {
+		for j := range allHostChunks {
if allHostChunks[j][i] == '1' {
foundAt++
}
4 changes: 2 additions & 2 deletions p2p/simulations/http_test.go
@@ -22,6 +22,7 @@ import (
"fmt"
"math/rand"
"net/http/httptest"
"os"
"reflect"
"sync"
"sync/atomic"
@@ -35,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
"github.com/mattn/go-colorable"
)

var (
@@ -46,7 +46,7 @@ func init() {
flag.Parse()

log.PrintOrigins(true)
-	log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
+	log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stdout, log.TerminalFormat(true))))
}

// testService implements the node.Service interface and provides protocols
18 changes: 0 additions & 18 deletions swarm/api/inspector.go
@@ -51,24 +51,6 @@ func (inspector *Inspector) ListKnown() []string {
return res
}

-func (inspector *Inspector) Nearest(chunkRef string) string {
-	node, err := inspector.hive.Kademlia.Nearest(chunkRef)
-	if err != nil {
-		log.Error(err.Error())
-		return ""
-	}
-	return node
-}
-
-func (inspector *Inspector) AllNearest(chunkRef string) []*network.Peer {
-	nodes, err := inspector.hive.Kademlia.AllNearest(chunkRef)
-	if err != nil {
-		log.Error(err.Error())
-		return nil
-	}
-	return nodes
-}

func (inspector *Inspector) IsSyncing() bool {
lastReceivedChunksMsg := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)

2 changes: 1 addition & 1 deletion swarm/network/discovery_test.go
@@ -79,7 +79,7 @@
)

// TestInitialPeersMsg tests if peersMsg response to incoming subPeersMsg is correct
-func TestInitialPeersMsg(t *testing.T) {
+func XTestInitialPeersMsg(t *testing.T) {
for po := 0; po < maxPO; po++ {
for depth := 0; depth < maxPO; depth++ {
t.Run(fmt.Sprintf("PO=%d,advertised depth=%d", po, depth), func(t *testing.T) {
44 changes: 0 additions & 44 deletions swarm/network/kademlia.go
@@ -18,7 +18,6 @@ package network

import (
"bytes"
"encoding/hex"
"fmt"
"math/rand"
"strings"
@@ -185,49 +184,6 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
return nil
}

-func (k *Kademlia) Nearest(chunkRef string) (string, error) {
-	b, err := hex.DecodeString(chunkRef)
-	if err != nil {
-		return "", err
-	}
-
-	var sp *Peer
-
-	k.EachConn(b, 255, func(p *Peer, po int) bool {
-		// skip light nodes
-		if p.LightNode {
-			return true
-		}
-
-		sp = p
-
-		return false
-	})
-
-	return sp.ID().String(), nil
-}
-
-func (k *Kademlia) AllNearest(chunkRef string) ([]*Peer, error) {
-	b, err := hex.DecodeString(chunkRef)
-	if err != nil {
-		return nil, err
-	}
-
-	var peers []*Peer
-
-	k.EachConn(b, 255, func(p *Peer, po int) bool {
-		// skip light nodes
-		if p.LightNode {
-			return true
-		}
-
-		peers = append(peers, p)
-		return true
-	})
-
-	return peers, nil
-}

// SuggestPeer returns an unconnected peer address as a peer suggestion for connection
func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) {
k.lock.Lock()
12 changes: 7 additions & 5 deletions swarm/network/stream/common_test.go
@@ -45,7 +45,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/ethereum/go-ethereum/swarm/testutil"
-	colorable "github.com/mattn/go-colorable"
"golang.org/x/crypto/sha3"
)

@@ -71,7 +70,7 @@ func init() {
rand.Seed(time.Now().UnixNano())

log.PrintOrigins(true)
-	log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
+	log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stdout, log.TerminalFormat(true))))
}

// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
@@ -420,19 +419,22 @@ func newTestClient(t string) *testClient {
}

func (self *testClient) NeedData(ctx context.Context, hash []byte) (bool, func(context.Context) error) {
+	// we always return false for the first argument,
+	// because we want to make sure we trigger a request in tests
+	// (i.e. add a bit to the bitvector of WantedHashes)
 	self.receivedHashes[string(hash)] = hash
 	if bytes.Equal(hash, hash0[:]) {
-		return true, func(context.Context) error {
+		return false, func(context.Context) error {
 			<-self.wait0
 			return nil
 		}
 	} else if bytes.Equal(hash, hash2[:]) {
-		return true, func(context.Context) error {
+		return false, func(context.Context) error {
 			<-self.wait2
 			return nil
 		}
 	}
-	return true, nil
+	return false, nil
}

func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
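The boolean flip above appears to be the "shouldNOTRequestAgain" fix from the commit title: NeedData's first return value apparently signals that a chunk need not be requested again, so the test client now always answers false to force a request. For illustration, a minimal self-contained Go sketch of the bookkeeping the comment describes; the needData helper, hashSize constant and inlined bitvector are assumptions for the sketch, not the package's actual API:

	package main

	import "fmt"

	const hashSize = 32

	// needData is a hypothetical stand-in for the client callback: the first
	// return value means "no need to request this chunk again", so returning
	// false marks the chunk as wanted.
	func needData(hash []byte) bool {
		return false // as in the test client above: always trigger a request
	}

	func main() {
		// three offered hashes, flattened into one byte slice as in OfferedHashesMsg
		hashes := make([]byte, 3*hashSize)
		// one bit per offered hash
		want := make([]byte, (len(hashes)/hashSize+7)/8)
		for i := 0; i < len(hashes); i += hashSize {
			if !needData(hashes[i : i+hashSize]) {
				bit := i / hashSize
				want[bit/8] |= 1 << uint(bit%8) // add a bit to the WantedHashes bitvector
			}
		}
		fmt.Printf("WantedHashes bitvector: %08b\n", want) // prints [00000111]: all three wanted
	}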
35 changes: 19 additions & 16 deletions swarm/network/stream/delivery_test.go
@@ -291,27 +291,28 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {

}

-//TODO: fix flaky test; remove skipCheck;
-func XTestDeliveryFromNodes(t *testing.T) {
-	testDeliveryFromNodes(t, 2, dataChunkCount, true)
-	testDeliveryFromNodes(t, 2, dataChunkCount, false)
-	testDeliveryFromNodes(t, 4, dataChunkCount, true)
-	testDeliveryFromNodes(t, 4, dataChunkCount, false)
+// TestDeliveryFromNodes adds N nodes in a chain, meaning every node has 2 peers, and both those peers
+// are in its nearest neighbourhood. This means when a node handles a retrieve request from one peer,
+// it should always forward that request to the next peer, no matter what the chunk address is.
+// Then we randomly upload dataChunkCount chunks and try to retrieve them from one `pivot` node.
+func TestDeliveryFromNodes(t *testing.T) {
+	dataChunkCount := 500
+
+	testDeliveryFromNodes(t, 2, dataChunkCount)
+	testDeliveryFromNodes(t, 4, dataChunkCount)

if testutil.RaceEnabled {
// Travis cannot handle more nodes with -race; would time out.
return
}

-	testDeliveryFromNodes(t, 8, dataChunkCount, true)
-	testDeliveryFromNodes(t, 8, dataChunkCount, false)
-	testDeliveryFromNodes(t, 16, dataChunkCount, true)
-	testDeliveryFromNodes(t, 16, dataChunkCount, false)
+	testDeliveryFromNodes(t, 8, dataChunkCount)
+	testDeliveryFromNodes(t, 16, dataChunkCount)
}

-func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
+func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int) {
t.Helper()
-	t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
+	t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d", nodes, chunkCount), func(t *testing.T) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
@@ -320,8 +321,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
}

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				SkipCheck: skipCheck,
-				Syncing:   SyncingDisabled,
+				Syncing: SyncingDisabled,
}, nil)
bucket.Store(bucketKeyRegistry, r)

@@ -342,7 +342,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
}

log.Info("Starting simulation")
-		ctx, cancel := context.WithCancel(context.Background())
+		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
@@ -373,7 +373,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
//now we can actually upload a (random) file to the round-robin store
size := chunkCount * chunkSize
log.Debug("Storing data to file store")
log.Debug("storing data to file store", "size", size)
fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
// wait until all chunks stored
if err != nil {
@@ -384,6 +384,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
return err
}

+			// make sure we really have stored all chunks
+			time.Sleep(3 * time.Second)

//get the pivot node's filestore
item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
if !ok {
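The chain topology that the new TestDeliveryFromNodes comment relies on is what forces forwarding: with only two neighbours per node, a retrieve request entering at one end must travel hop by hop. A minimal sketch of how such a simulation is typically brought up, assuming the swarm/network/simulation helpers used elsewhere in this package; the newStreamerService constructor is hypothetical:

	package stream

	import (
		"testing"

		"github.com/ethereum/go-ethereum/swarm/network/simulation"
	)

	// startChainSim is a sketch of the setup described above: node i is
	// connected only to nodes i-1 and i+1, so any retrieve request must be
	// forwarded along the chain regardless of the chunk address.
	func startChainSim(t *testing.T, nodes int, newStreamerService simulation.ServiceFunc) *simulation.Simulation {
		t.Helper()
		sim := simulation.New(map[string]simulation.ServiceFunc{
			"streamer": newStreamerService, // hypothetical service constructor
		})
		// connect 0-1, 1-2, ..., (nodes-2)-(nodes-1)
		if _, err := sim.AddNodesAndConnectChain(nodes); err != nil {
			sim.Close()
			t.Fatal(err)
		}
		return sim
	}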
6 changes: 2 additions & 4 deletions swarm/network/stream/messages.go
@@ -19,16 +19,14 @@ package stream
import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
bv "github.com/ethereum/go-ethereum/swarm/network/bitvector"
"github.com/ethereum/go-ethereum/swarm/network/timeouts"
"github.com/ethereum/go-ethereum/swarm/storage"
)

-var syncBatchTimeout = 30 * time.Second

// Stream defines a unique stream identifier.
type Stream struct {
// Name is used for Client and Server functions identification.
@@ -213,7 +211,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg

ctr := 0
errC := make(chan error)
-	ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
+	ctx, cancel := context.WithTimeout(ctx, timeouts.SyncBatchTimeout)

for i := 0; i < lenHashes; i += HashSize {
hash := hashes[i : i+HashSize]
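The change above replaces the file-local syncBatchTimeout with a value from the new shared timeouts package, so the stream package's timeout knobs live in one place. A plausible minimal shape for that package; only the 30-second value is grounded in this diff (it mirrors the removed constant), and the declaration and doc comments are assumptions:

	// Package timeouts centralizes network-layer timeout values so individual
	// files no longer declare their own ad-hoc durations.
	package timeouts

	import "time"

	// SyncBatchTimeout bounds how long handleOfferedHashesMsg waits for all
	// chunks of one offered-hashes batch to be delivered; it mirrors the
	// removed syncBatchTimeout constant.
	var SyncBatchTimeout = 30 * time.Second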
14 changes: 0 additions & 14 deletions swarm/network/stream/peer.go
@@ -138,20 +138,6 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
}
s.currentBatch = hashes

-	///// TODO: remove debug code before merge
-
-	lenHashes := len(hashes)
-	if lenHashes%HashSize != 0 {
-		panic("wtf")
-	}
-
-	for i := 0; i < lenHashes; i += HashSize {
-		hash := hashes[i : i+HashSize]
-		log.Trace("offering hash", "ref", fmt.Sprintf("%x", hash), "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
-	}
-
-	//////

msg := &OfferedHashesMsg{
HandoverProof: proof,
Hashes: hashes,
42 changes: 23 additions & 19 deletions swarm/network/stream/snapshot_retrieval_test.go
@@ -44,8 +44,7 @@ const (
//provided to the test.
//Files are uploaded to nodes, other nodes try to retrieve the file
//Number of nodes can be provided via commandline too.
-//TODO: fix flaky test
-func XTestFileRetrieval(t *testing.T) {
+func TestFileRetrieval(t *testing.T) {
var nodeCount []int

if *nodes != 0 {
@@ -74,8 +73,7 @@ func XTestFileRetrieval(t *testing.T) {
//provided to the test, the number of chunks is uploaded
//to the pivot node and other nodes try to retrieve the chunk(s).
//Number of chunks and nodes can be provided via commandline too.
-//TODO: fix flaky test
-func XTestRetrieval(t *testing.T) {
+func TestRetrieval(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
Expand All @@ -85,7 +83,7 @@ func XTestRetrieval(t *testing.T) {
}
} else {
nodeCnt := []int{16}
-		chnkCnt := []int{32}
+		chnkCnt := []int{256} // we should measure the number of messages exchanged with each of these tests

if *longrunning {
nodeCnt = []int{16, 32, 64}
@@ -210,7 +208,7 @@ func runFileRetrievalTest(nodeCount int) error {
//check that we can read the file size and that it corresponds to the generated file size
if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
log.Debug("Retrieve error", "err", err, "hash", hash, "nodeId", id)
-				time.Sleep(500 * time.Millisecond)
+				time.Sleep(50 * time.Millisecond)
continue REPEAT
}
log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
@@ -252,7 +250,7 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)

-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
@@ -286,29 +284,35 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
return err
}

+	for _, v := range conf.hashes {
+		log.Debug("conf.hashes", "ref", v)
+	}
+
+	// File retrieval check is repeated until all uploaded files are retrieved from all nodes
+	// or until the timeout is reached.
REPEAT:
for {
-		for _, id := range nodeIDs {
-			//for each expected chunk, check if it is in the local store
-			//check on the node's FileStore (netstore)
-			item, ok := sim.NodeItem(id, bucketKeyFileStore)
-			if !ok {
-				return fmt.Errorf("No filestore")
-			}
-			fileStore := item.(*storage.FileStore)
-			//check all chunks
-			for _, hash := range conf.hashes {
+		for _, hash := range conf.hashes {
+			for _, id := range nodeIDs {
+				//for each expected chunk, check if it is in the local store
+				//check on the node's FileStore (netstore)
+				item, ok := sim.NodeItem(id, bucketKeyFileStore)
+				if !ok {
+					return fmt.Errorf("No filestore")
+				}
+				fileStore := item.(*storage.FileStore)
+				log.Debug("trying to retrieve", "ref", hash, "node", id)
reader, _ := fileStore.Retrieve(context.TODO(), hash)
//check that we can read the chunk size and that it corresponds to the generated chunk size
if s, err := reader.Size(ctx, nil); err != nil || s != int64(chunkSize) {
log.Debug("Retrieve error", "err", err, "hash", hash, "nodeId", id, "size", s)
log.Debug("retrieve error", "err", err, "hash", hash, "nodeId", id, "size", s)
time.Sleep(500 * time.Millisecond)
continue REPEAT
}
log.Debug(fmt.Sprintf("Chunk with root hash %x successfully retrieved", hash))
log.Debug("chunk successfully retrieved", "ref", hash)
}

log.Debug("chunk successfully retrieved for all hosts", "ref", hash)
}
// all nodes and files found, exit loop and return without error
return nil
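The reordered loops above implement a retry-until-timeout check: for each chunk, every node is polled, and any miss backs off briefly and restarts the whole scan via continue REPEAT, while the 30-second context bounds the total wait. A condensed sketch of the pattern; the retrieved callback is hypothetical, and the explicit ctx check stands in for the error that reader.Size returns once the deadline passes:

	import (
		"context"
		"time"

		"github.com/ethereum/go-ethereum/p2p/enode"
		"github.com/ethereum/go-ethereum/swarm/storage"
	)

	// waitAllRetrieved re-checks every (chunk, node) pair until all succeed
	// or ctx expires.
	func waitAllRetrieved(ctx context.Context, hashes []storage.Address, nodeIDs []enode.ID,
		retrieved func(enode.ID, storage.Address) bool) error {
	REPEAT:
		for {
			select {
			case <-ctx.Done():
				return ctx.Err() // global time budget exhausted
			default:
			}
			for _, hash := range hashes {
				for _, id := range nodeIDs {
					if !retrieved(id, hash) {
						time.Sleep(500 * time.Millisecond) // back off, then rescan everything
						continue REPEAT
					}
				}
			}
			return nil // every chunk found on every node
		}
	}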