Skip to content

Commit

Permalink
fix(dot/network, lib/grandpa): fix node sync, improve devnet finality
Browse files Browse the repository at this point in the history
  • Loading branch information
noot committed Jul 6, 2021
1 parent ce9a8a4 commit bcc7935
Show file tree
Hide file tree
Showing 26 changed files with 924 additions and 500 deletions.
10 changes: 5 additions & 5 deletions dot/digest/digest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ func TestHandler_GrandpaScheduledChange(t *testing.T) {
require.NoError(t, err)

headers := addTestBlocksToState(t, 2, handler.blockState)
for _, h := range headers {
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), 0, 0)
for i, h := range headers {
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), uint64(i), 0)
}

// authorities should change on start of block 3 from start
headers = addTestBlocksToState(t, 1, handler.blockState)
for _, h := range headers {
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), 0, 0)
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), 3, 0)
}

time.Sleep(time.Millisecond * 500)
Expand Down Expand Up @@ -231,8 +231,8 @@ func TestHandler_GrandpaPauseAndResume(t *testing.T) {
require.Equal(t, big.NewInt(int64(p.Delay)), nextPause)

headers := addTestBlocksToState(t, 3, handler.blockState)
for _, h := range headers {
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), 0, 0)
for i, h := range headers {
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), uint64(i), 0)
}

time.Sleep(time.Millisecond * 100)
Expand Down
7 changes: 6 additions & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,12 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

if !added {
// TODO: ensure grandpa stores *all* previously received votes and discards them
// only when they are for already finalised rounds; currently this causes issues
// because a vote might be received slightly too early, causing a round mismatch err,
// causing grandpa to discard the vote.
_, isConsensusMsg := msg.(*ConsensusMessage)
if !added && !isConsensusMsg {
return
}
}
Expand Down
49 changes: 28 additions & 21 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

// SendBlockReqestByHash sends a block request to the network with the given block hash
func (s *Service) SendBlockReqestByHash(hash common.Hash) {
req := createBlockRequestWithHash(hash, blockRequestSize)
s.syncQueue.requestDataByHash.Delete(hash)
s.syncQueue.trySync(&syncRequest{
req: req,
to: "",
})
}

// handleSyncStream handles streams with the <protocol-id>/sync/2 protocol ID
func (s *Service) handleSyncStream(stream libp2pnetwork.Stream) {
if stream == nil {
Expand Down Expand Up @@ -537,7 +547,11 @@ func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error
}

q.responses = sortResponses(q.responses)
logger.Debug("pushed block data to queue", "start", start, "end", end, "queue", q.stringifyResponseQueue())
logger.Debug("pushed block data to queue", "start", start, "end", end,
"start hash", q.responses[0].Hash,
"end hash", q.responses[len(q.responses)-1].Hash,
"queue", q.stringifyResponseQueue(),
)
return nil
}

