Skip to content

Commit

Permalink
fix(dot/sync): rework on bootstrap/tip sync (#3227)
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior committed Aug 11, 2023
1 parent 911d6b5 commit ab6650a
Show file tree
Hide file tree
Showing 46 changed files with 3,373 additions and 6,588 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/zombienet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
steps:
- uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: "1.20"
stable: true
check-latest: true

Expand All @@ -19,7 +19,6 @@ jobs:
echo "::set-output name=go-build::$(go env GOCACHE)"
echo "::set-output name=go-mod::$(go env GOMODCACHE)"
- uses: actions/checkout@v3

- name: Go build cache
uses: actions/cache@v3
with:
Expand Down Expand Up @@ -51,4 +50,4 @@ jobs:
chmod +x /usr/local/bin/zombienet
- name: Zombienet test
run: |
zombienet test -p native zombienet_tests/functional/0001-basic-network.zndsl
zombienet test -p native zombienet_tests/functional/0001-basic-network.zndsl
18 changes: 17 additions & 1 deletion chain/westend/genesis.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,23 @@
"/dns/boot-node.helikon.io/tcp/7080/p2p/12D3KooWRFDPyT8vA8mLzh6dJoyujn4QNjeqi6Ch79eSMz9beKXC",
"/dns/boot-node.helikon.io/tcp/7082/wss/p2p/12D3KooWRFDPyT8vA8mLzh6dJoyujn4QNjeqi6Ch79eSMz9beKXC",
"/dns/westend.bootnode.amforc.com/tcp/30333/p2p/12D3KooWJ5y9ZgVepBQNW4aabrxgmnrApdVnscqgKWiUu4BNJbC8",
"/dns/westend.bootnode.amforc.com/tcp/30334/wss/p2p/12D3KooWJ5y9ZgVepBQNW4aabrxgmnrApdVnscqgKWiUu4BNJbC8"
"/dns/westend.bootnode.amforc.com/tcp/30334/wss/p2p/12D3KooWJ5y9ZgVepBQNW4aabrxgmnrApdVnscqgKWiUu4BNJbC8",
"/dns/westend-bootnode.polkadotters.com/tcp/30333/p2p/12D3KooWHPHb64jXMtSRJDrYFATWeLnvChL8NtWVttY67DCH1eC5",
"/dns/westend-bootnode.polkadotters.com/tcp/30334/wss/p2p/12D3KooWHPHb64jXMtSRJDrYFATWeLnvChL8NtWVttY67DCH1eC5",
"/dns/boot-cr.gatotech.network/tcp/33300/p2p/12D3KooWQGR1vUhoy6mvQorFp3bZFn6NNezhQZ6NWnVV7tpFgoPd",
"/dns/boot-cr.gatotech.network/tcp/35300/wss/p2p/12D3KooWQGR1vUhoy6mvQorFp3bZFn6NNezhQZ6NWnVV7tpFgoPd",
"/dns/boot-westend.metaspan.io/tcp/33012/p2p/12D3KooWNTau7iG4G9cUJSwwt2QJP1W88pUf2SgqsHjRU2RL8pfa",
"/dns/boot-westend.metaspan.io/tcp/33015/ws/p2p/12D3KooWNTau7iG4G9cUJSwwt2QJP1W88pUf2SgqsHjRU2RL8pfa",
"/dns/boot-westend.metaspan.io/tcp/33016/wss/p2p/12D3KooWNTau7iG4G9cUJSwwt2QJP1W88pUf2SgqsHjRU2RL8pfa",
"/dns/westend-bootnode.turboflakes.io/tcp/30310/p2p/12D3KooWJvPDCZmReU46ghpCMJCPVUvUCav4WQdKtXQhZgJdH6tZ",
"/dns/westend-bootnode.turboflakes.io/tcp/30410/wss/p2p/12D3KooWJvPDCZmReU46ghpCMJCPVUvUCav4WQdKtXQhZgJdH6tZ",
"/dns/westend-boot-ng.dwellir.com/tcp/443/wss/p2p/12D3KooWJifoDhCL3swAKt7MWhFb7wLRFD9oG33AL3nAathmU24x",
"/dns/westend-boot-ng.dwellir.com/tcp/30335/p2p/12D3KooWJifoDhCL3swAKt7MWhFb7wLRFD9oG33AL3nAathmU24x",
"/dns/westend-bootnode.radiumblock.com/tcp/30335/wss/p2p/12D3KooWJBowJuX1TaWNWHt8Dz8z44BoCZunLCfFqxA2rLTn6TBD",
"/dns/westend-bootnode.radiumblock.com/tcp/30333/p2p/12D3KooWJBowJuX1TaWNWHt8Dz8z44BoCZunLCfFqxA2rLTn6TBD",
"/dns/wnd-bootnode.stakeworld.io/tcp/30320/p2p/12D3KooWBYdKipcNbrV5rCbgT5hco8HMLME7cE9hHC3ckqCKDuzP",
"/dns/wnd-bootnode.stakeworld.io/tcp/30321/ws/p2p/12D3KooWBYdKipcNbrV5rCbgT5hco8HMLME7cE9hHC3ckqCKDuzP",
"/dns/wnd-bootnode.stakeworld.io/tcp/30322/wss/p2p/12D3KooWBYdKipcNbrV5rCbgT5hco8HMLME7cE9hHC3ckqCKDuzP"
],
"telemetryEndpoints": [
[
Expand Down
4 changes: 2 additions & 2 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
startDHTTimeout = time.Second * 10
initialAdvertisementTimeout = time.Millisecond
tryAdvertiseTimeout = time.Second * 30
connectToPeersTimeout = time.Minute * 5
connectToPeersTimeout = time.Minute
findPeersTimeout = time.Minute
)

Expand Down Expand Up @@ -183,7 +183,7 @@ func (d *discovery) checkPeerCount() {
case <-d.ctx.Done():
return
case <-ticker.C:
if len(d.h.Network().Peers()) > d.minPeers {
if len(d.h.Network().Peers()) >= d.maxPeers {
continue
}

Expand Down
2 changes: 2 additions & 0 deletions dot/network/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

var (
ErrReceivedEmptyMessage = errors.New("received empty message")

errCannotValidateHandshake = errors.New("failed to validate handshake")
errMessageTypeNotValid = errors.New("message type is not valid")
errInvalidHandshakeForPeer = errors.New("peer previously sent invalid handshake")
Expand Down
61 changes: 60 additions & 1 deletion dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"github.com/ChainSafe/gossamer/pkg/scale"
)

// MaxBlocksInResponse is maximum number of block data a BlockResponse message can contain
const MaxBlocksInResponse = 128

type MessageType byte

// Message types for notifications protocol messages. Used internally to map message to protocol.
Expand Down Expand Up @@ -44,6 +47,9 @@ const (
RequestedDataReceipt = byte(4)
RequestedDataMessageQueue = byte(8)
RequestedDataJustification = byte(16)
BootstrapRequestData = RequestedDataHeader +
RequestedDataBody +
RequestedDataJustification
)

var _ Message = (*BlockRequestMessage)(nil)
Expand Down Expand Up @@ -325,7 +331,7 @@ type ConsensusMessage struct {
}

// Type returns ConsensusMsgType
func (cm *ConsensusMessage) Type() MessageType {
func (*ConsensusMessage) Type() MessageType {
return ConsensusMsgType
}

Expand Down Expand Up @@ -354,3 +360,56 @@ func (cm *ConsensusMessage) Hash() (common.Hash, error) {
}
return common.Blake2bHash(encMsg)
}

func NewBlockRequest(startingBlock variadic.Uint32OrHash, amount uint32,
requestedData byte, direction SyncDirection) *BlockRequestMessage {
return &BlockRequestMessage{
RequestedData: requestedData,
StartingBlock: startingBlock,
Direction: direction,
Max: &amount,
}
}

func NewAscendingBlockRequests(startNumber, targetNumber uint, requestedData byte) []*BlockRequestMessage {
if startNumber > targetNumber {
return []*BlockRequestMessage{}
}

diff := targetNumber - (startNumber - 1)

// start and end block are the same, just request 1 block
if diff == 0 {
return []*BlockRequestMessage{
NewBlockRequest(*variadic.MustNewUint32OrHash(uint32(startNumber)), 1, requestedData, Ascending),
}
}

numRequests := diff / MaxBlocksInResponse
// we should check if the diff is in the maxResponseSize bounds
// otherwise we should increase the numRequests by one, take this
// example, we want to sync from 0 to 259, the diff is 259
// then the num of requests is 2 (uint(259)/uint(128)) however two requests will
// retrieve only 256 blocks (each request can retrieve a max of 128 blocks), so we should
// create one more request to retrieve those missing blocks, 3 in this example.
missingBlocks := diff % MaxBlocksInResponse
if missingBlocks != 0 {
numRequests++
}

reqs := make([]*BlockRequestMessage, numRequests)
for i := uint(0); i < numRequests; i++ {
max := uint32(MaxBlocksInResponse)

lastIteration := numRequests - 1
if i == lastIteration && missingBlocks != 0 {
max = uint32(missingBlocks)
}

start := variadic.MustNewUint32OrHash(startNumber)
reqs[i] = NewBlockRequest(*start, max, requestedData, Ascending)
startNumber += uint(max)
}

return reqs
}
131 changes: 131 additions & 0 deletions dot/network/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,134 @@ func TestDecodeConsensusMessage(t *testing.T) {
require.NoError(t, err)
require.Equal(t, encMsg, encodedMessage)
}

func TestAscendingBlockRequest(t *testing.T) {
one := uint32(1)
three := uint32(3)
maxResponseSize := uint32(MaxBlocksInResponse)
cases := map[string]struct {
startNumber, targetNumber uint
expectedBlockRequestMessage []*BlockRequestMessage
expectedTotalOfBlocksRequested uint32
}{
"start_greater_than_target": {
startNumber: 10,
targetNumber: 0,
expectedBlockRequestMessage: []*BlockRequestMessage{},
expectedTotalOfBlocksRequested: 0,
},

"no_difference_between_start_and_target": {
startNumber: 10,
targetNumber: 10,
expectedBlockRequestMessage: []*BlockRequestMessage{
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(10)),
Direction: Ascending,
Max: &one,
},
},
expectedTotalOfBlocksRequested: 1,
},

"requesting_128_blocks": {
startNumber: 1,
targetNumber: 128,
expectedTotalOfBlocksRequested: 128,
expectedBlockRequestMessage: []*BlockRequestMessage{
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(1)),
Direction: Ascending,
Max: &maxResponseSize,
},
},
},

"requesting_4_chunks_of_128_blocks": {
startNumber: 1,
targetNumber: 128 * 4, // 512
expectedTotalOfBlocksRequested: 512,
expectedBlockRequestMessage: []*BlockRequestMessage{
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(1)),
Direction: Ascending,
Max: &maxResponseSize,
},
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(129)),
Direction: Ascending,
Max: &maxResponseSize,
},
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(257)),
Direction: Ascending,
Max: &maxResponseSize,
},
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(385)),
Direction: Ascending,
Max: &maxResponseSize,
},
},
},

