Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions daemon/daemon_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett
return err
}

peerClient, err := peer.NewClient(ctx, createLogger("rpc"), appSettings)
legacyPeerClient, err := peer.NewClient(ctx, createLogger("rpc"), appSettings)
if err != nil {
return err
}
Expand Down Expand Up @@ -489,7 +489,7 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett
// Create the RPC server with the necessary parts
var rpcServer *rpc.RPCServer

rpcServer, err = rpc.NewServer(createLogger(loggerRPC), appSettings, blockchainClient, blockValidationClient, utxoStore, blockAssemblyClient, peerClient, p2pClient, txStore, validatorClient)
rpcServer, err = rpc.NewServer(createLogger(loggerRPC), appSettings, blockchainClient, blockValidationClient, utxoStore, blockAssemblyClient, legacyPeerClient, p2pClient, txStore, validatorClient)
if err != nil {
return err
}
Expand Down
36 changes: 34 additions & 2 deletions daemon/test_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/bsv-blockchain/teranode/test/utils/wait"
"github.com/bsv-blockchain/teranode/ulogger"
"github.com/bsv-blockchain/teranode/util"
libp2pPeer "github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
tc "github.com/testcontainers/testcontainers-go/modules/compose"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (je *JSONError) Error() string {

// NewTestDaemon creates a new TestDaemon instance with the provided options.
func NewTestDaemon(t *testing.T, opts TestOptions) *TestDaemon {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(t.Context())

var (
composeDependencies tc.ComposeStack
Expand Down Expand Up @@ -460,8 +461,13 @@ func NewTestDaemon(t *testing.T, opts TestOptions) *TestDaemon {
blockAssembler, ok := blockAssemblyService.(*blockassembly.BlockAssembly)
require.True(t, ok)

assetURL := fmt.Sprintf("http://127.0.0.1:%d", appSettings.Asset.HTTPPort)
if appSettings.Asset.APIPrefix != "" {
assetURL += appSettings.Asset.APIPrefix
}

return &TestDaemon{
AssetURL: fmt.Sprintf("http://127.0.0.1:%d", appSettings.Asset.HTTPPort),
AssetURL: assetURL,
BlockAssembler: blockAssembler.GetBlockAssembler(),
BlockAssemblyClient: blockAssemblyClient,
BlockValidationClient: blockValidationClient,
Expand Down Expand Up @@ -1294,6 +1300,13 @@ func (td *TestDaemon) WaitForBlockStateChange(t *testing.T, expectedBlock *model
}
}

func (td *TestDaemon) WaitForBlockhash(t *testing.T, blockHash *chainhash.Hash, timeout time.Duration) {
require.Eventually(t, func() bool {
_, err := td.BlockchainClient.GetBlock(td.Ctx, blockHash)
return err == nil
}, timeout, 100*time.Millisecond, "Timeout waiting for block with hash %s", blockHash.String())
}

func (td *TestDaemon) WaitForBlock(t *testing.T, expectedBlock *model.Block, timeout time.Duration, skipVerifyChain ...bool) {
ctx, cancel := context.WithTimeout(td.Ctx, timeout)
defer cancel()
Expand Down Expand Up @@ -1865,6 +1878,25 @@ func (td *TestDaemon) DisconnectFromPeer(t *testing.T, peer *TestDaemon) {
require.NoError(t, err, "Failed to disconnect from peer")
}

func (td *TestDaemon) InjectPeer(t *testing.T, peer *TestDaemon) {
peerID, err := libp2pPeer.Decode(peer.Settings.P2P.PeerID)
require.NoError(t, err, "Failed to decode peer ID")

p2pService, err := td.d.ServiceManager.GetService("P2P")
require.NoError(t, err, "Failed to get P2P service")

p2pServer, ok := p2pService.(*p2p.Server)
require.True(t, ok, "Failed to cast P2P service to Server")

// Inject my peer info to other peer...
header, meta, err := peer.BlockchainClient.GetBestBlockHeader(td.Ctx)
require.NoError(t, err, "Failed to get best block header")

p2pServer.InjectPeerForTesting(peerID, peer.Settings.Context, peer.AssetURL, meta.Height, header.Hash())

t.Logf("Injected peer %s into %s's registry (PeerID: %s)", peer.Settings.Context, td.Settings.Context, peerID)
}

func peerAddress(peer *TestDaemon) string {
return fmt.Sprintf("/dns/127.0.0.1/tcp/%d/p2p/%s", peer.Settings.P2P.Port, peer.Settings.P2P.PeerID)
}
Expand Down
8 changes: 6 additions & 2 deletions services/p2p/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1390,15 +1390,19 @@ func (s *Server) processBlockchainNotification(ctx context.Context, notification
return errors.NewError(fmt.Sprintf("error getting chainhash from notification hash %s: %%w", notification.Hash), err)
}

s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())

switch notification.Type {
case model.NotificationType_Block:
s.logger.Infof("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
return s.handleBlockNotification(ctx, hash) // These handlers return wrapped errors

case model.NotificationType_Subtree:
s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
return s.handleSubtreeNotification(ctx, hash)

case model.NotificationType_PeerFailure:
s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
return s.handlePeerFailureNotification(ctx, notification)

default:
s.logger.Warnf("[processBlockchainNotification] Received unhandled notification type: %s for hash %s", notification.Type, hash.String())
}
Expand Down
12 changes: 12 additions & 0 deletions services/p2p/server_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,18 @@ func (s *Server) addConnectedPeer(peerID peer.ID, clientName string, height uint
}
}

// InjectPeerForTesting directly injects a peer into the registry for testing purposes.
// This method allows deterministic peer setup without requiring actual P2P network connections.
func (s *Server) InjectPeerForTesting(peerID peer.ID, clientName, dataHubURL string, height uint32, blockHash *chainhash.Hash) {
if s.peerRegistry == nil {
return
}

s.peerRegistry.Put(peerID, clientName, height, blockHash, dataHubURL)

s.peerRegistry.UpdateStorage(peerID, "full")
}

func (s *Server) removePeer(peerID peer.ID) {
if s.peerRegistry != nil {
// Mark as disconnected before removing
Expand Down
8 changes: 7 additions & 1 deletion services/p2p/sync_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,13 @@ func (sc *SyncCoordinator) logCandidateList(candidates []*PeerInfo) {
func (sc *SyncCoordinator) periodicEvaluation(ctx context.Context) {
defer sc.wg.Done()

ticker := time.NewTicker(30 * time.Second)
interval := sc.settings.P2P.SyncCoordinatorPeriodicEvaluationInterval
if interval <= 0 {
sc.logger.Warnf("[SyncCoordinator] Invalid periodic evaluation interval %v, using default 30s", interval)
interval = 30 * time.Second
}

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
Expand Down
8 changes: 4 additions & 4 deletions services/rpc/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,9 @@ type RPCServer struct {
// Used for mining-related RPC commands like getminingcandidate and generate
blockAssemblyClient blockassembly.ClientI

// peerClient provides access to legacy peer network services
// legacyP2PClient provides access to legacy peer network services
// Used for peer management and information retrieval
peerClient peer.ClientI
legacyP2PClient peer.ClientI

// p2pClient provides access to the P2P network services
// Used for modern peer management and network operations
Expand Down Expand Up @@ -1384,7 +1384,7 @@ func (s *RPCServer) Start(ctx context.Context, readyCh chan<- struct{}) error {
// Returns:
// - *RPCServer: Configured server instance ready for initialization
// - error: Any error encountered during configuration
func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainClient blockchain.ClientI, blockValidationClient blockvalidation.Interface, utxoStore utxo.Store, blockAssemblyClient blockassembly.ClientI, peerClient peer.ClientI, p2pClient p2p.ClientI, txStore blob.Store, validatorClient validator.Interface) (*RPCServer, error) {
func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainClient blockchain.ClientI, blockValidationClient blockvalidation.Interface, utxoStore utxo.Store, blockAssemblyClient blockassembly.ClientI, legacyPeerClient peer.ClientI, p2pClient p2p.ClientI, txStore blob.Store, validatorClient validator.Interface) (*RPCServer, error) {
initPrometheusMetrics()

assetHTTPAddress := tSettings.Asset.HTTPAddress
Expand All @@ -1409,7 +1409,7 @@ func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainCl
helpCacher: newHelpCacher(),
utxoStore: utxoStore,
blockAssemblyClient: blockAssemblyClient,
peerClient: peerClient,
legacyP2PClient: legacyPeerClient,
p2pClient: p2pClient,
txStore: txStore,
validatorClient: validatorClient,
Expand Down
Loading
Loading