From c819059e6e2e27f5e672392450bd1682e1d86cd8 Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Thu, 20 Nov 2025 10:18:50 +0100 Subject: [PATCH 1/9] =?UTF-8?q?Revert=20"Revert=20"test:=20add=20multi-nod?= =?UTF-8?q?e=20daemon=20test=20infrastructure=20with=20peer=20inj=E2=80=A6?= =?UTF-8?q?"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 44a9b6e7cda85ad8639235a9f3ed2e47bffbbcf5. --- daemon/test_daemon.go | 46 ++++++- services/p2p/server_helpers.go | 18 +++ services/p2p/sync_coordinator.go | 2 +- settings/interface.go | 3 + settings/settings.go | 1 + .../daemon/ready/multi_node_inject_test.go | 124 ++++++++++++++++++ 6 files changed, 192 insertions(+), 2 deletions(-) create mode 100644 test/e2e/daemon/ready/multi_node_inject_test.go diff --git a/daemon/test_daemon.go b/daemon/test_daemon.go index 820af76680..c7930ce37e 100644 --- a/daemon/test_daemon.go +++ b/daemon/test_daemon.go @@ -47,6 +47,7 @@ import ( "github.com/bsv-blockchain/teranode/test/utils/wait" "github.com/bsv-blockchain/teranode/ulogger" "github.com/bsv-blockchain/teranode/util" + libp2pPeer "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" tc "github.com/testcontainers/testcontainers-go/modules/compose" @@ -461,8 +462,13 @@ func NewTestDaemon(t *testing.T, opts TestOptions) *TestDaemon { blockAssembler, ok := blockAssemblyService.(*blockassembly.BlockAssembly) require.True(t, ok) + assetURL := fmt.Sprintf("http://127.0.0.1:%d", appSettings.Asset.HTTPPort) + if appSettings.Asset.APIPrefix != "" { + assetURL += appSettings.Asset.APIPrefix + } + return &TestDaemon{ - AssetURL: fmt.Sprintf("http://127.0.0.1:%d", appSettings.Asset.HTTPPort), + AssetURL: assetURL, BlockAssembler: blockAssembler.GetBlockAssembler(), BlockAssemblyClient: blockAssemblyClient, BlockValidationClient: blockValidationClient, @@ -1295,6 +1301,25 @@ func (td *TestDaemon) WaitForBlockStateChange(t *testing.T, expectedBlock *model } } +func (td *TestDaemon) WaitForBlockhash(t *testing.T, blockHash *chainhash.Hash, timeout time.Duration) { + ctx, cancel := context.WithTimeout(td.Ctx, timeout) + defer cancel() + + for { + select { + case <-ctx.Done(): + t.Errorf("Timeout waiting for block with hash %s", blockHash.String()) + return + default: + _, err := td.BlockchainClient.GetBlock(ctx, blockHash) + if err == nil { + return + } + time.Sleep(100 * time.Millisecond) + } + } +} + func (td *TestDaemon) WaitForBlock(t *testing.T, expectedBlock *model.Block, timeout time.Duration, skipVerifyChain ...bool) { ctx, cancel := context.WithTimeout(td.Ctx, timeout) defer cancel() @@ -1906,6 +1931,25 @@ func (td *TestDaemon) DisconnectFromPeer(t *testing.T, peer *TestDaemon) { require.NoError(t, err, "Failed to disconnect from peer") } +func (td *TestDaemon) InjectPeer(t *testing.T, peer *TestDaemon) { + peerID, err := libp2pPeer.Decode(peer.Settings.P2P.PeerID) + require.NoError(t, err, "Failed to decode peer ID") + + p2pService, err := td.d.ServiceManager.GetService("P2P") + require.NoError(t, err, "Failed to get P2P service") + + p2pServer, ok := p2pService.(*p2p.Server) + require.True(t, ok, "Failed to cast P2P service to Server") + + // Inject my peer info to other peer... + header, meta, err := peer.BlockchainClient.GetBestBlockHeader(td.Ctx) + require.NoError(t, err, "Failed to get best block header") + + p2pServer.InjectPeerForTesting(peerID, peer.Settings.Context, peer.AssetURL, meta.Height, header.Hash().String()) + + t.Logf("Injected peer %s into %s's registry (PeerID: %s)", peer.Settings.Context, td.Settings.Context, peerID) +} + func peerAddress(peer *TestDaemon) string { return fmt.Sprintf("/dns/127.0.0.1/tcp/%d/p2p/%s", peer.Settings.P2P.Port, peer.Settings.P2P.PeerID) } diff --git a/services/p2p/server_helpers.go b/services/p2p/server_helpers.go index 9bf55a2494..d3312b55d9 100644 --- a/services/p2p/server_helpers.go +++ b/services/p2p/server_helpers.go @@ -523,6 +523,24 @@ func (s *Server) addConnectedPeer(peerID peer.ID, clientName string) { } } +// InjectPeerForTesting directly injects a peer into the registry for testing purposes. +// This method allows deterministic peer setup without requiring actual P2P network connections. +func (s *Server) InjectPeerForTesting(peerID peer.ID, clientName, dataHubURL string, height uint32, blockHash string) { + s.addConnectedPeer(peerID, clientName) + s.updateDataHubURL(peerID, dataHubURL) + s.updateBlockHash(peerID, blockHash) + s.updatePeerHeight(peerID, int32(height)) + + if s.peerRegistry != nil { + s.peerRegistry.UpdateURLResponsiveness(peerID, true) + s.peerRegistry.UpdateStorage(peerID, "full") + } + + if s.syncCoordinator != nil { + s.syncCoordinator.UpdatePeerInfo(peerID, int32(height), blockHash, dataHubURL) + } +} + func (s *Server) removePeer(peerID peer.ID) { if s.peerRegistry != nil { // Mark as disconnected before removing diff --git a/services/p2p/sync_coordinator.go b/services/p2p/sync_coordinator.go index 83e334822c..1e6eb0a743 100644 --- a/services/p2p/sync_coordinator.go +++ b/services/p2p/sync_coordinator.go @@ -494,7 +494,7 @@ func (sc *SyncCoordinator) logCandidateList(candidates []*PeerInfo) { func (sc *SyncCoordinator) periodicEvaluation(ctx context.Context) { defer sc.wg.Done() - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(sc.settings.P2P.SyncCoordinatorPeriodicEvaluationInterval) defer ticker.Stop() for { diff --git a/settings/interface.go b/settings/interface.go index 1c6dc18b8e..afce24a90b 100644 --- a/settings/interface.go +++ b/settings/interface.go @@ -451,6 +451,9 @@ type P2PSettings struct { // Node mode configuration (full vs pruned) AllowPrunedNodeFallback bool // If true, fall back to pruned nodes when no full nodes available (default: true). Selects youngest pruned node (smallest height) to minimize UTXO pruning risk. + + // This is the time we trigger a periodic evaluation in the sync coordinator + SyncCoordinatorPeriodicEvaluationInterval time.Duration } type CoinbaseSettings struct { diff --git a/settings/settings.go b/settings/settings.go index 40f1e6020a..d7df011ebc 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -400,6 +400,7 @@ func NewSettings(alternativeContext ...string) *Settings { AllowPrivateIPs: getBool("p2p_allow_private_ips", false, alternativeContext...), // Default false for production safety // Full/pruned node selection configuration AllowPrunedNodeFallback: getBool("p2p_allow_pruned_node_fallback", true, alternativeContext...), + SyncCoordinatorPeriodicEvaluationInterval: getDuration("p2p_sync_coordinator_periodic_evaluation_interval", 30*time.Second, alternativeContext...), }, Coinbase: CoinbaseSettings{ DB: getString("coinbaseDB", "", alternativeContext...), diff --git a/test/e2e/daemon/ready/multi_node_inject_test.go b/test/e2e/daemon/ready/multi_node_inject_test.go new file mode 100644 index 0000000000..c1ab383878 --- /dev/null +++ b/test/e2e/daemon/ready/multi_node_inject_test.go @@ -0,0 +1,124 @@ +package smoke + +import ( + "fmt" + "net/url" + "testing" + "time" + + "github.com/bsv-blockchain/teranode/daemon" + "github.com/bsv-blockchain/teranode/services/blockchain" + "github.com/bsv-blockchain/teranode/settings" + "github.com/bsv-blockchain/teranode/test/utils/aerospike" + "github.com/stretchr/testify/require" +) + +func getAerospikeInstance(t *testing.T) *url.URL { + urlStr, teardownFn, err := aerospike.InitAerospikeContainer() + require.NoError(t, err, "Failed to setup Aerospike container") + + url, err := url.Parse(urlStr) + require.NoError(t, err, "Failed to parse UTXO store URL") + + t.Cleanup(func() { + _ = teardownFn() + }) + + return url +} + +func getTestDaemon(t *testing.T, settingsContext string, aerospikeURL *url.URL) *daemon.TestDaemon { + d := daemon.NewTestDaemon(t, daemon.TestOptions{ + EnableRPC: true, + EnableP2P: true, + EnableValidator: true, + SettingsContext: settingsContext, + SettingsOverrideFunc: func(s *settings.Settings) { + s.P2P.PeerCacheDir = t.TempDir() + s.UtxoStore.UtxoStore = aerospikeURL + s.ChainCfgParams.CoinbaseMaturity = 2 + s.P2P.SyncCoordinatorPeriodicEvaluationInterval = 1 * time.Second + }, + FSMState: blockchain.FSMStateRUNNING, + // EnableFullLogging: true, + }) + + t.Cleanup(func() { + d.Stop(t) + }) + + return d +} + +func printPeerRegistry(t *testing.T, td *daemon.TestDaemon) { + registry, err := td.P2PClient.GetPeerRegistry(t.Context()) + require.NoError(t, err) + + fmt.Printf("\nPeer %s (%s) registry:\n", td.Settings.ClientName, td.Settings.P2P.PeerID) + + for _, peerInfo := range registry { + fmt.Printf("\tName: %s (%s): Height=%d, BlockHash=%s, DataHubURL=%s", peerInfo.ClientName, peerInfo.ID, peerInfo.Height, peerInfo.BlockHash, peerInfo.DataHubURL) + } + + fmt.Println() + fmt.Println() +} + +// This test creates 2 nodes, and nodeA mines 3 blocks. Then we inject nodeA into nodeB, and nodeB should sync up to nodeA's height. +func Test_NodeB_Inject_After_NodeA_Mined(t *testing.T) { + SharedTestLock.Lock() + defer SharedTestLock.Unlock() + + sharedAerospike := getAerospikeInstance(t) + nodeA := getTestDaemon(t, "docker.host.teranode1.daemon", sharedAerospike) + nodeB := getTestDaemon(t, "docker.host.teranode2.daemon", sharedAerospike) + + t.Log(" Creating initial blockchain: [Genesis] -> [Block1] -> [Block2] -> [Block3]") + coinbaseTx := nodeA.MineToMaturityAndGetSpendableCoinbaseTx(t, nodeA.Ctx) + t.Logf(" Coinbase transaction available for spending: %s", coinbaseTx.TxIDChainHash().String()) + + // nodeA.InjectPeer(t, nodeB) + nodeB.InjectPeer(t, nodeA) + + printPeerRegistry(t, nodeB) + + nodeABestBlockHeader, _, err := nodeA.BlockchainClient.GetBestBlockHeader(nodeA.Ctx) + require.NoError(t, err) + + nodeB.WaitForBlockhash(t, nodeABestBlockHeader.Hash(), 10*time.Second) + +} + +// This test creates 2 nodes, and nodeB injects nodeA before nodeA mines any blocks. Then we mine 3 blocks on nodeA, and nodeB should sync up to nodeA's height. +func Test_NodeB_Inject_Before_NodeA_Mined(t *testing.T) { + SharedTestLock.Lock() + defer SharedTestLock.Unlock() + + sharedAerospike := getAerospikeInstance(t) + nodeA := getTestDaemon(t, "docker.host.teranode1.daemon", sharedAerospike) + nodeB := getTestDaemon(t, "docker.host.teranode2.daemon", sharedAerospike) + + // nodeA.InjectPeer(t, nodeB) + nodeB.InjectPeer(t, nodeA) + + printPeerRegistry(t, nodeB) + + // go func() { + // for { + // time.Sleep(5 * time.Second) + // printPeerRegistry(t, nodeB) + // } + // }() + + t.Log(" Creating initial blockchain: [Genesis] -> [Block1] -> [Block2] -> [Block3]") + coinbaseTx := nodeA.MineToMaturityAndGetSpendableCoinbaseTx(t, nodeA.Ctx) + t.Logf(" Coinbase transaction available for spending: %s", coinbaseTx.TxIDChainHash().String()) + + printPeerRegistry(t, nodeB) + + nodeABestBlockHeader, _, err := nodeA.BlockchainClient.GetBestBlockHeader(nodeA.Ctx) + require.NoError(t, err) + + nodeB.WaitForBlockhash(t, nodeABestBlockHeader.Hash(), 26*time.Second) + +} From faff0acbfc0926b7bbc9396754295fc3c7ffa238 Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Thu, 20 Nov 2025 10:25:45 +0100 Subject: [PATCH 2/9] Removed non-existant method --- services/p2p/server_helpers.go | 1 - 1 file changed, 1 deletion(-) diff --git a/services/p2p/server_helpers.go b/services/p2p/server_helpers.go index d3312b55d9..628a9aae48 100644 --- a/services/p2p/server_helpers.go +++ b/services/p2p/server_helpers.go @@ -532,7 +532,6 @@ func (s *Server) InjectPeerForTesting(peerID peer.ID, clientName, dataHubURL str s.updatePeerHeight(peerID, int32(height)) if s.peerRegistry != nil { - s.peerRegistry.UpdateURLResponsiveness(peerID, true) s.peerRegistry.UpdateStorage(peerID, "full") } From 4c6e01719f7db5de94e6f6f30f0f5395efe7e19b Mon Sep 17 00:00:00 2001 From: Simon Ordish <71426+ordishs@users.noreply.github.com> Date: Fri, 21 Nov 2025 18:05:10 +0100 Subject: [PATCH 3/9] refactor(rpc,p2p,daemon): improve peer client clarity and type safety - Rename peerClient to legacyP2PClient for clarity across RPC and daemon - Change InjectPeerForTesting to use *chainhash.Hash instead of string - Refactor handleGetpeerinfo with concurrent peer fetching using WaitGroup - Simplify InjectPeerForTesting using peerRegistry.Put - Use test context for better lifecycle management - Adjust p2p notification logging levels (Block=Info, others=Debug) --- daemon/daemon_services.go | 4 +- daemon/test_daemon.go | 4 +- services/p2p/Server.go | 8 +- services/p2p/server_helpers.go | 17 +-- services/rpc/Server.go | 8 +- services/rpc/handlers.go | 137 ++++++++---------- services/rpc/handlers_additional_test.go | 112 +++++++------- .../daemon/ready/multi_node_inject_test.go | 6 + 8 files changed, 139 insertions(+), 157 deletions(-) diff --git a/daemon/daemon_services.go b/daemon/daemon_services.go index 486b7782f5..f8e150a4f4 100644 --- a/daemon/daemon_services.go +++ b/daemon/daemon_services.go @@ -449,7 +449,7 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett return err } - peerClient, err := peer.NewClient(ctx, createLogger("rpc"), appSettings) + legacyPeerClient, err := peer.NewClient(ctx, createLogger("rpc"), appSettings) if err != nil { return err } @@ -486,7 +486,7 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett // Create the RPC server with the necessary parts var rpcServer *rpc.RPCServer - rpcServer, err = rpc.NewServer(createLogger(loggerRPC), appSettings, blockchainClient, blockValidationClient, utxoStore, blockAssemblyClient, peerClient, p2pClient, txStore, validatorClient) + rpcServer, err = rpc.NewServer(createLogger(loggerRPC), appSettings, blockchainClient, blockValidationClient, utxoStore, blockAssemblyClient, legacyPeerClient, p2pClient, txStore, validatorClient) if err != nil { return err } diff --git a/daemon/test_daemon.go b/daemon/test_daemon.go index 1f3af5c185..209678f91b 100644 --- a/daemon/test_daemon.go +++ b/daemon/test_daemon.go @@ -110,7 +110,7 @@ func (je *JSONError) Error() string { // NewTestDaemon creates a new TestDaemon instance with the provided options. func NewTestDaemon(t *testing.T, opts TestOptions) *TestDaemon { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(t.Context()) var ( composeDependencies tc.ComposeStack @@ -1904,7 +1904,7 @@ func (td *TestDaemon) InjectPeer(t *testing.T, peer *TestDaemon) { header, meta, err := peer.BlockchainClient.GetBestBlockHeader(td.Ctx) require.NoError(t, err, "Failed to get best block header") - p2pServer.InjectPeerForTesting(peerID, peer.Settings.Context, peer.AssetURL, meta.Height, header.Hash().String()) + p2pServer.InjectPeerForTesting(peerID, peer.Settings.Context, peer.AssetURL, meta.Height, header.Hash()) t.Logf("Injected peer %s into %s's registry (PeerID: %s)", peer.Settings.Context, td.Settings.Context, peerID) } diff --git a/services/p2p/Server.go b/services/p2p/Server.go index 62d5174a50..f9626c11a3 100644 --- a/services/p2p/Server.go +++ b/services/p2p/Server.go @@ -1390,15 +1390,19 @@ func (s *Server) processBlockchainNotification(ctx context.Context, notification return errors.NewError(fmt.Sprintf("error getting chainhash from notification hash %s: %%w", notification.Hash), err) } - s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String()) - switch notification.Type { case model.NotificationType_Block: + s.logger.Infof("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String()) return s.handleBlockNotification(ctx, hash) // These handlers return wrapped errors + case model.NotificationType_Subtree: + s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String()) return s.handleSubtreeNotification(ctx, hash) + case model.NotificationType_PeerFailure: + s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String()) return s.handlePeerFailureNotification(ctx, notification) + default: s.logger.Warnf("[processBlockchainNotification] Received unhandled notification type: %s for hash %s", notification.Type, hash.String()) } diff --git a/services/p2p/server_helpers.go b/services/p2p/server_helpers.go index 6f6c0515c0..adf88fba95 100644 --- a/services/p2p/server_helpers.go +++ b/services/p2p/server_helpers.go @@ -482,19 +482,14 @@ func (s *Server) addConnectedPeer(peerID peer.ID, clientName string, height uint // InjectPeerForTesting directly injects a peer into the registry for testing purposes. // This method allows deterministic peer setup without requiring actual P2P network connections. -func (s *Server) InjectPeerForTesting(peerID peer.ID, clientName, dataHubURL string, height uint32, blockHash string) { - s.addConnectedPeer(peerID, clientName) - s.updateDataHubURL(peerID, dataHubURL) - s.updateBlockHash(peerID, blockHash) - s.updatePeerHeight(peerID, int32(height)) - - if s.peerRegistry != nil { - s.peerRegistry.UpdateStorage(peerID, "full") +func (s *Server) InjectPeerForTesting(peerID peer.ID, clientName, dataHubURL string, height uint32, blockHash *chainhash.Hash) { + if s.peerRegistry == nil { + return } - if s.syncCoordinator != nil { - s.syncCoordinator.UpdatePeerInfo(peerID, int32(height), blockHash, dataHubURL) - } + s.peerRegistry.Put(peerID, clientName, height, blockHash, dataHubURL) + + s.peerRegistry.UpdateStorage(peerID, "full") } func (s *Server) removePeer(peerID peer.ID) { diff --git a/services/rpc/Server.go b/services/rpc/Server.go index 019807566f..3838a13548 100644 --- a/services/rpc/Server.go +++ b/services/rpc/Server.go @@ -669,9 +669,9 @@ type RPCServer struct { // Used for mining-related RPC commands like getminingcandidate and generate blockAssemblyClient blockassembly.ClientI - // peerClient provides access to legacy peer network services + // legacyP2PClient provides access to legacy peer network services // Used for peer management and information retrieval - peerClient peer.ClientI + legacyP2PClient peer.ClientI // p2pClient provides access to the P2P network services // Used for modern peer management and network operations @@ -1384,7 +1384,7 @@ func (s *RPCServer) Start(ctx context.Context, readyCh chan<- struct{}) error { // Returns: // - *RPCServer: Configured server instance ready for initialization // - error: Any error encountered during configuration -func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainClient blockchain.ClientI, blockValidationClient blockvalidation.Interface, utxoStore utxo.Store, blockAssemblyClient blockassembly.ClientI, peerClient peer.ClientI, p2pClient p2p.ClientI, txStore blob.Store, validatorClient validator.Interface) (*RPCServer, error) { +func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainClient blockchain.ClientI, blockValidationClient blockvalidation.Interface, utxoStore utxo.Store, blockAssemblyClient blockassembly.ClientI, legacyPeerClient peer.ClientI, p2pClient p2p.ClientI, txStore blob.Store, validatorClient validator.Interface) (*RPCServer, error) { initPrometheusMetrics() assetHTTPAddress := tSettings.Asset.HTTPAddress @@ -1409,7 +1409,7 @@ func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainCl helpCacher: newHelpCacher(), utxoStore: utxoStore, blockAssemblyClient: blockAssemblyClient, - peerClient: peerClient, + legacyP2PClient: legacyPeerClient, p2pClient: p2pClient, txStore: txStore, validatorClient: validatorClient, diff --git a/services/rpc/handlers.go b/services/rpc/handlers.go index 0a43847333..d2e1e3d8f5 100644 --- a/services/rpc/handlers.go +++ b/services/rpc/handlers.go @@ -37,6 +37,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "github.com/bsv-blockchain/go-bt/v2" @@ -1098,90 +1099,58 @@ func handleGetpeerinfo(ctx context.Context, s *RPCServer, cmd interface{}, _ <-c return cached, nil } - peerCount := 0 + // use a goroutine with select to handle timeouts more reliably + type legacyPeerResult struct { + resp *peer_api.GetPeersResponse + err error + } - var legacyPeerInfo *peer_api.GetPeersResponse + legacyResultCh := make(chan legacyPeerResult, 1) - var newPeerInfo []*p2p.PeerInfo + type newPeerResult struct { + resp []*p2p.PeerInfo + err error + } + newPeerResultCh := make(chan newPeerResult, 1) - // get legacy peer info - if s.peerClient != nil { - // create a timeout context to prevent hanging if legacy peer service is not responding - peerCtx, cancel := context.WithTimeout(ctx, s.settings.RPC.ClientCallTimeout) - defer cancel() + // create a timeout context to prevent hanging if legacy peer service is not responding + peerCtx, cancel := context.WithTimeout(ctx, s.settings.RPC.ClientCallTimeout) + defer cancel() - // use a goroutine with select to handle timeouts more reliably - type peerResult struct { - resp *peer_api.GetPeersResponse - err error - } - resultCh := make(chan peerResult, 1) + wg := sync.WaitGroup{} + // get legacy peer info + if s.legacyP2PClient != nil { + wg.Add(1) go func() { - resp, err := s.peerClient.GetPeers(peerCtx) - resultCh <- peerResult{resp: resp, err: err} + defer wg.Done() + resp, err := s.legacyP2PClient.GetPeers(peerCtx) + legacyResultCh <- legacyPeerResult{resp: resp, err: err} }() - select { - case result := <-resultCh: - if result.err != nil { - // not critical - legacy service may not be running, so log as info - s.logger.Infof("error getting legacy peer info: %v", result.err) - } else { - legacyPeerInfo = result.resp - } - case <-peerCtx.Done(): - // timeout reached - s.logger.Infof("timeout getting legacy peer info from peer service") - } - } - if legacyPeerInfo != nil { - peerCount += len(legacyPeerInfo.Peers) } // get new peer info from p2p service if s.p2pClient != nil { - // create a timeout context to prevent hanging if p2p service is not responding - peerCtx, cancel := context.WithTimeout(ctx, s.settings.RPC.ClientCallTimeout) - defer cancel() - - // use a goroutine with select to handle timeouts more reliably - type peerResult struct { - resp []*p2p.PeerInfo - err error - } - resultCh := make(chan peerResult, 1) - + wg.Add(1) go func() { - resp, err := s.p2pClient.GetPeers(peerCtx) - resultCh <- peerResult{resp: resp, err: err} + defer wg.Done() + resp, err := s.p2pClient.GetPeerRegistry(peerCtx) + newPeerResultCh <- newPeerResult{resp: resp, err: err} }() - - select { - case result := <-resultCh: - if result.err != nil { - // not critical - p2p service may not be running, so log as warning - s.logger.Warnf("error getting new peer info: %v", result.err) - } else { - newPeerInfo = result.resp - } - case <-peerCtx.Done(): - // timeout reached - s.logger.Warnf("timeout getting new peer info from p2p service") - } } - if newPeerInfo != nil { - peerCount += len(newPeerInfo) - for _, np := range newPeerInfo { - s.logger.Debugf("new peer: %v", np) - } - } + wg.Wait() + + infos := make([]*bsvjson.GetPeerInfoResult, 0, 32) - infos := make([]*bsvjson.GetPeerInfoResult, 0, peerCount) + legacyResult := <-legacyResultCh - if legacyPeerInfo != nil { - for _, p := range legacyPeerInfo.Peers { + if legacyResult.err != nil { + // not critical - legacy service may not be running, so log as info + s.logger.Infof("error getting legacy peer info: %v", legacyResult.err) + } else { + for _, p := range legacyResult.resp.Peers { info := &bsvjson.GetPeerInfoResult{ ID: p.Id, Addr: p.Addr, @@ -1215,8 +1184,16 @@ func handleGetpeerinfo(ctx context.Context, s *RPCServer, cmd interface{}, _ <-c } } - if newPeerInfo != nil { - for _, p := range newPeerInfo { + newResult := <-newPeerResultCh + if newResult.err != nil { + // not critical - p2p service may not be running, so log as info + s.logger.Infof("error getting new peer info: %v", newResult.err) + } else { + for _, np := range newResult.resp { + s.logger.Debugf("new peer: %v", np) + } + + for _, p := range newResult.resp { info := &bsvjson.GetPeerInfoResult{ PeerID: p.ID.String(), Addr: p.DataHubURL, // Use DataHub URL as address @@ -1577,7 +1554,7 @@ func handleGetInfo(ctx context.Context, s *RPCServer, cmd interface{}, _ <-chan } var legacyConnections *peer_api.GetPeersResponse - if s.peerClient != nil { + if s.legacyP2PClient != nil { // create a timeout context to prevent hanging if legacy peer service is not responding peerCtx, cancel := context.WithTimeout(ctx, s.settings.RPC.ClientCallTimeout) defer cancel() @@ -1590,7 +1567,7 @@ func handleGetInfo(ctx context.Context, s *RPCServer, cmd interface{}, _ <-chan resultCh := make(chan peerResult, 1) go func() { - resp, err := s.peerClient.GetPeers(peerCtx) + resp, err := s.legacyP2PClient.GetPeers(peerCtx) resultCh <- peerResult{resp: resp, err: err} }() @@ -1967,8 +1944,8 @@ func handleIsBanned(ctx context.Context, s *RPCServer, cmd interface{}, _ <-chan // check if legacy peer service is available var peerBanned bool - if s.peerClient != nil { - isBannedLegacy, err := s.peerClient.IsBanned(ctx, &peer_api.IsBannedRequest{IpOrSubnet: c.IPOrSubnet}) + if s.legacyP2PClient != nil { + isBannedLegacy, err := s.legacyP2PClient.IsBanned(ctx, &peer_api.IsBannedRequest{IpOrSubnet: c.IPOrSubnet}) if err != nil { s.logger.Warnf("Failed to check if banned in legacy peer service: %v", err) } else { @@ -2050,7 +2027,7 @@ func handleListBanned(ctx context.Context, s *RPCServer, cmd interface{}, _ <-ch } // check if legacy peer service is available - if s.peerClient != nil { + if s.legacyP2PClient != nil { // Create a timeout context for the legacy peer client call legacyCtx, cancel := context.WithTimeout(ctx, clientCallTimeout) defer cancel() @@ -2063,7 +2040,7 @@ func handleListBanned(ctx context.Context, s *RPCServer, cmd interface{}, _ <-ch resultCh := make(chan legacyResult, 1) go func() { - resp, err := s.peerClient.ListBanned(legacyCtx, &emptypb.Empty{}) + resp, err := s.legacyP2PClient.ListBanned(legacyCtx, &emptypb.Empty{}) resultCh <- legacyResult{resp: resp, err: err} }() @@ -2127,8 +2104,8 @@ func handleClearBanned(ctx context.Context, s *RPCServer, cmd interface{}, _ <-c } } // check if legacy peer service is available - if s.peerClient != nil { - _, err := s.peerClient.ClearBanned(ctx, &emptypb.Empty{}) + if s.legacyP2PClient != nil { + _, err := s.legacyP2PClient.ClearBanned(ctx, &emptypb.Empty{}) if err != nil { s.logger.Warnf("Failed to clear banned list in legacy peer service: %v", err) } @@ -2224,10 +2201,10 @@ func handleSetBan(ctx context.Context, s *RPCServer, cmd interface{}, _ <-chan s } // and ban legacy peers - if s.peerClient != nil { + if s.legacyP2PClient != nil { until := expirationTimeInt64 - resp, err := s.peerClient.BanPeer(ctx, &peer_api.BanPeerRequest{ + resp, err := s.legacyP2PClient.BanPeer(ctx, &peer_api.BanPeerRequest{ Addr: c.IPOrSubnet, Until: until, }) @@ -2265,8 +2242,8 @@ func handleSetBan(ctx context.Context, s *RPCServer, cmd interface{}, _ <-chan s } // unban legacy peer - if s.peerClient != nil { - resp, err := s.peerClient.UnbanPeer(ctx, &peer_api.UnbanPeerRequest{ + if s.legacyP2PClient != nil { + resp, err := s.legacyP2PClient.UnbanPeer(ctx, &peer_api.UnbanPeerRequest{ Addr: c.IPOrSubnet, }) if err != nil { diff --git a/services/rpc/handlers_additional_test.go b/services/rpc/handlers_additional_test.go index 52549592ca..c11bc81427 100644 --- a/services/rpc/handlers_additional_test.go +++ b/services/rpc/handlers_additional_test.go @@ -3034,8 +3034,8 @@ func TestHandleIsBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeer, + logger: logger, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3067,9 +3067,9 @@ func TestHandleIsBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3101,9 +3101,9 @@ func TestHandleIsBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3135,9 +3135,9 @@ func TestHandleIsBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3258,8 +3258,8 @@ func TestHandleListBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeer, + logger: logger, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3291,9 +3291,9 @@ func TestHandleListBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3329,9 +3329,9 @@ func TestHandleListBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3414,9 +3414,9 @@ func TestHandleClearBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3444,9 +3444,9 @@ func TestHandleClearBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3491,8 +3491,8 @@ func TestHandleClearBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeer, + logger: logger, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3536,9 +3536,9 @@ func TestHandleClearBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3626,9 +3626,9 @@ func TestHandleSetBanComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3729,9 +3729,9 @@ func TestHandleSetBanComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3847,9 +3847,9 @@ func TestHandleSetBanComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3886,9 +3886,9 @@ func TestHandleSetBanComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3982,7 +3982,7 @@ func TestHandleGetInfoComprehensive(t *testing.T) { blockchainClient: mockBlockchainClient, blockAssemblyClient: mockBlockAssemblyClient, p2pClient: mockP2PClient, - peerClient: mockLegacyPeerClient, + legacyP2PClient: mockLegacyPeerClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -4208,7 +4208,7 @@ func TestHandleGetInfoComprehensive(t *testing.T) { logger: logger, blockchainClient: mockBlockchainClient, blockAssemblyClient: mockBlockAssemblyClient, - peerClient: mockLegacyPeerClient, + legacyP2PClient: mockLegacyPeerClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -4842,8 +4842,8 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeerClient, + logger: logger, + legacyP2PClient: mockPeerClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -4989,9 +4989,9 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeerClient, - p2pClient: mockP2PClient, + logger: logger, + legacyP2PClient: mockPeerClient, + p2pClient: mockP2PClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -5051,8 +5051,8 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeerClient, + logger: logger, + legacyP2PClient: mockPeerClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -5086,9 +5086,9 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeerClient, - p2pClient: mockP2PClient, + logger: logger, + legacyP2PClient: mockPeerClient, + p2pClient: mockP2PClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -5128,8 +5128,8 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeerClient, + logger: logger, + legacyP2PClient: mockPeerClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ diff --git a/test/e2e/daemon/ready/multi_node_inject_test.go b/test/e2e/daemon/ready/multi_node_inject_test.go index c1ab383878..32735dbbac 100644 --- a/test/e2e/daemon/ready/multi_node_inject_test.go +++ b/test/e2e/daemon/ready/multi_node_inject_test.go @@ -91,6 +91,8 @@ func Test_NodeB_Inject_After_NodeA_Mined(t *testing.T) { // This test creates 2 nodes, and nodeB injects nodeA before nodeA mines any blocks. Then we mine 3 blocks on nodeA, and nodeB should sync up to nodeA's height. func Test_NodeB_Inject_Before_NodeA_Mined(t *testing.T) { + t.SkipNow() + SharedTestLock.Lock() defer SharedTestLock.Unlock() @@ -116,6 +118,10 @@ func Test_NodeB_Inject_Before_NodeA_Mined(t *testing.T) { printPeerRegistry(t, nodeB) + s, err := nodeB.CallRPC(t.Context(), "getpeerinfo", []interface{}{}) + require.NoError(t, err) + t.Logf(" NodeB peer info: %s", s) + nodeABestBlockHeader, _, err := nodeA.BlockchainClient.GetBestBlockHeader(nodeA.Ctx) require.NoError(t, err) From 8bb3344ce912b63fd66a01a30b1aa7b214f730a1 Mon Sep 17 00:00:00 2001 From: Simon Ordish <71426+ordishs@users.noreply.github.com> Date: Fri, 21 Nov 2025 18:11:24 +0100 Subject: [PATCH 4/9] Fixed gci linting --- settings/settings.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/settings/settings.go b/settings/settings.go index d7df011ebc..3816fa9503 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -399,7 +399,7 @@ func NewSettings(alternativeContext ...string) *Settings { EnableMDNS: getBool("p2p_enable_mdns", false, alternativeContext...), // Default false to prevent LAN scanning AllowPrivateIPs: getBool("p2p_allow_private_ips", false, alternativeContext...), // Default false for production safety // Full/pruned node selection configuration - AllowPrunedNodeFallback: getBool("p2p_allow_pruned_node_fallback", true, alternativeContext...), + AllowPrunedNodeFallback: getBool("p2p_allow_pruned_node_fallback", true, alternativeContext...), SyncCoordinatorPeriodicEvaluationInterval: getDuration("p2p_sync_coordinator_periodic_evaluation_interval", 30*time.Second, alternativeContext...), }, Coinbase: CoinbaseSettings{ From e4b8324c6480d460dfcb36fad9f2400145febf74 Mon Sep 17 00:00:00 2001 From: Simon Ordish <71426+ordishs@users.noreply.github.com> Date: Thu, 27 Nov 2025 10:15:52 +0100 Subject: [PATCH 5/9] fix(rpc,daemon,test): resolve channel deadlock and test issues - Fix critical channel deadlock in handleGetpeerinfo when clients are nil - Replace busy-wait with ticker in WaitForBlockhash test helper - Change t.Errorf to t.Fatalf to properly halt tests on timeout - Add missing newline in multi_node_inject_test.go Printf statement --- daemon/test_daemon.go | 8 +- services/rpc/handlers.go | 143 +++++++++--------- .../daemon/ready/multi_node_inject_test.go | 2 +- 3 files changed, 80 insertions(+), 73 deletions(-) diff --git a/daemon/test_daemon.go b/daemon/test_daemon.go index 209678f91b..7033022c0d 100644 --- a/daemon/test_daemon.go +++ b/daemon/test_daemon.go @@ -1304,17 +1304,19 @@ func (td *TestDaemon) WaitForBlockhash(t *testing.T, blockHash *chainhash.Hash, ctx, cancel := context.WithTimeout(td.Ctx, timeout) defer cancel() + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { select { case <-ctx.Done(): t.Errorf("Timeout waiting for block with hash %s", blockHash.String()) - return - default: + t.FailNow() + case <-ticker.C: _, err := td.BlockchainClient.GetBlock(ctx, blockHash) if err == nil { return } - time.Sleep(100 * time.Millisecond) } } } diff --git a/services/rpc/handlers.go b/services/rpc/handlers.go index 133d34e173..ed9f3f6edd 100644 --- a/services/rpc/handlers.go +++ b/services/rpc/handlers.go @@ -1118,19 +1118,21 @@ func handleGetpeerinfo(ctx context.Context, s *RPCServer, cmd interface{}, _ <-c wg := sync.WaitGroup{} + hasLegacyClient := s.legacyP2PClient != nil + hasP2PClient := s.p2pClient != nil + // get legacy peer info - if s.legacyP2PClient != nil { + if hasLegacyClient { wg.Add(1) go func() { defer wg.Done() resp, err := s.legacyP2PClient.GetPeers(peerCtx) legacyResultCh <- legacyPeerResult{resp: resp, err: err} }() - } // get new peer info from p2p service - if s.p2pClient != nil { + if hasP2PClient { wg.Add(1) go func() { defer wg.Done() @@ -1143,74 +1145,77 @@ func handleGetpeerinfo(ctx context.Context, s *RPCServer, cmd interface{}, _ <-c infos := make([]*bsvjson.GetPeerInfoResult, 0, 32) - legacyResult := <-legacyResultCh - - if legacyResult.err != nil { - // not critical - legacy service may not be running, so log as info - s.logger.Infof("error getting legacy peer info: %v", legacyResult.err) - } else { - for _, p := range legacyResult.resp.Peers { - info := &bsvjson.GetPeerInfoResult{ - ID: p.Id, - Addr: p.Addr, - AddrLocal: p.AddrLocal, - // Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)), - ServicesStr: p.Services, - // RelayTxes: !p.IsTxRelayDisabled(), - LastSend: p.LastSend, - LastRecv: p.LastRecv, - BytesSent: p.BytesSent, - BytesRecv: p.BytesReceived, - ConnTime: p.ConnTime, - PingTime: float64(p.PingTime), - TimeOffset: p.TimeOffset, - Version: p.Version, - SubVer: p.SubVer, - Inbound: p.Inbound, - StartingHeight: p.StartingHeight, - CurrentHeight: p.CurrentHeight, - BanScore: p.Banscore, - Whitelisted: p.Whitelisted, - FeeFilter: p.FeeFilter, - // SyncNode: p.ID == syncPeerID, + if hasLegacyClient { + legacyResult := <-legacyResultCh + if legacyResult.err != nil { + // not critical - legacy service may not be running, so log as info + s.logger.Infof("error getting legacy peer info: %v", legacyResult.err) + } else { + for _, p := range legacyResult.resp.Peers { + info := &bsvjson.GetPeerInfoResult{ + ID: p.Id, + Addr: p.Addr, + AddrLocal: p.AddrLocal, + // Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)), + ServicesStr: p.Services, + // RelayTxes: !p.IsTxRelayDisabled(), + LastSend: p.LastSend, + LastRecv: p.LastRecv, + BytesSent: p.BytesSent, + BytesRecv: p.BytesReceived, + ConnTime: p.ConnTime, + PingTime: float64(p.PingTime), + TimeOffset: p.TimeOffset, + Version: p.Version, + SubVer: p.SubVer, + Inbound: p.Inbound, + StartingHeight: p.StartingHeight, + CurrentHeight: p.CurrentHeight, + BanScore: p.Banscore, + Whitelisted: p.Whitelisted, + FeeFilter: p.FeeFilter, + // SyncNode: p.ID == syncPeerID, + } + // if p.ToPeer().LastPingNonce() != 0 { + // wait := float64(time.Since(p.LastPingTime).Nanoseconds()) + // // We actually want microseconds. + // info.PingWait = wait / 1000 + // } + infos = append(infos, info) } - // if p.ToPeer().LastPingNonce() != 0 { - // wait := float64(time.Since(p.LastPingTime).Nanoseconds()) - // // We actually want microseconds. - // info.PingWait = wait / 1000 - // } - infos = append(infos, info) - } - } - - newResult := <-newPeerResultCh - if newResult.err != nil { - // not critical - p2p service may not be running, so log as info - s.logger.Infof("error getting new peer info: %v", newResult.err) - } else { - for _, np := range newResult.resp { - s.logger.Debugf("new peer: %v", np) - } - - for _, p := range newResult.resp { - info := &bsvjson.GetPeerInfoResult{ - PeerID: p.ID.String(), - Addr: p.DataHubURL, // Use DataHub URL as address - SubVer: p.ClientName, - CurrentHeight: int32(p.Height), - StartingHeight: int32(p.Height), // Use current height as starting height - BanScore: int32(p.BanScore), - BytesRecv: p.BytesReceived, - BytesSent: 0, // P2P doesn't track bytes sent currently - ConnTime: p.ConnectedAt.Unix(), - TimeOffset: 0, // P2P doesn't track time offset - PingTime: p.AvgResponseTime.Seconds(), - Version: 0, // P2P doesn't track protocol version - LastSend: p.LastMessageTime.Unix(), // Last time we sent/received any message - LastRecv: p.LastBlockTime.Unix(), // Last time we received a block - Inbound: p.IsConnected, // Whether peer is currently connected + } + } + + if hasP2PClient { + newResult := <-newPeerResultCh + if newResult.err != nil { + // not critical - p2p service may not be running, so log as info + s.logger.Infof("error getting new peer info: %v", newResult.err) + } else { + for _, np := range newResult.resp { + s.logger.Debugf("new peer: %v", np) + } + + for _, p := range newResult.resp { + info := &bsvjson.GetPeerInfoResult{ + PeerID: p.ID.String(), + Addr: p.DataHubURL, // Use DataHub URL as address + SubVer: p.ClientName, + CurrentHeight: int32(p.Height), + StartingHeight: int32(p.Height), // Use current height as starting height + BanScore: int32(p.BanScore), + BytesRecv: p.BytesReceived, + BytesSent: 0, // P2P doesn't track bytes sent currently + ConnTime: p.ConnectedAt.Unix(), + TimeOffset: 0, // P2P doesn't track time offset + PingTime: p.AvgResponseTime.Seconds(), + Version: 0, // P2P doesn't track protocol version + LastSend: p.LastMessageTime.Unix(), // Last time we sent/received any message + LastRecv: p.LastBlockTime.Unix(), // Last time we received a block + Inbound: p.IsConnected, // Whether peer is currently connected + } + infos = append(infos, info) } - infos = append(infos, info) } } diff --git a/test/e2e/daemon/ready/multi_node_inject_test.go b/test/e2e/daemon/ready/multi_node_inject_test.go index 32735dbbac..196eca8f06 100644 --- a/test/e2e/daemon/ready/multi_node_inject_test.go +++ b/test/e2e/daemon/ready/multi_node_inject_test.go @@ -57,7 +57,7 @@ func printPeerRegistry(t *testing.T, td *daemon.TestDaemon) { fmt.Printf("\nPeer %s (%s) registry:\n", td.Settings.ClientName, td.Settings.P2P.PeerID) for _, peerInfo := range registry { - fmt.Printf("\tName: %s (%s): Height=%d, BlockHash=%s, DataHubURL=%s", peerInfo.ClientName, peerInfo.ID, peerInfo.Height, peerInfo.BlockHash, peerInfo.DataHubURL) + fmt.Printf("\tName: %s (%s): Height=%d, BlockHash=%s, DataHubURL=%s\n", peerInfo.ClientName, peerInfo.ID, peerInfo.Height, peerInfo.BlockHash, peerInfo.DataHubURL) } fmt.Println() From fd2edb681eee5529f9090dc06750a079056f3248 Mon Sep 17 00:00:00 2001 From: Simon Ordish <71426+ordishs@users.noreply.github.com> Date: Thu, 27 Nov 2025 10:32:49 +0100 Subject: [PATCH 6/9] fix(daemon): add explicit return after t.FailNow in WaitForBlockhash --- daemon/test_daemon.go | 1 + 1 file changed, 1 insertion(+) diff --git a/daemon/test_daemon.go b/daemon/test_daemon.go index 7033022c0d..268ba0908d 100644 --- a/daemon/test_daemon.go +++ b/daemon/test_daemon.go @@ -1312,6 +1312,7 @@ func (td *TestDaemon) WaitForBlockhash(t *testing.T, blockHash *chainhash.Hash, case <-ctx.Done(): t.Errorf("Timeout waiting for block with hash %s", blockHash.String()) t.FailNow() + return case <-ticker.C: _, err := td.BlockchainClient.GetBlock(ctx, blockHash) if err == nil { From 8434b8c4fb8d2ef5511603dea549c6426ad947d2 Mon Sep 17 00:00:00 2001 From: Simon Ordish <71426+ordishs@users.noreply.github.com> Date: Thu, 27 Nov 2025 12:26:33 +0100 Subject: [PATCH 7/9] refactor(daemon): simplify WaitForBlockhash using require.Eventually --- daemon/test_daemon.go | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/daemon/test_daemon.go b/daemon/test_daemon.go index 268ba0908d..98c96f0318 100644 --- a/daemon/test_daemon.go +++ b/daemon/test_daemon.go @@ -1301,25 +1301,10 @@ func (td *TestDaemon) WaitForBlockStateChange(t *testing.T, expectedBlock *model } func (td *TestDaemon) WaitForBlockhash(t *testing.T, blockHash *chainhash.Hash, timeout time.Duration) { - ctx, cancel := context.WithTimeout(td.Ctx, timeout) - defer cancel() - - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - t.Errorf("Timeout waiting for block with hash %s", blockHash.String()) - t.FailNow() - return - case <-ticker.C: - _, err := td.BlockchainClient.GetBlock(ctx, blockHash) - if err == nil { - return - } - } - } + require.Eventually(t, func() bool { + _, err := td.BlockchainClient.GetBlock(td.Ctx, blockHash) + return err == nil + }, timeout, 100*time.Millisecond, "Timeout waiting for block with hash %s", blockHash.String()) } func (td *TestDaemon) WaitForBlock(t *testing.T, expectedBlock *model.Block, timeout time.Duration, skipVerifyChain ...bool) { From 0357899c18c33df456ca56baf9bda2cdfdd013a6 Mon Sep 17 00:00:00 2001 From: Simon Ordish <71426+ordishs@users.noreply.github.com> Date: Thu, 27 Nov 2025 12:34:58 +0100 Subject: [PATCH 8/9] fix(rpc): correct mock setup in TestHandleGetpeerinfoComprehensive Changed getPeersFunc to getPeerRegistryFunc in test mocks to match the actual method being called by handleGetpeerinfo after the channel deadlock fix. --- services/rpc/handlers_additional_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/rpc/handlers_additional_test.go b/services/rpc/handlers_additional_test.go index a2238d3034..a7ea12b8e1 100644 --- a/services/rpc/handlers_additional_test.go +++ b/services/rpc/handlers_additional_test.go @@ -4912,7 +4912,7 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { t.Run("p2p client with stats", func(t *testing.T) { // Create mock p2p client mockP2PClient := &mockP2PClient{ - getPeersFunc: func(ctx context.Context) ([]*p2p.PeerInfo, error) { + getPeerRegistryFunc: func(ctx context.Context) ([]*p2p.PeerInfo, error) { peerID, err := peer.Decode("12D3KooWL1NF6fdTJ9cucEuwvuX8V8KtpJZZnUE4umdLBuK15eUZ") require.NoError(t, err, "Failed to decode peer ID") return []*p2p.PeerInfo{ @@ -4995,7 +4995,7 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } mockP2PClient := &mockP2PClient{ - getPeersFunc: func(ctx context.Context) ([]*p2p.PeerInfo, error) { + getPeerRegistryFunc: func(ctx context.Context) ([]*p2p.PeerInfo, error) { peerID, err := peer.Decode("12D3KooWJZZnUE4umdLBuK15eUZL1NF6fdTJ9cucEuwvuX8V8Ktp") require.NoError(t, err, "Failed to decode peer ID") return []*p2p.PeerInfo{ From 1ec2ed0606f22e03fb77fc9df14b7b5b2a14b515 Mon Sep 17 00:00:00 2001 From: Simon Ordish <71426+ordishs@users.noreply.github.com> Date: Thu, 27 Nov 2025 13:03:03 +0100 Subject: [PATCH 9/9] fix(p2p): prevent panic from zero interval in SyncCoordinator Add defensive check in periodicEvaluation to use default 30s interval if SyncCoordinatorPeriodicEvaluationInterval is zero or negative, preventing time.NewTicker panic. --- services/p2p/sync_coordinator.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/services/p2p/sync_coordinator.go b/services/p2p/sync_coordinator.go index c771104025..de681f9c24 100644 --- a/services/p2p/sync_coordinator.go +++ b/services/p2p/sync_coordinator.go @@ -503,7 +503,13 @@ func (sc *SyncCoordinator) logCandidateList(candidates []*PeerInfo) { func (sc *SyncCoordinator) periodicEvaluation(ctx context.Context) { defer sc.wg.Done() - ticker := time.NewTicker(sc.settings.P2P.SyncCoordinatorPeriodicEvaluationInterval) + interval := sc.settings.P2P.SyncCoordinatorPeriodicEvaluationInterval + if interval <= 0 { + sc.logger.Warnf("[SyncCoordinator] Invalid periodic evaluation interval %v, using default 30s", interval) + interval = 30 * time.Second + } + + ticker := time.NewTicker(interval) defer ticker.Stop() for {