"requesting_4_chunks_of_128_plus_3_blocks": {
startNumber: 1,
targetNumber: (128 * 4) + 3,
expectedTotalOfBlocksRequested: 515,
expectedBlockRequestMessage: []*BlockRequestMessage{
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(1)),
Direction: Ascending,
Max: &maxResponseSize,
},
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(129)),
Direction: Ascending,
Max: &maxResponseSize,
},
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(257)),
Direction: Ascending,
Max: &maxResponseSize,
},
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(385)),
Direction: Ascending,
Max: &maxResponseSize,
},
{
RequestedData: BootstrapRequestData,
StartingBlock: *variadic.MustNewUint32OrHash(uint32(513)),
Direction: Ascending,
Max: &three,
},
},
},
}

for tname, tt := range cases {
tt := tt

t.Run(tname, func(t *testing.T) {
requests := NewAscendingBlockRequests(tt.startNumber, tt.targetNumber, BootstrapRequestData)
require.Equal(t, tt.expectedBlockRequestMessage, requests)

acc := uint32(0)
for _, r := range requests {
acc += *r.Max
}
require.Equal(t, tt.expectedTotalOfBlocksRequested, acc)
})
}
}
2 changes: 1 addition & 1 deletion dot/network/request_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (rrp *RequestResponseProtocol) receiveResponse(stream libp2pnetwork.Stream,
}

