diff --git a/daemon/daemon_services.go b/daemon/daemon_services.go index 11b4d4c92..ed6edee11 100644 --- a/daemon/daemon_services.go +++ b/daemon/daemon_services.go @@ -452,7 +452,7 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett return err } - peerClient, err := peer.NewClient(ctx, createLogger("rpc"), appSettings) + legacyPeerClient, err := peer.NewClient(ctx, createLogger("rpc"), appSettings) if err != nil { return err } @@ -489,7 +489,7 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett // Create the RPC server with the necessary parts var rpcServer *rpc.RPCServer - rpcServer, err = rpc.NewServer(createLogger(loggerRPC), appSettings, blockchainClient, blockValidationClient, utxoStore, blockAssemblyClient, peerClient, p2pClient, txStore, validatorClient) + rpcServer, err = rpc.NewServer(createLogger(loggerRPC), appSettings, blockchainClient, blockValidationClient, utxoStore, blockAssemblyClient, legacyPeerClient, p2pClient, txStore, validatorClient) if err != nil { return err } diff --git a/daemon/test_daemon.go b/daemon/test_daemon.go index 4f889d7bf..98c96f031 100644 --- a/daemon/test_daemon.go +++ b/daemon/test_daemon.go @@ -47,6 +47,7 @@ import ( "github.com/bsv-blockchain/teranode/test/utils/wait" "github.com/bsv-blockchain/teranode/ulogger" "github.com/bsv-blockchain/teranode/util" + libp2pPeer "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" tc "github.com/testcontainers/testcontainers-go/modules/compose" @@ -109,7 +110,7 @@ func (je *JSONError) Error() string { // NewTestDaemon creates a new TestDaemon instance with the provided options. func NewTestDaemon(t *testing.T, opts TestOptions) *TestDaemon { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(t.Context()) var ( composeDependencies tc.ComposeStack @@ -460,8 +461,13 @@ func NewTestDaemon(t *testing.T, opts TestOptions) *TestDaemon { blockAssembler, ok := blockAssemblyService.(*blockassembly.BlockAssembly) require.True(t, ok) + assetURL := fmt.Sprintf("http://127.0.0.1:%d", appSettings.Asset.HTTPPort) + if appSettings.Asset.APIPrefix != "" { + assetURL += appSettings.Asset.APIPrefix + } + return &TestDaemon{ - AssetURL: fmt.Sprintf("http://127.0.0.1:%d", appSettings.Asset.HTTPPort), + AssetURL: assetURL, BlockAssembler: blockAssembler.GetBlockAssembler(), BlockAssemblyClient: blockAssemblyClient, BlockValidationClient: blockValidationClient, @@ -1294,6 +1300,13 @@ func (td *TestDaemon) WaitForBlockStateChange(t *testing.T, expectedBlock *model } } +func (td *TestDaemon) WaitForBlockhash(t *testing.T, blockHash *chainhash.Hash, timeout time.Duration) { + require.Eventually(t, func() bool { + _, err := td.BlockchainClient.GetBlock(td.Ctx, blockHash) + return err == nil + }, timeout, 100*time.Millisecond, "Timeout waiting for block with hash %s", blockHash.String()) +} + func (td *TestDaemon) WaitForBlock(t *testing.T, expectedBlock *model.Block, timeout time.Duration, skipVerifyChain ...bool) { ctx, cancel := context.WithTimeout(td.Ctx, timeout) defer cancel() @@ -1865,6 +1878,25 @@ func (td *TestDaemon) DisconnectFromPeer(t *testing.T, peer *TestDaemon) { require.NoError(t, err, "Failed to disconnect from peer") } +func (td *TestDaemon) InjectPeer(t *testing.T, peer *TestDaemon) { + peerID, err := libp2pPeer.Decode(peer.Settings.P2P.PeerID) + require.NoError(t, err, "Failed to decode peer ID") + + p2pService, err := 
td.d.ServiceManager.GetService("P2P")
+	require.NoError(t, err, "Failed to get P2P service")
+
+	p2pServer, ok := p2pService.(*p2p.Server)
+	require.True(t, ok, "Failed to cast P2P service to Server")
+
+	// Seed this node's registry with the other peer's identity and current best block
+	header, meta, err := peer.BlockchainClient.GetBestBlockHeader(td.Ctx)
+	require.NoError(t, err, "Failed to get best block header")
+
+	p2pServer.InjectPeerForTesting(peerID, peer.Settings.Context, peer.AssetURL, meta.Height, header.Hash())
+
+	t.Logf("Injected peer %s into %s's registry (PeerID: %s)", peer.Settings.Context, td.Settings.Context, peerID)
+}
+
 func peerAddress(peer *TestDaemon) string {
 	return fmt.Sprintf("/dns/127.0.0.1/tcp/%d/p2p/%s", peer.Settings.P2P.Port, peer.Settings.P2P.PeerID)
 }
diff --git a/services/p2p/Server.go b/services/p2p/Server.go
index 62d5174a5..f9626c11a 100644
--- a/services/p2p/Server.go
+++ b/services/p2p/Server.go
@@ -1390,15 +1390,19 @@ func (s *Server) processBlockchainNotification(ctx context.Context, notification
 		return errors.NewError(fmt.Sprintf("error getting chainhash from notification hash %s: %%w", notification.Hash), err)
 	}
-	s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
-
 	switch notification.Type {
 	case model.NotificationType_Block:
+		s.logger.Infof("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
 		return s.handleBlockNotification(ctx, hash)
 	// These handlers return wrapped errors
+
 	case model.NotificationType_Subtree:
+		s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
 		return s.handleSubtreeNotification(ctx, hash)
+
 	case model.NotificationType_PeerFailure:
+		s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
 		return s.handlePeerFailureNotification(ctx, notification)
+
 	default:
 		s.logger.Warnf("[processBlockchainNotification] Received unhandled notification type: %s for hash %s", notification.Type, hash.String())
 	}
diff --git a/services/p2p/server_helpers.go b/services/p2p/server_helpers.go
index 5fa4d043a..adf88fba9 100644
--- a/services/p2p/server_helpers.go
+++ b/services/p2p/server_helpers.go
@@ -480,6 +480,18 @@ func (s *Server) addConnectedPeer(peerID peer.ID, clientName string, height uint
 	}
 }
 
+// InjectPeerForTesting directly injects a peer into the registry for testing purposes.
+// This method allows deterministic peer setup without requiring actual P2P network connections.
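+// The injected peer is marked with storage mode "full", so it is eligible as a sync
+// candidate. A minimal usage sketch (hedged: the peer ID is borrowed from the RPC tests;
+// srv, the URL, the height, and bestHash are illustrative, not part of this change):
+//
+//	id, _ := peer.Decode("12D3KooWL1NF6fdTJ9cucEuwvuX8V8KtpJZZnUE4umdLBuK15eUZ")
+//	srv.InjectPeerForTesting(id, "nodeA", "http://127.0.0.1:18090", 3, bestHash)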
+func (s *Server) InjectPeerForTesting(peerID peer.ID, clientName, dataHubURL string, height uint32, blockHash *chainhash.Hash) { + if s.peerRegistry == nil { + return + } + + s.peerRegistry.Put(peerID, clientName, height, blockHash, dataHubURL) + + s.peerRegistry.UpdateStorage(peerID, "full") +} + func (s *Server) removePeer(peerID peer.ID) { if s.peerRegistry != nil { // Mark as disconnected before removing diff --git a/services/p2p/sync_coordinator.go b/services/p2p/sync_coordinator.go index 1aa37eefe..de681f9c2 100644 --- a/services/p2p/sync_coordinator.go +++ b/services/p2p/sync_coordinator.go @@ -503,7 +503,13 @@ func (sc *SyncCoordinator) logCandidateList(candidates []*PeerInfo) { func (sc *SyncCoordinator) periodicEvaluation(ctx context.Context) { defer sc.wg.Done() - ticker := time.NewTicker(30 * time.Second) + interval := sc.settings.P2P.SyncCoordinatorPeriodicEvaluationInterval + if interval <= 0 { + sc.logger.Warnf("[SyncCoordinator] Invalid periodic evaluation interval %v, using default 30s", interval) + interval = 30 * time.Second + } + + ticker := time.NewTicker(interval) defer ticker.Stop() for { diff --git a/services/rpc/Server.go b/services/rpc/Server.go index 019807566..3838a1354 100644 --- a/services/rpc/Server.go +++ b/services/rpc/Server.go @@ -669,9 +669,9 @@ type RPCServer struct { // Used for mining-related RPC commands like getminingcandidate and generate blockAssemblyClient blockassembly.ClientI - // peerClient provides access to legacy peer network services + // legacyP2PClient provides access to legacy peer network services // Used for peer management and information retrieval - peerClient peer.ClientI + legacyP2PClient peer.ClientI // p2pClient provides access to the P2P network services // Used for modern peer management and network operations @@ -1384,7 +1384,7 @@ func (s *RPCServer) Start(ctx context.Context, readyCh chan<- struct{}) error { // Returns: // - *RPCServer: Configured server instance ready for initialization // - error: Any error encountered during configuration -func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainClient blockchain.ClientI, blockValidationClient blockvalidation.Interface, utxoStore utxo.Store, blockAssemblyClient blockassembly.ClientI, peerClient peer.ClientI, p2pClient p2p.ClientI, txStore blob.Store, validatorClient validator.Interface) (*RPCServer, error) { +func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainClient blockchain.ClientI, blockValidationClient blockvalidation.Interface, utxoStore utxo.Store, blockAssemblyClient blockassembly.ClientI, legacyPeerClient peer.ClientI, p2pClient p2p.ClientI, txStore blob.Store, validatorClient validator.Interface) (*RPCServer, error) { initPrometheusMetrics() assetHTTPAddress := tSettings.Asset.HTTPAddress @@ -1409,7 +1409,7 @@ func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainCl helpCacher: newHelpCacher(), utxoStore: utxoStore, blockAssemblyClient: blockAssemblyClient, - peerClient: peerClient, + legacyP2PClient: legacyPeerClient, p2pClient: p2pClient, txStore: txStore, validatorClient: validatorClient, diff --git a/services/rpc/handlers.go b/services/rpc/handlers.go index 58656ac40..ed9f3f6ed 100644 --- a/services/rpc/handlers.go +++ b/services/rpc/handlers.go @@ -37,6 +37,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "github.com/bsv-blockchain/go-bt/v2" @@ -1097,143 +1098,124 @@ func handleGetpeerinfo(ctx context.Context, s *RPCServer, cmd interface{}, _ <-c return cached, nil } - 
peerCount := 0 + // use a goroutine with select to handle timeouts more reliably + type legacyPeerResult struct { + resp *peer_api.GetPeersResponse + err error + } - var legacyPeerInfo *peer_api.GetPeersResponse + legacyResultCh := make(chan legacyPeerResult, 1) - var newPeerInfo []*p2p.PeerInfo + type newPeerResult struct { + resp []*p2p.PeerInfo + err error + } + newPeerResultCh := make(chan newPeerResult, 1) - // get legacy peer info - if s.peerClient != nil { - // create a timeout context to prevent hanging if legacy peer service is not responding - peerCtx, cancel := context.WithTimeout(ctx, s.settings.RPC.ClientCallTimeout) - defer cancel() + // create a timeout context to prevent hanging if legacy peer service is not responding + peerCtx, cancel := context.WithTimeout(ctx, s.settings.RPC.ClientCallTimeout) + defer cancel() - // use a goroutine with select to handle timeouts more reliably - type peerResult struct { - resp *peer_api.GetPeersResponse - err error - } - resultCh := make(chan peerResult, 1) + wg := sync.WaitGroup{} + hasLegacyClient := s.legacyP2PClient != nil + hasP2PClient := s.p2pClient != nil + + // get legacy peer info + if hasLegacyClient { + wg.Add(1) go func() { - resp, err := s.peerClient.GetPeers(peerCtx) - resultCh <- peerResult{resp: resp, err: err} + defer wg.Done() + resp, err := s.legacyP2PClient.GetPeers(peerCtx) + legacyResultCh <- legacyPeerResult{resp: resp, err: err} }() - - select { - case result := <-resultCh: - if result.err != nil { - // not critical - legacy service may not be running, so log as info - s.logger.Infof("error getting legacy peer info: %v", result.err) - } else { - legacyPeerInfo = result.resp - } - case <-peerCtx.Done(): - // timeout reached - s.logger.Infof("timeout getting legacy peer info from peer service") - } - } - if legacyPeerInfo != nil { - peerCount += len(legacyPeerInfo.Peers) } // get new peer info from p2p service - if s.p2pClient != nil { - // create a timeout context to prevent hanging if p2p service is not responding - peerCtx, cancel := context.WithTimeout(ctx, s.settings.RPC.ClientCallTimeout) - defer cancel() - - // use a goroutine with select to handle timeouts more reliably - type peerResult struct { - resp []*p2p.PeerInfo - err error - } - resultCh := make(chan peerResult, 1) - + if hasP2PClient { + wg.Add(1) go func() { - resp, err := s.p2pClient.GetPeers(peerCtx) - resultCh <- peerResult{resp: resp, err: err} + defer wg.Done() + resp, err := s.p2pClient.GetPeerRegistry(peerCtx) + newPeerResultCh <- newPeerResult{resp: resp, err: err} }() + } - select { - case result := <-resultCh: - if result.err != nil { - // not critical - p2p service may not be running, so log as warning - s.logger.Warnf("error getting new peer info: %v", result.err) - } else { - newPeerInfo = result.resp + wg.Wait() + + infos := make([]*bsvjson.GetPeerInfoResult, 0, 32) + + if hasLegacyClient { + legacyResult := <-legacyResultCh + if legacyResult.err != nil { + // not critical - legacy service may not be running, so log as info + s.logger.Infof("error getting legacy peer info: %v", legacyResult.err) + } else { + for _, p := range legacyResult.resp.Peers { + info := &bsvjson.GetPeerInfoResult{ + ID: p.Id, + Addr: p.Addr, + AddrLocal: p.AddrLocal, + // Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)), + ServicesStr: p.Services, + // RelayTxes: !p.IsTxRelayDisabled(), + LastSend: p.LastSend, + LastRecv: p.LastRecv, + BytesSent: p.BytesSent, + BytesRecv: p.BytesReceived, + ConnTime: p.ConnTime, + PingTime: float64(p.PingTime), + 
TimeOffset: p.TimeOffset, + Version: p.Version, + SubVer: p.SubVer, + Inbound: p.Inbound, + StartingHeight: p.StartingHeight, + CurrentHeight: p.CurrentHeight, + BanScore: p.Banscore, + Whitelisted: p.Whitelisted, + FeeFilter: p.FeeFilter, + // SyncNode: p.ID == syncPeerID, + } + // if p.ToPeer().LastPingNonce() != 0 { + // wait := float64(time.Since(p.LastPingTime).Nanoseconds()) + // // We actually want microseconds. + // info.PingWait = wait / 1000 + // } + infos = append(infos, info) } - case <-peerCtx.Done(): - // timeout reached - s.logger.Warnf("timeout getting new peer info from p2p service") - } - } - if newPeerInfo != nil { - peerCount += len(newPeerInfo) - - for _, np := range newPeerInfo { - s.logger.Debugf("new peer: %v", np) - } - } - - infos := make([]*bsvjson.GetPeerInfoResult, 0, peerCount) - - if legacyPeerInfo != nil { - for _, p := range legacyPeerInfo.Peers { - info := &bsvjson.GetPeerInfoResult{ - ID: p.Id, - Addr: p.Addr, - AddrLocal: p.AddrLocal, - // Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)), - ServicesStr: p.Services, - // RelayTxes: !p.IsTxRelayDisabled(), - LastSend: p.LastSend, - LastRecv: p.LastRecv, - BytesSent: p.BytesSent, - BytesRecv: p.BytesReceived, - ConnTime: p.ConnTime, - PingTime: float64(p.PingTime), - TimeOffset: p.TimeOffset, - Version: p.Version, - SubVer: p.SubVer, - Inbound: p.Inbound, - StartingHeight: p.StartingHeight, - CurrentHeight: p.CurrentHeight, - BanScore: p.Banscore, - Whitelisted: p.Whitelisted, - FeeFilter: p.FeeFilter, - // SyncNode: p.ID == syncPeerID, + } + } + + if hasP2PClient { + newResult := <-newPeerResultCh + if newResult.err != nil { + // not critical - p2p service may not be running, so log as info + s.logger.Infof("error getting new peer info: %v", newResult.err) + } else { + for _, np := range newResult.resp { + s.logger.Debugf("new peer: %v", np) } - // if p.ToPeer().LastPingNonce() != 0 { - // wait := float64(time.Since(p.LastPingTime).Nanoseconds()) - // // We actually want microseconds. 
- // info.PingWait = wait / 1000 - // } - infos = append(infos, info) - } - } - - if newPeerInfo != nil { - for _, p := range newPeerInfo { - info := &bsvjson.GetPeerInfoResult{ - PeerID: p.ID.String(), - Addr: p.DataHubURL, // Use DataHub URL as address - SubVer: p.ClientName, - CurrentHeight: int32(p.Height), - StartingHeight: int32(p.Height), // Use current height as starting height - BanScore: int32(p.BanScore), - BytesRecv: p.BytesReceived, - BytesSent: 0, // P2P doesn't track bytes sent currently - ConnTime: p.ConnectedAt.Unix(), - TimeOffset: 0, // P2P doesn't track time offset - PingTime: p.AvgResponseTime.Seconds(), - Version: 0, // P2P doesn't track protocol version - LastSend: p.LastMessageTime.Unix(), // Last time we sent/received any message - LastRecv: p.LastBlockTime.Unix(), // Last time we received a block - Inbound: p.IsConnected, // Whether peer is currently connected + + for _, p := range newResult.resp { + info := &bsvjson.GetPeerInfoResult{ + PeerID: p.ID.String(), + Addr: p.DataHubURL, // Use DataHub URL as address + SubVer: p.ClientName, + CurrentHeight: int32(p.Height), + StartingHeight: int32(p.Height), // Use current height as starting height + BanScore: int32(p.BanScore), + BytesRecv: p.BytesReceived, + BytesSent: 0, // P2P doesn't track bytes sent currently + ConnTime: p.ConnectedAt.Unix(), + TimeOffset: 0, // P2P doesn't track time offset + PingTime: p.AvgResponseTime.Seconds(), + Version: 0, // P2P doesn't track protocol version + LastSend: p.LastMessageTime.Unix(), // Last time we sent/received any message + LastRecv: p.LastBlockTime.Unix(), // Last time we received a block + Inbound: p.IsConnected, // Whether peer is currently connected + } + infos = append(infos, info) } - infos = append(infos, info) } } @@ -1576,7 +1558,7 @@ func handleGetInfo(ctx context.Context, s *RPCServer, cmd interface{}, _ <-chan } var legacyConnections *peer_api.GetPeersResponse - if s.peerClient != nil { + if s.legacyP2PClient != nil { // create a timeout context to prevent hanging if legacy peer service is not responding peerCtx, cancel := context.WithTimeout(ctx, s.settings.RPC.ClientCallTimeout) defer cancel() @@ -1589,7 +1571,7 @@ func handleGetInfo(ctx context.Context, s *RPCServer, cmd interface{}, _ <-chan resultCh := make(chan peerResult, 1) go func() { - resp, err := s.peerClient.GetPeers(peerCtx) + resp, err := s.legacyP2PClient.GetPeers(peerCtx) resultCh <- peerResult{resp: resp, err: err} }() @@ -2020,8 +2002,8 @@ func handleIsBanned(ctx context.Context, s *RPCServer, cmd interface{}, _ <-chan // check if legacy peer service is available var peerBanned bool - if s.peerClient != nil { - isBannedLegacy, err := s.peerClient.IsBanned(ctx, &peer_api.IsBannedRequest{IpOrSubnet: c.IPOrSubnet}) + if s.legacyP2PClient != nil { + isBannedLegacy, err := s.legacyP2PClient.IsBanned(ctx, &peer_api.IsBannedRequest{IpOrSubnet: c.IPOrSubnet}) if err != nil { s.logger.Warnf("Failed to check if banned in legacy peer service: %v", err) } else { @@ -2103,7 +2085,7 @@ func handleListBanned(ctx context.Context, s *RPCServer, cmd interface{}, _ <-ch } // check if legacy peer service is available - if s.peerClient != nil { + if s.legacyP2PClient != nil { // Create a timeout context for the legacy peer client call legacyCtx, cancel := context.WithTimeout(ctx, clientCallTimeout) defer cancel() @@ -2116,7 +2098,7 @@ func handleListBanned(ctx context.Context, s *RPCServer, cmd interface{}, _ <-ch resultCh := make(chan legacyResult, 1) go func() { - resp, err := s.peerClient.ListBanned(legacyCtx, 
&emptypb.Empty{}) + resp, err := s.legacyP2PClient.ListBanned(legacyCtx, &emptypb.Empty{}) resultCh <- legacyResult{resp: resp, err: err} }() @@ -2180,8 +2162,8 @@ func handleClearBanned(ctx context.Context, s *RPCServer, cmd interface{}, _ <-c } } // check if legacy peer service is available - if s.peerClient != nil { - _, err := s.peerClient.ClearBanned(ctx, &emptypb.Empty{}) + if s.legacyP2PClient != nil { + _, err := s.legacyP2PClient.ClearBanned(ctx, &emptypb.Empty{}) if err != nil { s.logger.Warnf("Failed to clear banned list in legacy peer service: %v", err) } @@ -2277,10 +2259,10 @@ func handleSetBan(ctx context.Context, s *RPCServer, cmd interface{}, _ <-chan s } // and ban legacy peers - if s.peerClient != nil { + if s.legacyP2PClient != nil { until := expirationTimeInt64 - resp, err := s.peerClient.BanPeer(ctx, &peer_api.BanPeerRequest{ + resp, err := s.legacyP2PClient.BanPeer(ctx, &peer_api.BanPeerRequest{ Addr: c.IPOrSubnet, Until: until, }) @@ -2318,8 +2300,8 @@ func handleSetBan(ctx context.Context, s *RPCServer, cmd interface{}, _ <-chan s } // unban legacy peer - if s.peerClient != nil { - resp, err := s.peerClient.UnbanPeer(ctx, &peer_api.UnbanPeerRequest{ + if s.legacyP2PClient != nil { + resp, err := s.legacyP2PClient.UnbanPeer(ctx, &peer_api.UnbanPeerRequest{ Addr: c.IPOrSubnet, }) if err != nil { diff --git a/services/rpc/handlers_additional_test.go b/services/rpc/handlers_additional_test.go index 0900b4cac..a7ea12b8e 100644 --- a/services/rpc/handlers_additional_test.go +++ b/services/rpc/handlers_additional_test.go @@ -3048,8 +3048,8 @@ func TestHandleIsBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeer, + logger: logger, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3081,9 +3081,9 @@ func TestHandleIsBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3115,9 +3115,9 @@ func TestHandleIsBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3149,9 +3149,9 @@ func TestHandleIsBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3272,8 +3272,8 @@ func TestHandleListBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeer, + logger: logger, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3305,9 +3305,9 @@ func TestHandleListBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3343,9 +3343,9 @@ func TestHandleListBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ 
ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3428,9 +3428,9 @@ func TestHandleClearBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3458,9 +3458,9 @@ func TestHandleClearBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3505,8 +3505,8 @@ func TestHandleClearBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeer, + logger: logger, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3550,9 +3550,9 @@ func TestHandleClearBannedComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3640,9 +3640,9 @@ func TestHandleSetBanComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3743,9 +3743,9 @@ func TestHandleSetBanComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3861,9 +3861,9 @@ func TestHandleSetBanComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3900,9 +3900,9 @@ func TestHandleSetBanComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - p2pClient: mockP2P, - peerClient: mockPeer, + logger: logger, + p2pClient: mockP2P, + legacyP2PClient: mockPeer, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, }, @@ -3996,7 +3996,7 @@ func TestHandleGetInfoComprehensive(t *testing.T) { blockchainClient: mockBlockchainClient, blockAssemblyClient: mockBlockAssemblyClient, p2pClient: mockP2PClient, - peerClient: mockLegacyPeerClient, + legacyP2PClient: mockLegacyPeerClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -4222,7 +4222,7 @@ func TestHandleGetInfoComprehensive(t *testing.T) { logger: logger, blockchainClient: mockBlockchainClient, blockAssemblyClient: mockBlockAssemblyClient, - peerClient: mockLegacyPeerClient, + legacyP2PClient: mockLegacyPeerClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -4864,8 +4864,8 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeerClient, + logger: logger, + legacyP2PClient: mockPeerClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -4912,7 +4912,7 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { t.Run("p2p client with stats", func(t *testing.T) { // 
Create mock p2p client mockP2PClient := &mockP2PClient{ - getPeersFunc: func(ctx context.Context) ([]*p2p.PeerInfo, error) { + getPeerRegistryFunc: func(ctx context.Context) ([]*p2p.PeerInfo, error) { peerID, err := peer.Decode("12D3KooWL1NF6fdTJ9cucEuwvuX8V8KtpJZZnUE4umdLBuK15eUZ") require.NoError(t, err, "Failed to decode peer ID") return []*p2p.PeerInfo{ @@ -4995,7 +4995,7 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } mockP2PClient := &mockP2PClient{ - getPeersFunc: func(ctx context.Context) ([]*p2p.PeerInfo, error) { + getPeerRegistryFunc: func(ctx context.Context) ([]*p2p.PeerInfo, error) { peerID, err := peer.Decode("12D3KooWJZZnUE4umdLBuK15eUZL1NF6fdTJ9cucEuwvuX8V8Ktp") require.NoError(t, err, "Failed to decode peer ID") return []*p2p.PeerInfo{ @@ -5011,9 +5011,9 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeerClient, - p2pClient: mockP2PClient, + logger: logger, + legacyP2PClient: mockPeerClient, + p2pClient: mockP2PClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -5073,8 +5073,8 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeerClient, + logger: logger, + legacyP2PClient: mockPeerClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -5108,9 +5108,9 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeerClient, - p2pClient: mockP2PClient, + logger: logger, + legacyP2PClient: mockPeerClient, + p2pClient: mockP2PClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ @@ -5150,8 +5150,8 @@ func TestHandleGetpeerinfoComprehensive(t *testing.T) { } s := &RPCServer{ - logger: logger, - peerClient: mockPeerClient, + logger: logger, + legacyP2PClient: mockPeerClient, settings: &settings.Settings{ ChainCfgParams: &chaincfg.MainNetParams, RPC: settings.RPCSettings{ diff --git a/settings/interface.go b/settings/interface.go index 2fbfb5ce2..cc3b1ba43 100644 --- a/settings/interface.go +++ b/settings/interface.go @@ -458,6 +458,9 @@ type P2PSettings struct { // Node mode configuration (full vs pruned) AllowPrunedNodeFallback bool // If true, fall back to pruned nodes when no full nodes available (default: true). Selects youngest pruned node (smallest height) to minimize UTXO pruning risk. 
+
+	// SyncCoordinatorPeriodicEvaluationInterval is how often the sync coordinator runs its periodic peer evaluation (default 30s)
+	SyncCoordinatorPeriodicEvaluationInterval time.Duration
 }
 
 type CoinbaseSettings struct {
diff --git a/settings/settings.go b/settings/settings.go
index 66420d668..8dd6f5def 100644
--- a/settings/settings.go
+++ b/settings/settings.go
@@ -405,7 +405,8 @@ func NewSettings(alternativeContext ...string) *Settings {
 			EnableMDNS:      getBool("p2p_enable_mdns", false, alternativeContext...),       // Default false to prevent LAN scanning
 			AllowPrivateIPs: getBool("p2p_allow_private_ips", false, alternativeContext...), // Default false for production safety
 			// Full/pruned node selection configuration
-			AllowPrunedNodeFallback: getBool("p2p_allow_pruned_node_fallback", true, alternativeContext...),
+			AllowPrunedNodeFallback:                   getBool("p2p_allow_pruned_node_fallback", true, alternativeContext...),
+			SyncCoordinatorPeriodicEvaluationInterval: getDuration("p2p_sync_coordinator_periodic_evaluation_interval", 30*time.Second, alternativeContext...),
 		},
 		Coinbase: CoinbaseSettings{
 			DB: getString("coinbaseDB", "", alternativeContext...),
diff --git a/test/e2e/daemon/ready/multi_node_inject_test.go b/test/e2e/daemon/ready/multi_node_inject_test.go
new file mode 100644
index 000000000..196eca8f0
--- /dev/null
+++ b/test/e2e/daemon/ready/multi_node_inject_test.go
@@ -0,0 +1,130 @@
+package smoke
+
+import (
+	"fmt"
+	"net/url"
+	"testing"
+	"time"
+
+	"github.com/bsv-blockchain/teranode/daemon"
+	"github.com/bsv-blockchain/teranode/services/blockchain"
+	"github.com/bsv-blockchain/teranode/settings"
+	"github.com/bsv-blockchain/teranode/test/utils/aerospike"
+	"github.com/stretchr/testify/require"
+)
+
+func getAerospikeInstance(t *testing.T) *url.URL {
+	urlStr, teardownFn, err := aerospike.InitAerospikeContainer()
+	require.NoError(t, err, "Failed to setup Aerospike container")
+
+	// Named "u" so the net/url package is not shadowed
+	u, err := url.Parse(urlStr)
+	require.NoError(t, err, "Failed to parse UTXO store URL")
+
+	t.Cleanup(func() {
+		_ = teardownFn()
+	})
+
+	return u
+}
+
+func getTestDaemon(t *testing.T, settingsContext string, aerospikeURL *url.URL) *daemon.TestDaemon {
+	d := daemon.NewTestDaemon(t, daemon.TestOptions{
+		EnableRPC:       true,
+		EnableP2P:       true,
+		EnableValidator: true,
+		SettingsContext: settingsContext,
+		SettingsOverrideFunc: func(s *settings.Settings) {
+			s.P2P.PeerCacheDir = t.TempDir()
+			s.UtxoStore.UtxoStore = aerospikeURL
+			s.ChainCfgParams.CoinbaseMaturity = 2
+			s.P2P.SyncCoordinatorPeriodicEvaluationInterval = 1 * time.Second
+		},
+		FSMState: blockchain.FSMStateRUNNING,
+		// EnableFullLogging: true,
+	})
+
+	t.Cleanup(func() {
+		d.Stop(t)
+	})
+
+	return d
+}
+
+func printPeerRegistry(t *testing.T, td *daemon.TestDaemon) {
+	registry, err := td.P2PClient.GetPeerRegistry(t.Context())
+	require.NoError(t, err)
+
+	fmt.Printf("\nPeer %s (%s) registry:\n", td.Settings.ClientName, td.Settings.P2P.PeerID)
+
+	for _, peerInfo := range registry {
+		fmt.Printf("\tName: %s (%s): Height=%d, BlockHash=%s, DataHubURL=%s\n", peerInfo.ClientName, peerInfo.ID, peerInfo.Height, peerInfo.BlockHash, peerInfo.DataHubURL)
+	}
+
+	fmt.Println()
+	fmt.Println()
+}
+
+// This test creates 2 nodes, and nodeA mines 3 blocks. Then we inject nodeA into nodeB, and nodeB should sync up to nodeA's height.
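+//
+// Rough shape of the expected flow (a hedged sketch; nodeABestHash is an illustrative
+// name for the best block header hash fetched in the test body):
+//
+//	nodeB.InjectPeer(t, nodeA)                                // seed nodeA's ID, height, hash, and asset URL into nodeB's registry
+//	// nodeB's sync coordinator (interval overridden to 1s above) should pick up the entry
+//	nodeB.WaitForBlockhash(t, nodeABestHash, 10*time.Second)  // poll GetBlock until nodeB has nodeA's best block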
+func Test_NodeB_Inject_After_NodeA_Mined(t *testing.T) {
+	SharedTestLock.Lock()
+	defer SharedTestLock.Unlock()
+
+	sharedAerospike := getAerospikeInstance(t)
+	nodeA := getTestDaemon(t, "docker.host.teranode1.daemon", sharedAerospike)
+	nodeB := getTestDaemon(t, "docker.host.teranode2.daemon", sharedAerospike)
+
+	t.Log(" Creating initial blockchain: [Genesis] -> [Block1] -> [Block2] -> [Block3]")
+	coinbaseTx := nodeA.MineToMaturityAndGetSpendableCoinbaseTx(t, nodeA.Ctx)
+	t.Logf(" Coinbase transaction available for spending: %s", coinbaseTx.TxIDChainHash().String())
+
+	// Injection is deliberately one-way: only nodeB learns about nodeA
+	nodeB.InjectPeer(t, nodeA)
+
+	printPeerRegistry(t, nodeB)
+
+	nodeABestBlockHeader, _, err := nodeA.BlockchainClient.GetBestBlockHeader(nodeA.Ctx)
+	require.NoError(t, err)
+
+	nodeB.WaitForBlockhash(t, nodeABestBlockHeader.Hash(), 10*time.Second)
+}
+
+// This test creates 2 nodes, and nodeB injects nodeA before nodeA mines any blocks. Then we mine 3 blocks on nodeA, and nodeB should sync up to nodeA's height.
+func Test_NodeB_Inject_Before_NodeA_Mined(t *testing.T) {
+	t.SkipNow()
+
+	SharedTestLock.Lock()
+	defer SharedTestLock.Unlock()
+
+	sharedAerospike := getAerospikeInstance(t)
+	nodeA := getTestDaemon(t, "docker.host.teranode1.daemon", sharedAerospike)
+	nodeB := getTestDaemon(t, "docker.host.teranode2.daemon", sharedAerospike)
+
+	// Injection is deliberately one-way: only nodeB learns about nodeA
+	nodeB.InjectPeer(t, nodeA)
+
+	printPeerRegistry(t, nodeB)
+
+	t.Log(" Creating initial blockchain: [Genesis] -> [Block1] -> [Block2] -> [Block3]")
+	coinbaseTx := nodeA.MineToMaturityAndGetSpendableCoinbaseTx(t, nodeA.Ctx)
+	t.Logf(" Coinbase transaction available for spending: %s", coinbaseTx.TxIDChainHash().String())
+
+	printPeerRegistry(t, nodeB)
+
+	s, err := nodeB.CallRPC(t.Context(), "getpeerinfo", []interface{}{})
+	require.NoError(t, err)
+	t.Logf(" NodeB peer info: %s", s)
+
+	nodeABestBlockHeader, _, err := nodeA.BlockchainClient.GetBestBlockHeader(nodeA.Ctx)
+	require.NoError(t, err)
+
+	nodeB.WaitForBlockhash(t, nodeABestBlockHeader.Hash(), 26*time.Second)
+}