Skip to content

Commit

Permalink
fix(dot/sync): execute p2p handshake when there is no target (#3695)
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior committed Jan 18, 2024
1 parent e03fa13 commit a9db0ec
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 132 deletions.
2 changes: 2 additions & 0 deletions dot/network/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

var (
ErrNoPeersConnected = errors.New("no peers connected")
ErrReceivedEmptyMessage = errors.New("received empty message")

errCannotValidateHandshake = errors.New("failed to validate handshake")
Expand All @@ -22,4 +23,5 @@ var (
ErrNilStream = errors.New("nil stream")
ErrInvalidLEB128EncodedData = errors.New("invalid LEB128 encoded data")
ErrGreaterThanMaxSize = errors.New("greater than maximum size")
ErrStreamReset = errors.New("stream reset")
)
2 changes: 1 addition & 1 deletion dot/network/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message,
func (s *Service) handleLightMsg(stream libp2pnetwork.Stream, msg Message) (err error) {
defer func() {
err := stream.Close()
if err != nil {
if err != nil && err.Error() != ErrStreamReset.Error() {
logger.Warnf("failed to close stream: %s", err)
}
}()
Expand Down
3 changes: 2 additions & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ func closeOutboundStream(info *notificationsProtocol, peerID peer.ID, stream net
)

info.peersData.deleteOutboundHandshakeData(peerID)

err := stream.Close()
if err != nil {
if err != nil && err.Error() != ErrStreamReset.Error() {
logger.Warnf("failed to close outbound stream: %s", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/request_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (rrp *RequestResponseProtocol) Do(to peer.ID, req Message, res ResponseMess

defer func() {
err := stream.Close()
if err != nil {
if err != nil && err.Error() != ErrStreamReset.Error() {
logger.Warnf("failed to close stream: %s", err)
}
}()
Expand Down
41 changes: 41 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/internal/mdns"
"github.com/ChainSafe/gossamer/internal/metrics"
Expand Down Expand Up @@ -742,3 +743,43 @@ func (s *Service) startProcessingMsg() {
}
}
}

func (s *Service) BlockAnnounceHandshake(header *types.Header) error {
peers := s.host.peers()
if len(peers) == 0 {
return ErrNoPeersConnected
}

protocol, ok := s.notificationsProtocols[blockAnnounceMsgType]
if !ok {
panic("block announce message type not found")
}

handshake, err := protocol.getHandshake()
if err != nil {
return fmt.Errorf("getting handshake: %w", err)
}

wg := sync.WaitGroup{}
wg.Add(len(peers))
for _, p := range peers {
protocol.peersData.setMutex(p)

go func(p peer.ID) {
defer wg.Done()
stream, err := s.sendHandshake(p, handshake, protocol)
if err != nil {
logger.Tracef("sending block announce handshake: %s", err)
return
}

response := protocol.peersData.getOutboundHandshakeData(p)
if response.received && response.validated {
closeOutboundStream(protocol, p, stream)
}
}(p)
}

wg.Wait()
return nil
}
2 changes: 1 addition & 1 deletion dot/network/stream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (sm *streamManager) cleanupStreams() {

if time.Since(lastReceived) > cleanupStreamInterval {
err := stream.Close()
if err != nil {
if err != nil && err.Error() != ErrStreamReset.Error() {
logger.Warnf("failed to close inactive stream: %s", err)
}
delete(sm.streamData, id)
Expand Down
2 changes: 1 addition & 1 deletion dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er

defer func() {
err := stream.Close()
if err != nil {
if err != nil && err.Error() != ErrStreamReset.Error() {
logger.Warnf("failed to close stream: %s", err)
}
}()
Expand Down

0 comments on commit a9db0ec

Please sign in to comment.