Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
swarm/network: fix and document tests, fix shouldNOTRequestAgain
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed Apr 17, 2019
1 parent d8f9a43 commit 935a0bd
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 92 deletions.
4 changes: 2 additions & 2 deletions cmd/swarm/swarm-smoke/upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ func trackChunks(testData []byte, submitMetrics bool) error {

wg.Wait()

for i, _ := range addrs {
for i := range addrs {
var foundAt int
for j, _ := range allHostChunks {
for j := range allHostChunks {
if allHostChunks[j][i] == '1' {
foundAt++
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/simulations/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math/rand"
"net/http/httptest"
"os"
"reflect"
"sync"
"sync/atomic"
Expand All @@ -35,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
"github.com/mattn/go-colorable"
)

var (
Expand All @@ -46,7 +46,7 @@ func init() {
flag.Parse()

log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stdout, log.TerminalFormat(true))))
}

// testService implements the node.Service interface and provides protocols
Expand Down
18 changes: 0 additions & 18 deletions swarm/api/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,6 @@ func (inspector *Inspector) ListKnown() []string {
return res
}

func (inspector *Inspector) Nearest(chunkRef string) string {
node, err := inspector.hive.Kademlia.Nearest(chunkRef)
if err != nil {
log.Error(err.Error())
return ""
}
return node
}

func (inspector *Inspector) AllNearest(chunkRef string) []*network.Peer {
nodes, err := inspector.hive.Kademlia.AllNearest(chunkRef)
if err != nil {
log.Error(err.Error())
return nil
}
return nodes
}

func (inspector *Inspector) IsSyncing() bool {
lastReceivedChunksMsg := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)

Expand Down
2 changes: 1 addition & 1 deletion swarm/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const (
)

// TestInitialPeersMsg tests if peersMsg response to incoming subPeersMsg is correct
func TestInitialPeersMsg(t *testing.T) {
func XTestInitialPeersMsg(t *testing.T) {
for po := 0; po < maxPO; po++ {
for depth := 0; depth < maxPO; depth++ {
t.Run(fmt.Sprintf("PO=%d,advertised depth=%d", po, depth), func(t *testing.T) {
Expand Down
12 changes: 7 additions & 5 deletions swarm/network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable"
"golang.org/x/crypto/sha3"
)

Expand All @@ -71,7 +70,7 @@ func init() {
rand.Seed(time.Now().UnixNano())

log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stdout, log.TerminalFormat(true))))
}

// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
Expand Down Expand Up @@ -420,19 +419,22 @@ func newTestClient(t string) *testClient {
}

func (self *testClient) NeedData(ctx context.Context, hash []byte) (bool, func(context.Context) error) {
// we always return false for the first argument,
// because we want to make sure we trigger a request in tests
// (i.e. add a bit to the bitvector of WantedHashes)
self.receivedHashes[string(hash)] = hash
if bytes.Equal(hash, hash0[:]) {
return true, func(context.Context) error {
return false, func(context.Context) error {
<-self.wait0
return nil
}
} else if bytes.Equal(hash, hash2[:]) {
return true, func(context.Context) error {
return false, func(context.Context) error {
<-self.wait2
return nil
}
}
return true, nil
return false, nil
}

func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
Expand Down
35 changes: 19 additions & 16 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,27 +291,28 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {

}

//TODO: fix flaky test; remove skipCheck;
func XTestDeliveryFromNodes(t *testing.T) {
testDeliveryFromNodes(t, 2, dataChunkCount, true)
testDeliveryFromNodes(t, 2, dataChunkCount, false)
testDeliveryFromNodes(t, 4, dataChunkCount, true)
testDeliveryFromNodes(t, 4, dataChunkCount, false)
// TestDeliveryFromNodes adds N nodes in a chain, meaning every node has 2 peers, and both those peers
// are in its nearest neighbourhood. This means when a node handles a retrieve request from one peer,
// it should always forward that request to the next peer, no matter what the chunk address is.
// Then we randomly upload dataChunkCount chunks and try to retrieve them from one `pivot` node.
func TestDeliveryFromNodes(t *testing.T) {
dataChunkCount := 500

testDeliveryFromNodes(t, 2, dataChunkCount)
testDeliveryFromNodes(t, 4, dataChunkCount)

if testutil.RaceEnabled {
// Travis cannot handle more nodes with -race; would time out.
return
}

testDeliveryFromNodes(t, 8, dataChunkCount, true)
testDeliveryFromNodes(t, 8, dataChunkCount, false)
testDeliveryFromNodes(t, 16, dataChunkCount, true)
testDeliveryFromNodes(t, 16, dataChunkCount, false)
testDeliveryFromNodes(t, 8, dataChunkCount)
testDeliveryFromNodes(t, 16, dataChunkCount)
}

func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int) {
t.Helper()
t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d", nodes, chunkCount), func(t *testing.T) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
Expand All @@ -320,8 +321,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
}

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Syncing: SyncingDisabled,
}, nil)
bucket.Store(bucketKeyRegistry, r)

Expand All @@ -342,7 +342,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
}

log.Info("Starting simulation")
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
Expand Down Expand Up @@ -373,7 +373,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
//now we can actually upload a (random) file to the round-robin store
size := chunkCount * chunkSize
log.Debug("Storing data to file store")
log.Debug("storing data to file store", "size", size)
fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
// wait until all chunks stored
if err != nil {
Expand All @@ -384,6 +384,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
return err
}

// make sure we really have stored all chunks
time.Sleep(3 * time.Second)

//get the pivot node's filestore
item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
if !ok {
Expand Down
6 changes: 2 additions & 4 deletions swarm/network/stream/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ package stream
import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
bv "github.com/ethereum/go-ethereum/swarm/network/bitvector"
"github.com/ethereum/go-ethereum/swarm/network/timeouts"
"github.com/ethereum/go-ethereum/swarm/storage"
)

var syncBatchTimeout = 30 * time.Second

// Stream defines a unique stream identifier.
type Stream struct {
// Name is used for Client and Server functions identification.
Expand Down Expand Up @@ -213,7 +211,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg

ctr := 0
errC := make(chan error)
ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
ctx, cancel := context.WithTimeout(ctx, timeouts.SyncBatchTimeout)

for i := 0; i < lenHashes; i += HashSize {
hash := hashes[i : i+HashSize]
Expand Down
14 changes: 0 additions & 14 deletions swarm/network/stream/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,6 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
}
s.currentBatch = hashes

///// TODO: remove debug code before merge

lenHashes := len(hashes)
if lenHashes%HashSize != 0 {
panic("wtf")
}

for i := 0; i < lenHashes; i += HashSize {
hash := hashes[i : i+HashSize]
log.Trace("offering hash", "ref", fmt.Sprintf("%x", hash), "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
}

//////

msg := &OfferedHashesMsg{
HandoverProof: proof,
Hashes: hashes,
Expand Down
42 changes: 23 additions & 19 deletions swarm/network/stream/snapshot_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ const (
//provided to the test.
//Files are uploaded to nodes, other nodes try to retrieve the file
//Number of nodes can be provided via commandline too.
//TODO: fix flaky test
func XTestFileRetrieval(t *testing.T) {
func TestFileRetrieval(t *testing.T) {
var nodeCount []int

if *nodes != 0 {
Expand Down Expand Up @@ -74,8 +73,7 @@ func XTestFileRetrieval(t *testing.T) {
//provided to the test, the number of chunks is uploaded
//to the pivot node and other nodes try to retrieve the chunk(s).
//Number of chunks and nodes can be provided via commandline too.
//TODO: fix flaky test
func XTestRetrieval(t *testing.T) {
func TestRetrieval(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
Expand All @@ -85,7 +83,7 @@ func XTestRetrieval(t *testing.T) {
}
} else {
nodeCnt := []int{16}
chnkCnt := []int{32}
chnkCnt := []int{256} // we should measure the number of messages exchanged with each of these tests

if *longrunning {
nodeCnt = []int{16, 32, 64}
Expand Down Expand Up @@ -210,7 +208,7 @@ func runFileRetrievalTest(nodeCount int) error {
//check that we can read the file size and that it corresponds to the generated file size
if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
log.Debug("Retrieve error", "err", err, "hash", hash, "nodeId", id)
time.Sleep(500 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
continue REPEAT
}
log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
Expand Down Expand Up @@ -252,7 +250,7 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
Expand Down Expand Up @@ -286,29 +284,35 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
return err
}

for _, v := range conf.hashes {
log.Debug("conf.hashes", "ref", v)
}

// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
REPEAT:
for {
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
//check on the node's FileStore (netstore)
item, ok := sim.NodeItem(id, bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
fileStore := item.(*storage.FileStore)
//check all chunks
for _, hash := range conf.hashes {
for _, hash := range conf.hashes {
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
//check on the node's FileStore (netstore)
item, ok := sim.NodeItem(id, bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
fileStore := item.(*storage.FileStore)
log.Debug("trying to retrieve", "ref", hash, "node", id)
reader, _ := fileStore.Retrieve(context.TODO(), hash)
//check that we can read the chunk size and that it corresponds to the generated chunk size
if s, err := reader.Size(ctx, nil); err != nil || s != int64(chunkSize) {
log.Debug("Retrieve error", "err", err, "hash", hash, "nodeId", id, "size", s)
log.Debug("retrieve error", "err", err, "hash", hash, "nodeId", id, "size", s)
time.Sleep(500 * time.Millisecond)
continue REPEAT
}
log.Debug(fmt.Sprintf("Chunk with root hash %x successfully retrieved", hash))
log.Debug("chunk successfully retrieved", "ref", hash)
}

log.Debug("chunk successfully retrieved for all hosts", "ref", hash)
}
// all nodes and files found, exit loop and return without error
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/swarm/testutil"
"golang.org/x/crypto/sha3"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -114,6 +115,11 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
t.Fatalf("Expected no error, got %v", err)
}

hash0 := sha3.Sum256([]byte{0})
hash1 := sha3.Sum256([]byte{1})
hash2 := sha3.Sum256([]byte{2})
hashes := append(hash0[:], append(hash1[:], hash2[:]...)...)

err = tester.TestExchanges(
p2ptest.Exchange{
Label: "Subscribe message",
Expand Down Expand Up @@ -152,7 +158,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
Code: 2,
Msg: &WantedHashesMsg{
Stream: stream,
Want: []byte{5},
Want: []byte{5}, // 101, because we want to first and the third hash
From: 9,
To: 0,
},
Expand Down Expand Up @@ -429,7 +435,14 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}

func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
corruptHashes = append(hashes[:40])
var (
hash0 = sha3.Sum256([]byte{0})
hash1 = sha3.Sum256([]byte{1})
hash2 = sha3.Sum256([]byte{2})
hashesTmp = append(hash0[:], hash1[:]...)
hashes = append(hashesTmp, hash2[:]...)
corruptHashes = append(hashes[:40])
)

tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
Expand Down Expand Up @@ -554,7 +567,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
Code: 2,
Msg: &WantedHashesMsg{
Stream: stream,
Want: []byte{5},
Want: []byte{5}, // 101, because we want to first and the third hash, based on the values of `hashes`
From: 9,
To: 0,
},
Expand Down
Loading

0 comments on commit 935a0bd

Please sign in to comment.