if n == 0 {
return fmt.Errorf("received empty message")
return ErrReceivedEmptyMessage
}

err = msg.Decode(buf[:n])
Expand Down
5 changes: 5 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,11 @@ func (s *Service) NetworkState() common.NetworkState {
}
}

// AllConnectedPeersIDs returns all the connected to the node instance
func (s *Service) AllConnectedPeersIDs() []peer.ID {
return s.host.p2pHost.Network().Peers()
}

// Peers returns information about connected peers needed for the rpc server
func (s *Service) Peers() []common.PeerInfo {
var peers []common.PeerInfo
Expand Down
6 changes: 0 additions & 6 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,10 @@
package network

import (
"time"

libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

var (
BlockRequestTimeout = time.Second * 20
)

// handleSyncStream handles streams with the <protocol-id>/sync/2 protocol ID
func (s *Service) handleSyncStream(stream libp2pnetwork.Stream) {
if stream == nil {
Expand Down
1 change: 0 additions & 1 deletion dot/node_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func TestNewNode(t *testing.T) {
initConfig.Account.Key = "alice"
initConfig.Core.Role = common.FullNodeRole
initConfig.Core.WasmInterpreter = wazero_runtime.Name

initConfig.Log.Digest = "critical"

networkConfig := &network.Config{
Expand Down
14 changes: 10 additions & 4 deletions dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"strings"
"time"

cfg "github.com/ChainSafe/gossamer/config"

Expand Down Expand Up @@ -493,6 +494,13 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg Bloc
if err != nil {
return nil, fmt.Errorf("failed to parse sync log level: %w", err)
}

const blockRequestTimeout = time.Second * 20
requestMaker := net.GetRequestResponseProtocol(
network.SyncID,
blockRequestTimeout,
network.MaxBlockResponseSize)

syncCfg := &sync.Config{
LogLvl: syncLogLevel,
Network: net,
Expand All @@ -507,12 +515,10 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg Bloc
SlotDuration: slotDuration,
Telemetry: telemetryMailer,
BadBlocks: genesisData.BadBlocks,
RequestMaker: requestMaker,
}

blockReqRes := net.GetRequestResponseProtocol(network.SyncID, network.BlockRequestTimeout,
network.MaxBlockResponseSize)

return sync.NewService(syncCfg, blockReqRes)
return sync.NewService(syncCfg)
}

func (nodeBuilder) createDigestHandler(st *state.Service) (*digest.Handler, error) {
Expand Down

0 comments on commit ab6650a

Please sign in to comment.