Skip to content

Commit

Permalink
Merge branch 'development' into prune-state-trie
Browse files Browse the repository at this point in the history
  • Loading branch information
arijitAD committed May 19, 2021
2 parents 3cd1599 + 935bc59 commit 1253b89
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 10 deletions.
54 changes: 44 additions & 10 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type syncQueue struct {
cancel context.CancelFunc
peerScore *sync.Map // map[peer.ID]int; peers we have successfully synced from before -> their score; score increases on successful response

requestDataByHash *sync.Map // map[common.Hash]requestData; caching requestData by hash
requestData *sync.Map // map[uint64]requestData; map of start # of request -> requestData
justificationRequestData *sync.Map // map[common.Hash]requestData; map of requests of justifications -> requestData
requestCh chan *syncRequest
Expand All @@ -144,6 +145,7 @@ func newSyncQueue(s *Service) *syncQueue {
cancel: cancel,
peerScore: new(sync.Map),
requestData: new(sync.Map),
requestDataByHash: new(sync.Map),
justificationRequestData: new(sync.Map),
requestCh: make(chan *syncRequest, blockRequestBufferSize),
responses: []*types.BlockData{},
Expand Down Expand Up @@ -457,6 +459,7 @@ func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error
}

startHash := resp.BlockData[0].Hash

if _, has := q.justificationRequestData.Load(startHash); has && !resp.BlockData[0].Header.Exists() {
numJustifications := 0
justificationResponses := []*types.BlockData{}
Expand Down Expand Up @@ -500,11 +503,18 @@ func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error

// update peer's score
q.updatePeerScore(pid, 1)
q.requestData.Store(uint64(start), requestData{

reqdata := requestData{
sent: true,
received: true,
from: pid,
})
}

if _, has := q.requestDataByHash.Load(startHash); has {
q.requestDataByHash.Store(startHash, reqdata)
} else {
q.requestData.Store(uint64(start), reqdata)
}

q.responseLock.Lock()
defer q.responseLock.Unlock()
Expand All @@ -522,6 +532,28 @@ func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error
return nil
}

func (q *syncQueue) isRequestDataCached(startingBlock *variadic.Uint64OrHash) (*requestData, bool) {
if startingBlock == nil {
return nil, false
}

if startingBlock.IsHash() {
if d, has := q.requestDataByHash.Load(startingBlock.Hash()); has {
data := d.(requestData)
return &data, true
}
}

if startingBlock.IsUint64() {
if d, has := q.requestData.Load(startingBlock.Uint64()); has {
data := d.(requestData)
return &data, true
}
}

return nil, false
}

func (q *syncQueue) processBlockRequests() {
for {
select {
Expand All @@ -530,16 +562,15 @@ func (q *syncQueue) processBlockRequests() {
continue
}

if !req.req.StartingBlock.IsUint64() {
reqData, ok := q.isRequestDataCached(req.req.StartingBlock)

if !ok {
q.trySync(req)
continue
}

if d, has := q.requestData.Load(req.req.StartingBlock.Uint64()); has {
data := d.(requestData)
if data.sent && data.received {
continue
}
if reqData.sent && reqData.received {
continue
}

q.trySync(req)
Expand Down Expand Up @@ -599,10 +630,13 @@ func (q *syncQueue) trySync(req *syncRequest) {
received: false,
})
} else if req.req.StartingBlock.IsHash() && (req.req.RequestedData&RequestedDataHeader) == 0 {
q.justificationRequestData.Store(req.req.StartingBlock.Hash(), requestData{
startingBlockHash := req.req.StartingBlock.Hash()
reqdata := requestData{
sent: true,
received: false,
})
}

q.justificationRequestData.Store(startingBlockHash, reqdata)
}

req.to = ""
Expand Down
48 changes: 48 additions & 0 deletions dot/network/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common/optional"
"github.com/ChainSafe/gossamer/lib/common/variadic"
"github.com/ChainSafe/gossamer/lib/utils"

"github.com/ChainSafe/chaindb"
Expand Down Expand Up @@ -421,6 +422,53 @@ func TestSyncQueue_processBlockResponses(t *testing.T) {
require.Equal(t, blockRequestBufferSize, len(q.requestCh))
}

func TestSyncQueue_isRequestDataCached(t *testing.T) {
q := newTestSyncQueue(t)
q.stop()

reqdata := requestData{
sent: true,
received: false,
}

// generate hash or uint64
hashtrack := variadic.NewUint64OrHashFromBytes([]byte{0, 0, 0, 1})
uinttrack := variadic.NewUint64OrHashFromBytes([]byte{1, 0, 0, 1})
othertrack := variadic.NewUint64OrHashFromBytes([]byte{1, 2, 3, 1})

tests := []struct {
variadic *variadic.Uint64OrHash
reqMessage BlockRequestMessage
expectedOk bool
expectedData *requestData
}{
{
variadic: hashtrack,
expectedOk: true,
expectedData: &reqdata,
},
{
variadic: uinttrack,
expectedOk: true,
expectedData: &reqdata,
},
{
variadic: othertrack,
expectedOk: false,
expectedData: nil,
},
}

q.requestDataByHash.Store(hashtrack.Hash(), reqdata)
q.requestData.Store(uinttrack.Uint64(), reqdata)

for _, test := range tests {
data, ok := q.isRequestDataCached(test.variadic)
require.Equal(t, test.expectedOk, ok)
require.Equal(t, test.expectedData, data)
}
}

func TestSyncQueue_SyncAtHead(t *testing.T) {
q := newTestSyncQueue(t)
q.stop()
Expand Down

0 comments on commit 1253b89

Please sign in to comment.