Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 46 additions & 42 deletions services/p2p/server_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"google.golang.org/protobuf/proto"
)

func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) {
func (s *Server) handleBlockTopic(_ context.Context, m []byte, fromID string) {
var (
blockMessage BlockMessage
hash *chainhash.Hash
Expand All @@ -39,11 +39,13 @@ func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) {
return
}

if from == blockMessage.PeerID {
s.logger.Infof("[handleBlockTopic] DIRECT block %s from %s", blockMessage.Hash, blockMessage.PeerID)
} else {
s.logger.Infof("[handleBlockTopic] RELAY block %s (originator: %s, via: %s)", blockMessage.Hash, blockMessage.PeerID, from)
// Check that fromID matches the block peer ID
if fromID != blockMessage.PeerID {
// For now, log an error. In the future, we might want to take banning action against peers spoofing other IDs
s.logger.Errorf("[handleBlockTopic] mismatch between fromID (%s) and blockMessage.PeerID (%s)", fromID, blockMessage.PeerID)
return
}
s.logger.Infof("[handleBlockTopic] received block %s fromID %s", blockMessage.Hash, blockMessage.PeerID)

select {
case s.notificationCh <- &notificationMsg{
Expand All @@ -60,17 +62,17 @@ func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) {
}

// Ignore our own messages
if s.isOwnMessage(from, blockMessage.PeerID) {
if s.isOwnMessage(fromID, blockMessage.PeerID) {
s.logger.Debugf("[handleBlockTopic] ignoring own block message for %s", blockMessage.Hash)
return
}

now := time.Now().UTC()

// Store the peer ID that sent this block
s.storePeerMapEntry(&s.blockPeerMap, blockMessage.Hash, from, now)
s.storePeerMapEntry(&s.blockPeerMap, blockMessage.Hash, fromID, now)

s.logger.Debugf("[handleBlockTopic] storing peer %s for block %s", from, blockMessage.Hash)
s.logger.Debugf("[handleBlockTopic] storing peer %s for block %s", fromID, blockMessage.Hash)

// Store using the originator's peer ID
if peerID, err := peer.Decode(blockMessage.PeerID); err == nil {
Expand All @@ -79,17 +81,17 @@ func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) {
}

// Update last message time for the sender and originator with client name
s.updatePeerLastMessageTime(from, blockMessage.PeerID)
s.updatePeerLastMessageTime(fromID, blockMessage.PeerID)

// Track bytes received from this message
s.updateBytesReceived(from, blockMessage.PeerID, uint64(len(m)))
// Track bytes received fromID this message
s.updateBytesReceived(fromID, blockMessage.PeerID, uint64(len(m)))

// Skip notifications from banned peers
// Skip notifications fromID banned peers
if s.shouldSkipBannedPeer(blockMessage.PeerID, "handleBlockTopic") {
return
}

// Skip notifications from unhealthy peers
// Skip notifications fromID unhealthy peers
if s.shouldSkipUnhealthyPeer(blockMessage.PeerID, "handleBlockTopic") {
return
}
Expand Down Expand Up @@ -123,7 +125,7 @@ func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) {
}
}

func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, from string) {
func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, fromID string) {
var (
subtreeMessage SubtreeMessage
hash *chainhash.Hash
Expand All @@ -139,11 +141,13 @@ func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, from string) {
return
}

if from == subtreeMessage.PeerID {
s.logger.Debugf("[handleSubtreeTopic] DIRECT subtree %s from %s", subtreeMessage.Hash, subtreeMessage.PeerID)
} else {
s.logger.Debugf("[handleSubtreeTopic] RELAY subtree %s (originator: %s, via: %s)", subtreeMessage.Hash, subtreeMessage.PeerID, from)
// Check that fromID matches the subtree peer ID
if fromID != subtreeMessage.PeerID {
// For now, log an error. In the future, we might want to take banning action against peers spoofing other IDs
s.logger.Errorf("[handleSubtreeTopic] mismatch between fromID (%s) and subtreeMessage.PeerID (%s)", fromID, subtreeMessage.PeerID)
return
}
s.logger.Debugf("[handleSubtreeTopic] received subtree %s from %s", subtreeMessage.Hash, subtreeMessage.PeerID)

if s.isBlacklistedBaseURL(subtreeMessage.DataHubURL) {
s.logger.Errorf("[handleSubtreeTopic] Blocked subtree notification from blacklisted baseURL: %s", subtreeMessage.DataHubURL)
Expand All @@ -166,25 +170,25 @@ func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, from string) {
}

// Ignore our own messages
if s.isOwnMessage(from, subtreeMessage.PeerID) {
if s.isOwnMessage(fromID, subtreeMessage.PeerID) {
s.logger.Debugf("[handleSubtreeTopic] ignoring own subtree message for %s", subtreeMessage.Hash)
return
}

// Update last message time for the sender and originator with client name
s.updatePeerLastMessageTime(from, subtreeMessage.PeerID)
s.updatePeerLastMessageTime(fromID, subtreeMessage.PeerID)

// Track bytes received from this message
s.updateBytesReceived(from, subtreeMessage.PeerID, uint64(len(m)))
s.updateBytesReceived(fromID, subtreeMessage.PeerID, uint64(len(m)))

// Skip notifications from banned peers
if s.shouldSkipBannedPeer(from, "handleSubtreeTopic") {
s.logger.Debugf("[handleSubtreeTopic] skipping banned peer %s", from)
if s.shouldSkipBannedPeer(fromID, "handleSubtreeTopic") {
s.logger.Debugf("[handleSubtreeTopic] skipping banned peer %s", fromID)
return
}

// Skip notifications from unhealthy peers
if s.shouldSkipUnhealthyPeer(from, "handleSubtreeTopic") {
if s.shouldSkipUnhealthyPeer(fromID, "handleSubtreeTopic") {
return
}

Expand All @@ -195,8 +199,8 @@ func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, from string) {
}

// Store the peer ID that sent this subtree
s.storePeerMapEntry(&s.subtreePeerMap, subtreeMessage.Hash, from, now)
s.logger.Debugf("[handleSubtreeTopic] storing peer %s for subtree %s", from, subtreeMessage.Hash)
s.storePeerMapEntry(&s.subtreePeerMap, subtreeMessage.Hash, fromID, now)
s.logger.Debugf("[handleSubtreeTopic] storing peer %s for subtree %s", fromID, subtreeMessage.Hash)

if s.subtreeKafkaProducerClient != nil { // tests may not set this
msg := &kafkamessage.KafkaSubtreeTopicMessage{
Expand Down Expand Up @@ -267,7 +271,7 @@ func (s *Server) extractHost(urlStr string) string {
return strings.ToLower(host)
}

func (s *Server) handleRejectedTxTopic(_ context.Context, m []byte, from string) {
func (s *Server) handleRejectedTxTopic(_ context.Context, m []byte, fromID string) {
var (
rejectedTxMessage RejectedTxMessage
err error
Expand All @@ -281,37 +285,37 @@ func (s *Server) handleRejectedTxTopic(_ context.Context, m []byte, from string)
return
}

if from == rejectedTxMessage.PeerID {
s.logger.Debugf("[handleRejectedTxTopic] DIRECT rejected tx %s from %s (reason: %s)",
rejectedTxMessage.TxID, rejectedTxMessage.PeerID, rejectedTxMessage.Reason)
} else {
s.logger.Debugf("[handleRejectedTxTopic] RELAY rejected tx %s (originator: %s, via: %s, reason: %s)",
rejectedTxMessage.TxID, rejectedTxMessage.PeerID, from, rejectedTxMessage.Reason)
// Check that fromID matches the rejected tx peer ID
if fromID != rejectedTxMessage.PeerID {
s.logger.Errorf("[handleRejectedTxTopic] peerID does not match fromID: peerID=%s fromID=%s", rejectedTxMessage.PeerID, fromID)
return
}

if s.isOwnMessage(from, rejectedTxMessage.PeerID) {
if s.isOwnMessage(fromID, rejectedTxMessage.PeerID) {
s.logger.Debugf("[handleRejectedTxTopic] ignoring own rejected tx message for %s", rejectedTxMessage.TxID)
return
}
s.logger.Debugf("[handleRejectedTxTopic] received rejected tx %s fromID %s (reason: %s)",
rejectedTxMessage.TxID, rejectedTxMessage.PeerID, rejectedTxMessage.Reason)

// Update last message time with client name
s.updatePeerLastMessageTime(from, rejectedTxMessage.PeerID)
s.updatePeerLastMessageTime(fromID, rejectedTxMessage.PeerID)

// Track bytes received from this message
s.updateBytesReceived(from, rejectedTxMessage.PeerID, uint64(len(m)))
// Track bytes received fromID this message
s.updateBytesReceived(fromID, rejectedTxMessage.PeerID, uint64(len(m)))

if s.shouldSkipBannedPeer(from, "handleRejectedTxTopic") {
if s.shouldSkipBannedPeer(fromID, "handleRejectedTxTopic") {
return
}

// Skip notifications from unhealthy peers
if s.shouldSkipUnhealthyPeer(from, "handleRejectedTxTopic") {
// Skip notifications fromID unhealthy peers
if s.shouldSkipUnhealthyPeer(fromID, "handleRejectedTxTopic") {
return
}

// Rejected TX messages from other peers are informational only.
// Rejected TX messages fromID other peers are informational only.
// They help us understand network state but don't trigger re-broadcasting.
// If we wanted to take action (e.g., remove from our mempool), we would do it here.
// If we wanted to take action (e.g., remove fromID our mempool), we would do it here.
}

// getPeerIDFromDataHubURL finds the peer ID that has the given DataHub URL
Expand Down
Loading