78 changes: 46 additions & 32 deletions p2p/kademlia/dht.go
@@ -42,9 +42,10 @@ const (
delKeysCountThreshold = 10
lowSpaceThreshold = 50 // GB
batchRetrieveSize = 1000
storeSameSymbolsBatchConcurrency = 3
fetchSymbolsBatchConcurrency = 6
minimumDataStoreSuccessRate = 75.0

storeSameSymbolsBatchConcurrency = 3
fetchSymbolsBatchConcurrency = 6
minimumDataStoreSuccessRate = 75.0

maxIterations = 4
macConcurrentNetworkStoreCalls = 16
@@ -124,7 +125,7 @@ func (s *DHT) ConnPoolSnapshot() map[string]int64 {

// Options contains configuration options for the queries node
type Options struct {
ID []byte
ID []byte

// The queries IPv4 or IPv6 address
IP string
@@ -139,8 +140,11 @@ type Options struct {
// Lumera client for interacting with the blockchain
LumeraClient lumera.Client

// Keyring for credentials
Keyring keyring.Keyring
// Keyring for credentials
Keyring keyring.Keyring

// MetricsDisabled gates DHT-level metrics emission (p2pmetrics hooks and snapshots)
MetricsDisabled bool
}

// NewDHT returns a new DHT node
@@ -739,7 +743,9 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
return nil, fmt.Errorf("fetch and add local keys: %v", err)
}
// Report how many were found locally, for event metrics
p2pmetrics.ReportFoundLocal(p2pmetrics.TaskIDFromContext(ctx), int(foundLocalCount))
if !s.options.MetricsDisabled {
p2pmetrics.ReportFoundLocal(p2pmetrics.TaskIDFromContext(ctx), int(foundLocalCount))
}
if foundLocalCount >= required {
return result, nil
}
@@ -788,7 +794,9 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
// Record batch retrieve stats for internal DHT snapshot window
s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Since(start))
// Also feed retrieve counts into the per-task collector for stream events
p2pmetrics.SetRetrieveBatchSummary(p2pmetrics.TaskIDFromContext(ctx), len(keys), int(required), int(foundLocalCount), netFound, time.Since(start).Milliseconds())
if !s.options.MetricsDisabled {
p2pmetrics.SetRetrieveBatchSummary(p2pmetrics.TaskIDFromContext(ctx), len(keys), int(required), int(foundLocalCount), netFound, time.Since(start).Milliseconds())
}

return result, nil
}
@@ -946,14 +954,16 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node,
}
mu.Unlock()
// record failed RPC per-node
p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
IP: node.IP,
Address: node.String(),
Keys: 0,
Success: false,
Error: err.Error(),
DurationMS: time.Since(callStart).Milliseconds(),
})
if !s.options.MetricsDisabled {
p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
IP: node.IP,
Address: node.String(),
Keys: 0,
Success: false,
Error: err.Error(),
DurationMS: time.Since(callStart).Milliseconds(),
})
}
return
}

@@ -976,14 +986,16 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node,
}

// record successful RPC per-node (returned may be 0). Success is true when no error.
p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
IP: node.IP,
Address: node.String(),
Keys: returned,
Success: true,
Error: "",
DurationMS: time.Since(callStart).Milliseconds(),
})
if !s.options.MetricsDisabled {
p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
IP: node.IP,
Address: node.String(),
Keys: returned,
Success: true,
Error: "",
DurationMS: time.Since(callStart).Milliseconds(),
})
}
}(node, nodeID)
}

@@ -1713,14 +1725,16 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
}

// Emit per-node store RPC call via metrics bridge (no P2P API coupling)
p2pmetrics.RecordStore(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
IP: nodeIP,
Address: nodeAddr,
Keys: response.KeysCount,
Success: errMsg == "" && response.Error == nil,
Error: errMsg,
DurationMS: response.DurationMS,
})
if !s.options.MetricsDisabled {
p2pmetrics.RecordStore(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
IP: nodeIP,
Address: nodeAddr,
Keys: response.KeysCount,
Success: errMsg == "" && response.Error == nil,
Error: errMsg,
DurationMS: response.DurationMS,
})
}

}

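A minimal, self-contained sketch of the gating pattern this file now applies: each metrics call is wrapped in a check of the new `MetricsDisabled` option, so disabled nodes skip the metrics bridge entirely on the hot retrieve/store paths. The types and the print call below are illustrative stand-ins, not the repo's actual `p2pmetrics` API.

```go
package main

import (
	"fmt"
	"time"
)

// Call mirrors the shape of a per-node RPC record (illustrative only).
type Call struct {
	Address    string
	Keys       int
	Success    bool
	DurationMS int64
}

// Options carries the new flag; in the PR this lives on kademlia.Options.
type Options struct {
	MetricsDisabled bool
}

type DHT struct {
	options *Options
}

// recordRetrieve shows the gating pattern: the metrics call is skipped
// entirely when MetricsDisabled is set, so disabled nodes pay no cost.
func (s *DHT) recordRetrieve(taskID string, c Call) {
	if s.options.MetricsDisabled {
		return
	}
	// Stand-in for p2pmetrics.RecordRetrieve(taskID, c).
	fmt.Printf("task=%s call=%+v\n", taskID, c)
}

func main() {
	d := &DHT{options: &Options{MetricsDisabled: true}}
	start := time.Now()
	d.recordRetrieve("task-1", Call{
		Address:    "1.2.3.4:4445",
		Keys:       10,
		Success:    true,
		DurationMS: time.Since(start).Milliseconds(),
	})
}
```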
61 changes: 32 additions & 29 deletions p2p/p2p.go
@@ -40,14 +40,15 @@ type P2P interface {

// p2p structure to implements interface
type p2p struct {
store kademlia.Store // the store for kademlia network
metaStore kademlia.MetaStore
dht *kademlia.DHT // the kademlia network
config *Config // the service configuration
running bool // if the kademlia network is ready
lumeraClient lumera.Client
keyring keyring.Keyring // Add the keyring field
rqstore rqstore.Store
store kademlia.Store // the store for kademlia network
metaStore kademlia.MetaStore
dht *kademlia.DHT // the kademlia network
config *Config // the service configuration
running bool // if the kademlia network is ready
lumeraClient lumera.Client
keyring keyring.Keyring // Add the keyring field
rqstore rqstore.Store
metricsDisabled bool
}

// Run the kademlia network
@@ -226,14 +227,15 @@ func (s *p2p) NClosestNodesWithIncludingNodeList(ctx context.Context, n int, key
// configure the distributed hash table for p2p service
func (s *p2p) configure(ctx context.Context) error {
// new the queries storage
kadOpts := &kademlia.Options{
LumeraClient: s.lumeraClient,
Keyring: s.keyring, // Pass the keyring
BootstrapNodes: []*kademlia.Node{},
IP: s.config.ListenAddress,
Port: s.config.Port,
ID: []byte(s.config.ID),
}
kadOpts := &kademlia.Options{
LumeraClient: s.lumeraClient,
Keyring: s.keyring, // Pass the keyring
BootstrapNodes: []*kademlia.Node{},
IP: s.config.ListenAddress,
Port: s.config.Port,
ID: []byte(s.config.ID),
MetricsDisabled: s.metricsDisabled,
}

if len(kadOpts.ID) == 0 {
errors.Errorf("node id is empty")
@@ -251,25 +253,26 @@ func (s *p2p) configure(ctx context.Context) error {
}

// New returns a new p2p instance.
func New(ctx context.Context, config *Config, lumeraClient lumera.Client, kr keyring.Keyring, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (P2P, error) {
store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst)
if err != nil {
return nil, errors.Errorf("new kademlia store: %w", err)
}
func New(ctx context.Context, config *Config, lumeraClient lumera.Client, kr keyring.Keyring, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore, metricsDisabled bool) (P2P, error) {
store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst)
if err != nil {
return nil, errors.Errorf("new kademlia store: %w", err)
}

meta, err := meta.NewStore(ctx, config.DataDir)
if err != nil {
return nil, errors.Errorf("new kademlia meta store: %w", err)
}

return &p2p{
store: store,
metaStore: meta,
config: config,
lumeraClient: lumeraClient,
keyring: kr, // Store the keyring
rqstore: rqstore,
}, nil
return &p2p{
store: store,
metaStore: meta,
config: config,
lumeraClient: lumeraClient,
keyring: kr, // Store the keyring
rqstore: rqstore,
metricsDisabled: metricsDisabled,
}, nil
}

// LocalStore store data into the kademlia network
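A hedged sketch of the threading this file adds: the flag is accepted once in the constructor, stored on the service, and copied into the DHT options during configure, so the DHT itself never has to know where the decision was made. The types below are simplified stand-ins for the repo's p2p and kademlia packages.

```go
package main

import "fmt"

// Simplified stand-ins for kademlia.Options and the p2p service struct.
type kadOptions struct {
	ID              []byte
	MetricsDisabled bool
}

type p2pService struct {
	id              string
	metricsDisabled bool
}

// New mirrors the changed constructor shape: the flag arrives once at
// construction time and is stored on the service.
func New(id string, metricsDisabled bool) *p2pService {
	return &p2pService{id: id, metricsDisabled: metricsDisabled}
}

// configure copies the stored flag into the DHT options.
func (s *p2pService) configure() *kadOptions {
	return &kadOptions{ID: []byte(s.id), MetricsDisabled: s.metricsDisabled}
}

func main() {
	svc := New("node-1", true)
	fmt.Printf("%+v\n", svc.configure())
}
```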
42 changes: 23 additions & 19 deletions supernode/cmd/start.go
@@ -84,11 +84,14 @@ The supernode will connect to the Lumera network and begin participating in the
logtrace.Fatal(ctx, "Failed to initialize RaptorQ store", logtrace.Fields{"error": err.Error()})
}

// Initialize P2P service
p2pService, err := initP2PService(ctx, appConfig, lumeraClient, kr, rqStore, nil, nil)
if err != nil {
logtrace.Fatal(ctx, "Failed to initialize P2P service", logtrace.Fields{"error": err.Error()})
}
// Manually set the disable flag at the highest level
disableMetrics := true

// Initialize P2P service with explicit disable flag
p2pService, err := initP2PService(ctx, appConfig, lumeraClient, kr, rqStore, nil, nil, disableMetrics)
if err != nil {
logtrace.Fatal(ctx, "Failed to initialize P2P service", logtrace.Fields{"error": err.Error()})
}

// Initialize the supernode
supernodeInstance, err := NewSupernode(ctx, appConfig, kr, p2pService, rqStore, lumeraClient)
@@ -97,18 +100,19 @@ The supernode will connect to the Lumera network and begin participating in the
}

// Configure cascade service
cService := cascadeService.NewCascadeService(
&cascadeService.Config{
Config: common.Config{
SupernodeAccountAddress: appConfig.SupernodeConfig.Identity,
},
RqFilesDir: appConfig.GetRaptorQFilesDir(),
},
lumeraClient,
*p2pService,
codec.NewRaptorQCodec(appConfig.GetRaptorQFilesDir()),
rqStore,
)
cService := cascadeService.NewCascadeService(
&cascadeService.Config{
Config: common.Config{
SupernodeAccountAddress: appConfig.SupernodeConfig.Identity,
},
RqFilesDir: appConfig.GetRaptorQFilesDir(),
MetricsDisabled: disableMetrics,
},
lumeraClient,
*p2pService,
codec.NewRaptorQCodec(appConfig.GetRaptorQFilesDir()),
rqStore,
)

// Create cascade action server
cascadeActionServer := cascade.NewCascadeActionServer(cService)
@@ -190,7 +194,7 @@ func init() {
}

// initP2PService initializes the P2P service
func initP2PService(ctx context.Context, config *config.Config, lumeraClient lumera.Client, kr cKeyring.Keyring, rqStore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (*p2p.P2P, error) {
func initP2PService(ctx context.Context, config *config.Config, lumeraClient lumera.Client, kr cKeyring.Keyring, rqStore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore, metricsDisabled bool) (*p2p.P2P, error) {
// Get the supernode address from the keyring
keyInfo, err := kr.Key(config.SupernodeConfig.KeyName)
if err != nil {
@@ -206,7 +210,7 @@ func initP2PService(ctx context.Context, config *config.Config, lumeraClient lum

logtrace.Info(ctx, "Initializing P2P service", logtrace.Fields{"address": p2pConfig.ListenAddress, "port": p2pConfig.Port, "data_dir": p2pConfig.DataDir, "supernode_id": address.String()})

p2pService, err := p2p.New(ctx, p2pConfig, lumeraClient, kr, rqStore, cloud, mst)
p2pService, err := p2p.New(ctx, p2pConfig, lumeraClient, kr, rqStore, cloud, mst, metricsDisabled)
if err != nil {
return nil, fmt.Errorf("failed to initialize p2p service: %w", err)
}
17 changes: 10 additions & 7 deletions supernode/services/cascade/adaptors/p2p.go
@@ -39,13 +39,14 @@ type P2PService interface {

// p2pImpl is the default implementation of the P2PService interface.
type p2pImpl struct {
p2p p2p.Client
rqStore rqstore.Store
p2p p2p.Client
rqStore rqstore.Store
metricsDisabled bool
}

// NewP2PService returns a concrete implementation of P2PService.
func NewP2PService(client p2p.Client, store rqstore.Store) P2PService {
return &p2pImpl{p2p: client, rqStore: store}
func NewP2PService(client p2p.Client, store rqstore.Store, metricsDisabled bool) P2PService {
return &p2pImpl{p2p: client, rqStore: store, metricsDisabled: metricsDisabled}
}

type StoreArtefactsRequest struct {
@@ -58,9 +59,11 @@ type StoreArtefactsRequest struct {
func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error {
logtrace.Info(ctx, "About to store artefacts (metadata + symbols)", logtrace.Fields{"taskID": req.TaskID, "id_files": len(req.IDFiles)})

// Enable per-node store RPC capture for this task
cm.StartStoreCapture(req.TaskID)
defer cm.StopStoreCapture(req.TaskID)
// Optionally enable per-node store RPC capture for this task
if !p.metricsDisabled {
cm.StartStoreCapture(req.TaskID)
defer cm.StopStoreCapture(req.TaskID)
}

start := time.Now()
firstPassSymbols, totalSymbols, err := p.storeCascadeSymbolsAndData(ctx, req.TaskID, req.ActionID, req.SymbolsDir, req.IDFiles)
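The conditional capture in this hunk relies on a Go detail worth spelling out: a `defer` registered inside an `if` block runs when the enclosing function returns, not when the `if` block ends, so the stop call still wraps the whole store operation. A tiny self-contained sketch of that behavior, with stand-in start/stop functions rather than the repo's capture API:

```go
package main

import "fmt"

func startCapture(taskID string) { fmt.Println("start capture", taskID) }
func stopCapture(taskID string)  { fmt.Println("stop capture", taskID) }

// storeArtefacts mirrors the conditional-capture pattern: when metrics are
// enabled, capture starts here and the deferred stop runs when the whole
// function returns, not when the if block ends.
func storeArtefacts(taskID string, metricsDisabled bool) {
	if !metricsDisabled {
		startCapture(taskID)
		defer stopCapture(taskID)
	}
	fmt.Println("storing artefacts for", taskID)
}

func main() {
	storeArtefacts("task-1", false) // capture wraps the work
	storeArtefacts("task-2", true)  // no capture at all
}
```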
6 changes: 4 additions & 2 deletions supernode/services/cascade/config.go
@@ -8,6 +8,8 @@ import (
type Config struct {
common.Config `mapstructure:",squash" json:"-"`

RaptorQServiceAddress string `mapstructure:"-" json:"-"`
RqFilesDir string `mapstructure:"rq_files_dir" json:"rq_files_dir,omitempty"`
RaptorQServiceAddress string `mapstructure:"-" json:"-"`
RqFilesDir string `mapstructure:"rq_files_dir" json:"rq_files_dir,omitempty"`
// MetricsDisabled toggles upload/download metrics for cascade service
MetricsDisabled bool `mapstructure:"-" json:"-"`
}
35 changes: 21 additions & 14 deletions supernode/services/cascade/download.go
@@ -165,9 +165,11 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
}
logtrace.Info(ctx, "Retrieving target-required symbols for decode", fields)

// Enable retrieve metrics capture for this action
cm.StartRetrieveCapture(actionID)
defer cm.StopRetrieveCapture(actionID)

if !task.config.MetricsDisabled {
cm.StartRetrieveCapture(actionID)
defer cm.StopRetrieveCapture(actionID)
}

// Measure symbols batch retrieve duration
retrieveStart := time.Now()
@@ -201,17 +203,22 @@ }
}
decodeMS := time.Since(decodeStart).Milliseconds()

// Set minimal retrieve summary and emit event strictly from internal collector
cm.SetRetrieveSummary(actionID, retrieveMS, decodeMS)
payload := cm.BuildDownloadEventPayloadFromCollector(actionID)
if retrieve, ok := payload["retrieve"].(map[string]any); ok {
retrieve["target_required_percent"] = targetRequiredPercent
retrieve["target_required_count"] = targetRequiredCount
retrieve["total_symbols"] = totalSymbols
}
if b, err := json.MarshalIndent(payload, "", " "); err == nil {
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), "", "", send)
}
// Set minimal retrieve summary and emit event strictly from internal collector
if !task.config.MetricsDisabled {
cm.SetRetrieveSummary(actionID, retrieveMS, decodeMS)
payload := cm.BuildDownloadEventPayloadFromCollector(actionID)
if retrieve, ok := payload["retrieve"].(map[string]any); ok {
retrieve["target_required_percent"] = targetRequiredPercent
retrieve["target_required_count"] = targetRequiredCount
retrieve["total_symbols"] = totalSymbols
}
if b, err := json.MarshalIndent(payload, "", " "); err == nil {
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), "", "", send)
}
} else {
// Send minimal hardcoded event when metrics disabled
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, "Download completed (metrics disabled)", "", "", send)
}

fileHash, err := crypto.HashFileIncrementally(decodeInfo.FilePath, 0)
if err != nil {
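A minimal sketch of the download-side fallback this hunk adds: when metrics are enabled the event payload is built from the internal collector and streamed as JSON, otherwise a fixed completion message is streamed so clients still receive an event. The payload builder and send function below are illustrative stand-ins, not the repo's collector or streaming API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildPayload stands in for the collector-backed payload builder.
func buildPayload(actionID string, retrieveMS, decodeMS int64) map[string]any {
	return map[string]any{
		"action_id": actionID,
		"retrieve":  map[string]any{"retrieve_ms": retrieveMS, "decode_ms": decodeMS},
	}
}

// streamEvent stands in for streaming a download event to the client.
func streamEvent(msg string) { fmt.Println("event:", msg) }

func emitDownloadEvent(actionID string, metricsDisabled bool, retrieveMS, decodeMS int64) {
	if metricsDisabled {
		// Fallback: clients still get a completion event, just without metrics.
		streamEvent("Download completed (metrics disabled)")
		return
	}
	if b, err := json.MarshalIndent(buildPayload(actionID, retrieveMS, decodeMS), "", "  "); err == nil {
		streamEvent(string(b))
	}
}

func main() {
	emitDownloadEvent("action-1", false, 120, 35)
	emitDownloadEvent("action-2", true, 0, 0)
}
```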