From 671de45fc7c809fa8221f30334c3ea12671dcf8c Mon Sep 17 00:00:00 2001 From: Dylan Date: Tue, 25 Nov 2025 11:52:09 -0500 Subject: [PATCH 1/2] Check peerID in all subscribed messages --- services/p2p/server_helpers.go | 88 ++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 42 deletions(-) diff --git a/services/p2p/server_helpers.go b/services/p2p/server_helpers.go index 5fa4d043a..768068286 100644 --- a/services/p2p/server_helpers.go +++ b/services/p2p/server_helpers.go @@ -23,7 +23,7 @@ import ( "google.golang.org/protobuf/proto" ) -func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) { +func (s *Server) handleBlockTopic(_ context.Context, m []byte, fromID string) { var ( blockMessage BlockMessage hash *chainhash.Hash @@ -39,11 +39,13 @@ func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) { return } - if from == blockMessage.PeerID { - s.logger.Infof("[handleBlockTopic] DIRECT block %s from %s", blockMessage.Hash, blockMessage.PeerID) - } else { - s.logger.Infof("[handleBlockTopic] RELAY block %s (originator: %s, via: %s)", blockMessage.Hash, blockMessage.PeerID, from) + // Check that fromID does matches the subtree peer ID + if fromID != blockMessage.PeerID { + // For now, log an error. In the future, we might want to take banning action against peers spoofing other IDs + s.logger.Errorf("[handleBlockTopic] mismatch between fromID (%s) and blockMessage.PeerID (%s)", fromID, blockMessage.PeerID) + return } + s.logger.Infof("[handleBlockTopic] received block %s fromID %s", blockMessage.Hash, blockMessage.PeerID) select { case s.notificationCh <- ¬ificationMsg{ @@ -60,7 +62,7 @@ func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) { } // Ignore our own messages - if s.isOwnMessage(from, blockMessage.PeerID) { + if s.isOwnMessage(fromID, blockMessage.PeerID) { s.logger.Debugf("[handleBlockTopic] ignoring own block message for %s", blockMessage.Hash) return } @@ -68,9 +70,9 @@ func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) { now := time.Now().UTC() // Store the peer ID that sent this block - s.storePeerMapEntry(&s.blockPeerMap, blockMessage.Hash, from, now) + s.storePeerMapEntry(&s.blockPeerMap, blockMessage.Hash, fromID, now) - s.logger.Debugf("[handleBlockTopic] storing peer %s for block %s", from, blockMessage.Hash) + s.logger.Debugf("[handleBlockTopic] storing peer %s for block %s", fromID, blockMessage.Hash) // Store using the originator's peer ID if peerID, err := peer.Decode(blockMessage.PeerID); err == nil { @@ -79,17 +81,17 @@ func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) { } // Update last message time for the sender and originator with client name - s.updatePeerLastMessageTime(from, blockMessage.PeerID) + s.updatePeerLastMessageTime(fromID, blockMessage.PeerID) - // Track bytes received from this message - s.updateBytesReceived(from, blockMessage.PeerID, uint64(len(m))) + // Track bytes received fromID this message + s.updateBytesReceived(fromID, blockMessage.PeerID, uint64(len(m))) - // Skip notifications from banned peers + // Skip notifications fromID banned peers if s.shouldSkipBannedPeer(blockMessage.PeerID, "handleBlockTopic") { return } - // Skip notifications from unhealthy peers + // Skip notifications fromID unhealthy peers if s.shouldSkipUnhealthyPeer(blockMessage.PeerID, "handleBlockTopic") { return } @@ -123,7 +125,7 @@ func (s *Server) handleBlockTopic(_ context.Context, m []byte, from string) { } } -func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, from string) { +func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, fromID string) { var ( subtreeMessage SubtreeMessage hash *chainhash.Hash @@ -139,11 +141,13 @@ func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, from string) { return } - if from == subtreeMessage.PeerID { - s.logger.Debugf("[handleSubtreeTopic] DIRECT subtree %s from %s", subtreeMessage.Hash, subtreeMessage.PeerID) - } else { - s.logger.Debugf("[handleSubtreeTopic] RELAY subtree %s (originator: %s, via: %s)", subtreeMessage.Hash, subtreeMessage.PeerID, from) + // Check that fromID does matches the subtree peer ID + if fromID != subtreeMessage.PeerID { + // For now, log an error. In the future, we might want to take banning action against peers spoofing other IDs + s.logger.Errorf("[handleSubtreeTopic] mismatch between fromID (%s) and subtreeMessage.PeerID (%s)", fromID, subtreeMessage.PeerID) + return } + s.logger.Debugf("[handleSubtreeTopic] received subtree %s from %s", subtreeMessage.Hash, subtreeMessage.PeerID) if s.isBlacklistedBaseURL(subtreeMessage.DataHubURL) { s.logger.Errorf("[handleSubtreeTopic] Blocked subtree notification from blacklisted baseURL: %s", subtreeMessage.DataHubURL) @@ -166,25 +170,25 @@ func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, from string) { } // Ignore our own messages - if s.isOwnMessage(from, subtreeMessage.PeerID) { + if s.isOwnMessage(fromID, subtreeMessage.PeerID) { s.logger.Debugf("[handleSubtreeTopic] ignoring own subtree message for %s", subtreeMessage.Hash) return } // Update last message time for the sender and originator with client name - s.updatePeerLastMessageTime(from, subtreeMessage.PeerID) + s.updatePeerLastMessageTime(fromID, subtreeMessage.PeerID) // Track bytes received from this message - s.updateBytesReceived(from, subtreeMessage.PeerID, uint64(len(m))) + s.updateBytesReceived(fromID, subtreeMessage.PeerID, uint64(len(m))) // Skip notifications from banned peers - if s.shouldSkipBannedPeer(from, "handleSubtreeTopic") { - s.logger.Debugf("[handleSubtreeTopic] skipping banned peer %s", from) + if s.shouldSkipBannedPeer(fromID, "handleSubtreeTopic") { + s.logger.Debugf("[handleSubtreeTopic] skipping banned peer %s", fromID) return } // Skip notifications from unhealthy peers - if s.shouldSkipUnhealthyPeer(from, "handleSubtreeTopic") { + if s.shouldSkipUnhealthyPeer(fromID, "handleSubtreeTopic") { return } @@ -195,8 +199,8 @@ func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, from string) { } // Store the peer ID that sent this subtree - s.storePeerMapEntry(&s.subtreePeerMap, subtreeMessage.Hash, from, now) - s.logger.Debugf("[handleSubtreeTopic] storing peer %s for subtree %s", from, subtreeMessage.Hash) + s.storePeerMapEntry(&s.subtreePeerMap, subtreeMessage.Hash, fromID, now) + s.logger.Debugf("[handleSubtreeTopic] storing peer %s for subtree %s", fromID, subtreeMessage.Hash) if s.subtreeKafkaProducerClient != nil { // tests may not set this msg := &kafkamessage.KafkaSubtreeTopicMessage{ @@ -267,7 +271,7 @@ func (s *Server) extractHost(urlStr string) string { return strings.ToLower(host) } -func (s *Server) handleRejectedTxTopic(_ context.Context, m []byte, from string) { +func (s *Server) handleRejectedTxTopic(_ context.Context, m []byte, fromID string) { var ( rejectedTxMessage RejectedTxMessage err error @@ -281,37 +285,37 @@ func (s *Server) handleRejectedTxTopic(_ context.Context, m []byte, from string) return } - if from == rejectedTxMessage.PeerID { - s.logger.Debugf("[handleRejectedTxTopic] DIRECT rejected tx %s from %s (reason: %s)", - rejectedTxMessage.TxID, rejectedTxMessage.PeerID, rejectedTxMessage.Reason) - } else { - s.logger.Debugf("[handleRejectedTxTopic] RELAY rejected tx %s (originator: %s, via: %s, reason: %s)", - rejectedTxMessage.TxID, rejectedTxMessage.PeerID, from, rejectedTxMessage.Reason) + // Check that fromID does matches the rejected tx peer ID + if fromID != rejectedTxMessage.PeerID { + s.logger.Errorf("[handleRejectedTxTopic] peerID does not match fromID: peerID=%s fromID=%s", rejectedTxMessage.PeerID, fromID) + return } - if s.isOwnMessage(from, rejectedTxMessage.PeerID) { + if s.isOwnMessage(fromID, rejectedTxMessage.PeerID) { s.logger.Debugf("[handleRejectedTxTopic] ignoring own rejected tx message for %s", rejectedTxMessage.TxID) return } + s.logger.Debugf("[handleRejectedTxTopic] received rejected tx %s fromID %s (reason: %s)", + rejectedTxMessage.TxID, rejectedTxMessage.PeerID, rejectedTxMessage.Reason) // Update last message time with client name - s.updatePeerLastMessageTime(from, rejectedTxMessage.PeerID) + s.updatePeerLastMessageTime(fromID, rejectedTxMessage.PeerID) - // Track bytes received from this message - s.updateBytesReceived(from, rejectedTxMessage.PeerID, uint64(len(m))) + // Track bytes received fromID this message + s.updateBytesReceived(fromID, rejectedTxMessage.PeerID, uint64(len(m))) - if s.shouldSkipBannedPeer(from, "handleRejectedTxTopic") { + if s.shouldSkipBannedPeer(fromID, "handleRejectedTxTopic") { return } - // Skip notifications from unhealthy peers - if s.shouldSkipUnhealthyPeer(from, "handleRejectedTxTopic") { + // Skip notifications fromID unhealthy peers + if s.shouldSkipUnhealthyPeer(fromID, "handleRejectedTxTopic") { return } - // Rejected TX messages from other peers are informational only. + // Rejected TX messages fromID other peers are informational only. // They help us understand network state but don't trigger re-broadcasting. - // If we wanted to take action (e.g., remove from our mempool), we would do it here. + // If we wanted to take action (e.g., remove fromID our mempool), we would do it here. } // getPeerIDFromDataHubURL finds the peer ID that has the given DataHub URL From d4e71b744f213fe237243a249149304ebd689193 Mon Sep 17 00:00:00 2001 From: Dylan Date: Tue, 25 Nov 2025 12:18:53 -0500 Subject: [PATCH 2/2] Fix grammar --- services/p2p/server_helpers.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/p2p/server_helpers.go b/services/p2p/server_helpers.go index 768068286..bdf1c4142 100644 --- a/services/p2p/server_helpers.go +++ b/services/p2p/server_helpers.go @@ -39,7 +39,7 @@ func (s *Server) handleBlockTopic(_ context.Context, m []byte, fromID string) { return } - // Check that fromID does matches the subtree peer ID + // Check that fromID matches the block peer ID if fromID != blockMessage.PeerID { // For now, log an error. In the future, we might want to take banning action against peers spoofing other IDs s.logger.Errorf("[handleBlockTopic] mismatch between fromID (%s) and blockMessage.PeerID (%s)", fromID, blockMessage.PeerID) @@ -141,7 +141,7 @@ func (s *Server) handleSubtreeTopic(_ context.Context, m []byte, fromID string) return } - // Check that fromID does matches the subtree peer ID + // Check that fromID matches the subtree peer ID if fromID != subtreeMessage.PeerID { // For now, log an error. In the future, we might want to take banning action against peers spoofing other IDs s.logger.Errorf("[handleSubtreeTopic] mismatch between fromID (%s) and subtreeMessage.PeerID (%s)", fromID, subtreeMessage.PeerID) @@ -285,7 +285,7 @@ func (s *Server) handleRejectedTxTopic(_ context.Context, m []byte, fromID strin return } - // Check that fromID does matches the rejected tx peer ID + // Check that fromID matches the rejected tx peer ID if fromID != rejectedTxMessage.PeerID { s.logger.Errorf("[handleRejectedTxTopic] peerID does not match fromID: peerID=%s fromID=%s", rejectedTxMessage.PeerID, fromID) return