network/stream: fix an edge case where a stream that was requested has become unwanted due to a depth change in Kademlia. This resulted in a batch of chunks being delivered on the now-unwanted stream before _not_ requesting the next interval (due to WantStream returning false).
acud committed Nov 15, 2019
1 parent 2eb42ed commit 7e500e7
Showing 2 changed files with 35 additions and 8 deletions.
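For orientation, the core of the fix is the early return added to clientHandleChunkDelivery in the first file below: a delivered batch is dropped as soon as WantStream reports that the stream is no longer wanted, instead of being processed and triggering a request for the next interval. The following minimal, self-contained Go sketch illustrates that guard; the stream, peer, and provider types and the depth rule inside WantStream here are simplified stand-ins for illustration, not the real network/stream API.

package main

import "fmt"

// Hypothetical stand-ins for the real Registry, Peer and StreamProvider
// types in network/stream; they only carry what the guard needs.
type stream string

type peer struct{ bin int }

type provider struct{ kadDepth int }

// WantStream is a simplified stand-in for the provider's depth-based check:
// in this sketch a peer's sync stream is wanted only while its bin is still
// within the current Kademlia depth.
func (pr *provider) WantStream(p *peer, s stream) bool {
	return p.bin >= pr.kadDepth
}

// handleChunkDelivery sketches the guard this commit adds to
// clientHandleChunkDelivery: if the stream has become unwanted (for example
// after a depth change), the delivered batch is dropped before any chunk is
// stored and before the next interval would be requested.
func handleChunkDelivery(pr *provider, p *peer, s stream, chunks [][]byte) {
	if !pr.WantStream(p, s) {
		fmt.Printf("dropping %d chunks: stream %q no longer wanted\n", len(chunks), s)
		return
	}
	fmt.Printf("storing %d chunks from stream %q, requesting next interval\n", len(chunks), s)
}

func main() {
	pr := &provider{kadDepth: 2}
	p := &peer{bin: 1} // the peer's bin fell out of depth after the hashes were requested
	handleChunkDelivery(pr, p, "SYNC|1", [][]byte{[]byte("a"), []byte("b")})
}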
22 changes: 22 additions & 0 deletions network/stream/stream.go
@@ -19,6 +19,8 @@ package stream
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math/rand"
"strings"
"sync"
@@ -123,6 +125,22 @@ func New(intervalsStore state.Store, baseKey []byte, providers ...StreamProvider
r.providers[p.StreamName()] = p
}

go func() {
time.Sleep(3 * time.Second)
peerInfo, err := r.PeerInfo()
if err != nil {
//return "", err
panic(err)
}
v, err := json.Marshal(peerInfo)
if err != nil {
//return "", err
panic(err)
}

fmt.Println(string(v))
}()

return r
}

@@ -551,6 +569,7 @@ func (r *Registry) clientHandleOfferedHashes(ctx context.Context, p *Peer, msg *
for i := 0; i < lenHashes; i += HashSize {
hash := msg.Hashes[i : i+HashSize]
addresses[i/HashSize] = hash
p.logger.Trace("clientHandleOfferedHashes peer offered hash", "ruid", msg.Ruid, "stream", w.stream, "chunk", addresses[i/HashSize])
}

startNeed := time.Now()
@@ -755,6 +774,9 @@ func (r *Registry) serverHandleWantedHashes(ctx context.Context, p *Peer, msg *W
// clientHandleChunkDelivery handles chunk delivery messages
func (r *Registry) clientHandleChunkDelivery(ctx context.Context, p *Peer, msg *ChunkDelivery, w *want, provider StreamProvider) {
p.logger.Debug("clientHandleChunkDelivery", "ruid", msg.Ruid)
if !provider.WantStream(p, w.stream) {
return
}
processReceivedChunksMsgCount.Inc(1)
r.setLastReceivedChunkTime() // needed for IsPullSyncing

21 changes: 13 additions & 8 deletions network/stream/syncing_test.go
@@ -650,7 +650,7 @@ func TestStarNetworkSyncWithBogusNodes(t *testing.T) {
minPivotDepth = 1
chunkSize = 4096
simTimeout = 60 * time.Second
syncTime = 2 * time.Second
syncTime = 4 * time.Second
filesize = chunkCount * chunkSize
)
sim := simulation.NewBzzInProc(map[string]simulation.ServiceFunc{
@@ -762,15 +762,23 @@ func TestStarNetworkSyncWithBogusNodes(t *testing.T) {
time.Sleep(syncTime)

pivotLs := sim.MustNodeItem(pivot, bucketKeyLocalStore).(*localstore.DB)
return verifyCorrectChunksOnPivot(chunkProx, pivotDepth, pivotLs)
verifyCorrectChunksOnPivot(t, chunkProx, pivotDepth, pivotLs)
return nil
})

if result.Error != nil {
t.Fatal(result.Error)
}
}

func verifyCorrectChunksOnPivot(chunkProx map[string]chunkProxData, pivotDepth int, pivotLs *localstore.DB) error {
// verifyCorrectChunksOnPivot checks which chunks should be present on the
// pivot node from the perspective of the pivot node. All streams established
// should be presumed from the point of view of the pivot and presence of
// chunks should be assumed by po(chunk,uploader)
// for example, if the pivot has depth==1 and the po(pivot,uploader)==1, then
// all chunks that have po(chunk,uploader)==1 should be synced to the pivot
func verifyCorrectChunksOnPivot(t *testing.T, chunkProx map[string]chunkProxData, pivotDepth int, pivotLs *localstore.DB) {
t.Helper()
for _, v := range chunkProx {
// outside of depth
if v.uploaderNodeToPivotNodePO < pivotDepth {
@@ -779,20 +787,17 @@ func verifyCorrectChunksOnPivot(chunkProx map[string]chunkProxData, pivotDepth i
//check that the chunk exists on the pivot when the chunkPo == uploaderPo
_, err := pivotLs.Get(context.Background(), chunk.ModeGetRequest, v.addr)
if err != nil {
log.Error("chunk errored", "uploaderNode", v.uploaderNode, "poUploader", v.chunkToUploaderPO, "uploaderToPivotPo", v.uploaderNodeToPivotNodePO, "chunk", hex.EncodeToString(v.addr))
return err
t.Errorf("chunk errored. err %v uploaderNode %s poUploader %d uploaderToPivotPo %d chunk %s", err, v.uploaderNode.String(), v.chunkToUploaderPO, v.uploaderNodeToPivotNodePO, hex.EncodeToString(v.addr))
}
} else {
//chunk should not be synced - exclusion test
_, err := pivotLs.Get(context.Background(), chunk.ModeGetRequest, v.addr)
if err == nil {
log.Error("chunk did not error but should have", "uploaderNode", v.uploaderNode, "poUploader", v.chunkToUploaderPO, "uploaderToPivotPo", v.uploaderNodeToPivotNodePO, "chunk", hex.EncodeToString(v.addr))
return err
t.Errorf("chunk did not error but should have. uploaderNode %s poUploader %d uploaderToPivotPo %d chunk %s", v.uploaderNode.String(), v.chunkToUploaderPO, v.uploaderNodeToPivotNodePO, hex.EncodeToString(v.addr))
}
}
}
}
return nil
}

type chunkProxData struct {
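The doc comment added to verifyCorrectChunksOnPivot above frames the expectation in terms of proximity orders and the pivot's depth. The standalone sketch below restates that expectation as a predicate; the function name, parameters, and the handling of uploaders outside the depth are assumptions made for illustration, not code taken from the test.

package main

import "fmt"

// expectedOnPivot restates the expectation described in the
// verifyCorrectChunksOnPivot comment: for an uploader within the pivot's
// depth, a chunk is expected on the pivot exactly when po(chunk,uploader)
// equals po(pivot,uploader); uploaders outside the depth are not checked.
// The names and the outside-of-depth handling are assumptions for this sketch.
func expectedOnPivot(pivotDepth, poPivotUploader, poChunkUploader int) (expected, checked bool) {
	if poPivotUploader < pivotDepth {
		// outside of depth: no assertion is made about these uploaders
		return false, false
	}
	return poChunkUploader == poPivotUploader, true
}

func main() {
	// The comment's example: depth == 1 and po(pivot,uploader) == 1, so a
	// chunk with po(chunk,uploader) == 1 should have been synced to the pivot.
	fmt.Println(expectedOnPivot(1, 1, 1)) // true true
	// Exclusion case: same uploader, chunk in a different bin.
	fmt.Println(expectedOnPivot(1, 1, 3)) // false true
	// Uploader outside the pivot's depth: skipped.
	fmt.Println(expectedOnPivot(2, 1, 1)) // false false
}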