Expand Down Expand Up @@ -611,9 +625,10 @@ func (q *syncQueue) trySync(req *syncRequest) {
logger.Trace("trying peers in prioritised order...")
syncPeers := q.getSortedPeers()

for _, peer := range syncPeers {
for i, peer := range syncPeers {
// if peer doesn't respond multiple times, then ignore them TODO: determine best values for this
if peer.score <= badPeerThreshold {
// TODO: if we only have a few peers, should we do this check at all?
if peer.score <= badPeerThreshold && i > q.s.cfg.MinPeers {
break
}

Expand Down Expand Up @@ -647,9 +662,6 @@ func (q *syncQueue) trySync(req *syncRequest) {

q.justificationRequestData.Store(startingBlockHash, reqdata)
}

req.to = ""
q.requestCh <- req
}

func (q *syncQueue) syncWithPeer(peer peer.ID, req *BlockRequestMessage) (*BlockResponseMessage, error) {
Expand Down Expand Up @@ -737,7 +749,7 @@ func (q *syncQueue) handleBlockData(data []*types.BlockData) {

end := data[len(data)-1].Number().Int64()
if end <= finalised.Number.Int64() {
logger.Debug("ignoring block data that is below our head", "got", end, "head", finalised.Number.Int64())
logger.Debug("ignoring block data that is below our finalised head", "got", end, "head", finalised.Number.Int64())
q.pushRequest(uint64(end+1), blockRequestBufferSize, "")
return
}
Expand Down Expand Up @@ -844,21 +856,16 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID)
return
}

if header.Number.Int64() <= q.goal {
return
if header.Number.Int64() > q.goal {
q.goal = header.Number.Int64()
}

q.goal = header.Number.Int64()

bestNum, err := q.s.blockState.BestBlockNumber()
if err != nil {
logger.Error("failed to get best block number", "error", err)
return
req := createBlockRequestWithHash(header.Hash(), blockRequestSize)
q.requestDataByHash.Delete(req)
q.requestCh <- &syncRequest{
req: req,
to: from,
}

// TODO: if we're at the head, this should request by hash instead of number, since there will
// certainly be blocks with the same number.
q.pushRequest(uint64(bestNum.Int64()+1), blockRequestBufferSize, from)
}

func createBlockRequest(startInt int64, size uint32) *BlockRequestMessage {
Expand All @@ -875,7 +882,7 @@ func createBlockRequest(startInt int64, size uint32) *BlockRequestMessage {
RequestedData: RequestedDataHeader + RequestedDataBody + RequestedDataJustification,
StartingBlock: start,
EndBlockHash: optional.NewHash(false, common.Hash{}),
Direction: 0, // ascending
Direction: 0, // TODO: define this somewhere
Max: max,
}

Expand All @@ -896,7 +903,7 @@ func createBlockRequestWithHash(startHash common.Hash, size uint32) *BlockReques
RequestedData: RequestedDataHeader + RequestedDataBody + RequestedDataJustification,
StartingBlock: start,
EndBlockHash: optional.NewHash(false, common.Hash{}),
Direction: 0, // ascending
Direction: 0, // TODO: define this somewhere
Max: max,
}

Expand Down
9 changes: 5 additions & 4 deletions dot/network/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,12 @@ func TestSyncQueue_HandleBlockAnnounce(t *testing.T) {
require.True(t, ok)
require.Equal(t, 1, score.(int))
require.Equal(t, testBlockAnnounceMessage.Number.Int64(), q.goal)
require.Equal(t, 6, len(q.requestCh))
require.Equal(t, 1, len(q.requestCh))

head, err := q.s.blockState.BestBlockNumber()
require.NoError(t, err)
expected := createBlockRequest(head.Int64(), blockRequestSize)
header := &types.Header{
Number: testBlockAnnounceMessage.Number,
}
expected := createBlockRequestWithHash(header.Hash(), blockRequestSize)
req := <-q.requestCh
require.Equal(t, &syncRequest{req: expected, to: testPeerID}, req)
}
Expand Down
5 changes: 3 additions & 2 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,9 @@ func (bs *BlockState) SetFinalizedHash(hash common.Hash, round, setID uint64) er
}
}

go bs.notifyFinalized(hash, round, setID)
if round > 0 {
go bs.notifyFinalized(hash, round, setID)

err := bs.SetRound(round)
if err != nil {
return err
Expand All @@ -452,7 +453,7 @@ func (bs *BlockState) SetFinalizedHash(hash common.Hash, round, setID uint64) er
return err
}

logger.Trace("pruned block", "hash", rem)
logger.Trace("pruned block", "hash", rem, "number", header.Number)
bs.pruneKeyCh <- header
}

Expand Down
4 changes: 2 additions & 2 deletions dot/state/block_notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestFinalizedChannel(t *testing.T) {
chain, _ := AddBlocksToState(t, bs, 3)

for _, b := range chain {
bs.SetFinalizedHash(b.Hash(), 0, 0)
bs.SetFinalizedHash(b.Hash(), 1, 0)
}

for i := 0; i < 1; i++ {
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestFinalizedChannel_Multi(t *testing.T) {
}

time.Sleep(time.Millisecond * 10)
bs.SetFinalizedHash(chain[0].Hash(), 0, 0)
bs.SetFinalizedHash(chain[0].Hash(), 1, 0)
wg.Wait()

for _, id := range ids {
Expand Down
8 changes: 4 additions & 4 deletions dot/sync/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ func (s *Service) CreateBlockResponse(blockRequest *network.BlockRequestMessage)
responseData := []*types.BlockData{}

switch blockRequest.Direction {
case 0: // ascending (ie child to parent)
for i := endHeader.Number.Int64(); i >= startHeader.Number.Int64(); i-- {
case 0: // ascending (ie parent to child)
for i := startHeader.Number.Int64(); i <= endHeader.Number.Int64(); i++ {
blockData, err := s.getBlockData(big.NewInt(i), blockRequest.RequestedData)
if err != nil {
return nil, err
}
responseData = append(responseData, blockData)
}
case 1: // descending (ie parent to child)
for i := startHeader.Number.Int64(); i <= endHeader.Number.Int64(); i++ {
case 1: // descending (ie child to parent)
for i := endHeader.Number.Int64(); i >= startHeader.Number.Int64(); i-- {
blockData, err := s.getBlockData(big.NewInt(i), blockRequest.RequestedData)
if err != nil {
return nil, err
Expand Down
18 changes: 9 additions & 9 deletions dot/sync/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestService_CreateBlockResponse_MaxSize(t *testing.T) {
RequestedData: 3,
StartingBlock: start,
EndBlockHash: optional.NewHash(false, common.Hash{}),
Direction: 1,
Direction: 0,
Max: optional.NewUint32(false, 0),
}

Expand All @@ -62,7 +62,7 @@ func TestService_CreateBlockResponse_MaxSize(t *testing.T) {
RequestedData: 3,
StartingBlock: start,
EndBlockHash: optional.NewHash(false, common.Hash{}),
Direction: 1,
Direction: 0,
Max: optional.NewUint32(true, maxResponseSize+100),
}

Expand All @@ -87,7 +87,7 @@ func TestService_CreateBlockResponse_StartHash(t *testing.T) {
RequestedData: 3,
StartingBlock: start,
EndBlockHash: optional.NewHash(false, common.Hash{}),
Direction: 1,
Direction: 0,
Max: optional.NewUint32(false, 0),
}

Expand All @@ -98,7 +98,7 @@ func TestService_CreateBlockResponse_StartHash(t *testing.T) {
require.Equal(t, big.NewInt(128), resp.BlockData[127].Number())
}

func TestService_CreateBlockResponse_Ascending(t *testing.T) {
func TestService_CreateBlockResponse_Descending(t *testing.T) {
s := NewTestSyncer(t, false)
addTestBlocksToState(t, int(maxResponseSize), s.blockState)

Expand All @@ -112,7 +112,7 @@ func TestService_CreateBlockResponse_Ascending(t *testing.T) {
RequestedData: 3,
StartingBlock: start,
EndBlockHash: optional.NewHash(false, common.Hash{}),
Direction: 0,
Direction: 1,
Max: optional.NewUint32(false, 0),
}

Expand Down Expand Up @@ -169,7 +169,7 @@ func TestService_CreateBlockResponse(t *testing.T) {
RequestedData: 3,
StartingBlock: start,
EndBlockHash: optional.NewHash(true, endHash),
Direction: 1,
Direction: 0,
Max: optional.NewUint32(false, 0),
},
expectedMsgValue: &network.BlockResponseMessage{
Expand All @@ -188,7 +188,7 @@ func TestService_CreateBlockResponse(t *testing.T) {
RequestedData: 1,
StartingBlock: start,
EndBlockHash: optional.NewHash(true, endHash),
Direction: 1,
Direction: 0,
Max: optional.NewUint32(false, 0),
},
expectedMsgValue: &network.BlockResponseMessage{
Expand All @@ -207,7 +207,7 @@ func TestService_CreateBlockResponse(t *testing.T) {
RequestedData: 4,
StartingBlock: start,
EndBlockHash: optional.NewHash(true, endHash),
Direction: 1,
Direction: 0,
Max: optional.NewUint32(false, 0),
},
expectedMsgValue: &network.BlockResponseMessage{
Expand All @@ -227,7 +227,7 @@ func TestService_CreateBlockResponse(t *testing.T) {
RequestedData: 8,
StartingBlock: start,
EndBlockHash: optional.NewHash(true, endHash),
Direction: 1,
Direction: 0,
Max: optional.NewUint32(false, 0),
},
expectedMsgValue: &network.BlockResponseMessage{
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
Expand Down
3 changes: 3 additions & 0 deletions lib/common/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
// Hash used to store a blake2b hash
type Hash [32]byte

// EmptyHash is an empty [32]byte{}
var EmptyHash = Hash{}

// NewHash casts a byte array to a Hash
// if the input is longer than 32 bytes, it takes the first 32 bytes
func NewHash(in []byte) (res Hash) {
Expand Down
2 changes: 2 additions & 0 deletions lib/grandpa/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,6 @@ var (

// ErrAuthorityNotInSet is returned when a precommit within a justification is signed by a key not in the authority set
ErrAuthorityNotInSet = errors.New("authority is not in set")

errVoteExists = errors.New("already have vote")
)
Loading

0 comments on commit bcc7935

Please sign in to comment.