Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/network): request block justifications when near head #1499

Merged
merged 15 commits into from
Apr 5, 2021
16 changes: 8 additions & 8 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,6 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err
return errors.New("genesis hash mismatch")
}

// if peer has higher best block than us, begin syncing
latestHeader, err := s.blockState.BestBlockHeader()
if err != nil {
return err
}

bestBlockNum := big.NewInt(int64(bhs.BestBlockNumber))

np, ok := s.notificationsProtocols[BlockAnnounceMsgType]
if !ok {
// this should never happen.
Expand All @@ -239,6 +231,14 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err

data.handshake = hs

// if peer has higher best block than us, begin syncing
latestHeader, err := s.blockState.BestBlockHeader()
if err != nil {
return err
}

bestBlockNum := big.NewInt(int64(bhs.BestBlockNumber))

// check if peer block number is greater than host block number
if latestHeader.Number.Cmp(bestBlockNum) >= 0 {
return nil
Expand Down
8 changes: 4 additions & 4 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (h *host) bootstrap() {
// peer (gets the already opened outbound message stream or opens a new one).
func (h *host) send(p peer.ID, pid protocol.ID, msg Message) (err error) {
// get outbound stream for given peer
s := h.getStream(p, pid)
s := h.getOutboundStream(p, pid)

// check if stream needs to be opened
if s == nil {
Expand Down Expand Up @@ -286,10 +286,10 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error {
return err
}

// getStream returns the outbound message stream for the given peer or returns
// getOutboundStream returns the outbound message stream for the given peer or returns
// nil if no outbound message stream exists. For each peer, each host opens an
// outbound message stream and writes to the same stream until closed or reset.
func (h *host) getStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Stream) {
func (h *host) getOutboundStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Stream) {
conns := h.h.Network().ConnsToPeer(p)

// loop through connections (only one for now)
Expand All @@ -310,7 +310,7 @@ func (h *host) getStream(p peer.ID, pid protocol.ID) (stream libp2pnetwork.Strea

// closeStream closes a stream open to the peer with the given sub-protocol, if it exists.
func (h *host) closeStream(p peer.ID, pid protocol.ID) {
stream := h.getStream(p, pid)
stream := h.getOutboundStream(p, pid)
if stream != nil {
_ = stream.Close()
}
Expand Down
10 changes: 5 additions & 5 deletions dot/network/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func TestExistingStream(t *testing.T) {
}
require.NoError(t, err)

stream := nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID)
stream := nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID)
require.Nil(t, stream, "node A should not have an outbound stream")

// node A opens the stream to send the first message
Expand All @@ -279,15 +279,15 @@ func TestExistingStream(t *testing.T) {
time.Sleep(TestMessageTimeout)
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A")

stream = nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID)
stream = nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID)
require.NotNil(t, stream, "node A should have an outbound stream")

// node A uses the stream to send a second message
err = nodeA.host.send(addrInfosB[0].ID, nodeB.host.protocolID, testBlockRequestMessage)
require.NoError(t, err)
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A")

stream = nodeA.host.getStream(nodeB.host.id(), nodeB.host.protocolID)
stream = nodeA.host.getOutboundStream(nodeB.host.id(), nodeB.host.protocolID)
require.NotNil(t, stream, "node B should have an outbound stream")

// node B opens the stream to send the first message
Expand All @@ -297,15 +297,15 @@ func TestExistingStream(t *testing.T) {
time.Sleep(TestMessageTimeout)
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B")

stream = nodeB.host.getStream(nodeA.host.id(), nodeB.host.protocolID)
stream = nodeB.host.getOutboundStream(nodeA.host.id(), nodeB.host.protocolID)
require.NotNil(t, stream, "node B should have an outbound stream")

// node B uses the stream to send a second message
err = nodeB.host.send(addrInfosA[0].ID, nodeB.host.protocolID, testBlockRequestMessage)
require.NoError(t, err)
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B")

stream = nodeB.host.getStream(nodeA.host.id(), nodeB.host.protocolID)
stream = nodeB.host.getOutboundStream(nodeA.host.id(), nodeB.host.protocolID)
require.NotNil(t, stream, "node B should have an outbound stream")
}

Expand Down
15 changes: 10 additions & 5 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
return errors.New("message is not NotificationsMessage")
}

logger.Trace("received message on notifications sub-protocol", "protocol", info.protocolID,
"message", msg,
"peer", stream.Conn().RemotePeer(),
)

if msg.IsHandshake() {
logger.Trace("received handshake on notifications sub-protocol", "protocol", info.protocolID,
"message", msg,
"peer", stream.Conn().RemotePeer(),
)

hs, ok := msg.(Handshake)
if !ok {
return errors.New("failed to convert message to Handshake")
Expand Down Expand Up @@ -186,6 +186,11 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
return nil
}

logger.Debug("received message on notifications sub-protocol", "protocol", info.protocolID,
"message", msg,
"peer", stream.Conn().RemotePeer(),
)

err := messageHandler(peer, msg)
if err != nil {
return err
Expand Down
11 changes: 5 additions & 6 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) {
defer info.mapMu.RUnlock()

peer := conn.RemotePeer()
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received {
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { //nolint
info.handshakeData.Store(peer, &handshakeData{
validated: false,
})
Expand Down Expand Up @@ -428,6 +428,9 @@ func (s *Service) RegisterNotificationsProtocol(sub protocol.ID,

info := s.notificationsProtocols[messageID]

decoder := createDecoder(info, handshakeDecoder, messageDecoder)
handlerWithValidate := s.createNotificationsMessageHandler(info, handshakeValidator, messageHandler)

s.host.registerStreamHandlerWithOverwrite(sub, overwriteProtocol, func(stream libp2pnetwork.Stream) {
logger.Trace("received stream", "sub-protocol", sub)
conn := stream.Conn()
Expand All @@ -437,10 +440,6 @@ func (s *Service) RegisterNotificationsProtocol(sub protocol.ID,
}

p := conn.RemotePeer()

decoder := createDecoder(info, handshakeDecoder, messageDecoder)
handlerWithValidate := s.createNotificationsMessageHandler(info, handshakeValidator, messageHandler)

s.readStream(stream, p, decoder, handlerWithValidate)
})

Expand Down Expand Up @@ -537,7 +536,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder
// decode message based on message type
msg, err := decoder(msgBytes[:tot], peer)
if err != nil {
logger.Trace("Failed to decode message from peer", "peer", peer, "err", err)
logger.Trace("failed to decode message from peer", "protocol", stream.Protocol(), "err", err)
continue
}

Expand Down
1 change: 1 addition & 0 deletions dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type BlockState interface {
GenesisHash() common.Hash
HasBlockBody(common.Hash) (bool, error)
GetFinalizedHeader(round, setID uint64) (*types.Header, error)
GetHashByNumber(num *big.Int) (common.Hash, error)
}

// Syncer is implemented by the syncing service
Expand Down
4 changes: 4 additions & 0 deletions dot/network/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,7 @@ func (mbs *MockBlockState) HasBlockBody(common.Hash) (bool, error) {
func (mbs *MockBlockState) GetFinalizedHeader(_, _ uint64) (*types.Header, error) {
return mbs.BestBlockHeader()
}

func (mbs *MockBlockState) GetHashByNumber(_ *big.Int) (common.Hash, error) {
return common.Hash{}, nil
}
Loading