From 3c66950c37aa46711219d0e86ee73efffc027eb1 Mon Sep 17 00:00:00 2001
From: j-rafique
Date: Thu, 25 Sep 2025 16:27:39 +0500
Subject: [PATCH] Disable Metrics

---
 p2p/kademlia/dht.go                           | 78 +++++++++++--------
 p2p/p2p.go                                    | 61 ++++++++-------
 supernode/cmd/start.go                        | 42 +++++-----
 supernode/services/cascade/adaptors/p2p.go    | 17 ++--
 supernode/services/cascade/config.go          |  6 +-
 supernode/services/cascade/download.go        | 35 +++++----
 supernode/services/cascade/register.go        | 14 ++--
 supernode/services/cascade/service.go         | 14 ++--
 tests/integration/p2p/p2p_integration_test.go |  5 +-
 9 files changed, 154 insertions(+), 118 deletions(-)

diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go
index 69c45023..b83fb34e 100644
--- a/p2p/kademlia/dht.go
+++ b/p2p/kademlia/dht.go
@@ -42,9 +42,10 @@ const (
     delKeysCountThreshold = 10
     lowSpaceThreshold     = 50 // GB
     batchRetrieveSize     = 1000
-    storeSameSymbolsBatchConcurrency = 3
-    fetchSymbolsBatchConcurrency     = 6
-    minimumDataStoreSuccessRate      = 75.0
+
+    storeSameSymbolsBatchConcurrency = 3
+    fetchSymbolsBatchConcurrency     = 6
+    minimumDataStoreSuccessRate      = 75.0

     maxIterations                  = 4
     macConcurrentNetworkStoreCalls = 16
@@ -124,7 +125,7 @@ func (s *DHT) ConnPoolSnapshot() map[string]int64 {

 // Options contains configuration options for the queries node
 type Options struct {
-    ID []byte
+    ID []byte

     // The queries IPv4 or IPv6 address
     IP string
@@ -139,8 +140,11 @@ type Options struct {
     // Lumera client for interacting with the blockchain
     LumeraClient lumera.Client

-    // Keyring for credentials
-    Keyring keyring.Keyring
+    // Keyring for credentials
+    Keyring keyring.Keyring
+
+    // MetricsDisabled gates DHT-level metrics emission (p2pmetrics hooks and snapshots)
+    MetricsDisabled bool
 }

 // NewDHT returns a new DHT node
@@ -739,7 +743,9 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
         return nil, fmt.Errorf("fetch and add local keys: %v", err)
     }
     // Report how many were found locally, for event metrics
-    p2pmetrics.ReportFoundLocal(p2pmetrics.TaskIDFromContext(ctx), int(foundLocalCount))
+    if !s.options.MetricsDisabled {
+        p2pmetrics.ReportFoundLocal(p2pmetrics.TaskIDFromContext(ctx), int(foundLocalCount))
+    }
     if foundLocalCount >= required {
         return result, nil
     }
@@ -788,7 +794,9 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
     // Record batch retrieve stats for internal DHT snapshot window
     s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Since(start))
     // Also feed retrieve counts into the per-task collector for stream events
-    p2pmetrics.SetRetrieveBatchSummary(p2pmetrics.TaskIDFromContext(ctx), len(keys), int(required), int(foundLocalCount), netFound, time.Since(start).Milliseconds())
+    if !s.options.MetricsDisabled {
+        p2pmetrics.SetRetrieveBatchSummary(p2pmetrics.TaskIDFromContext(ctx), len(keys), int(required), int(foundLocalCount), netFound, time.Since(start).Milliseconds())
+    }

     return result, nil
 }
@@ -946,14 +954,16 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node,
             }
             mu.Unlock()
             // record failed RPC per-node
-            p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
-                IP:         node.IP,
-                Address:    node.String(),
-                Keys:       0,
-                Success:    false,
-                Error:      err.Error(),
-                DurationMS: time.Since(callStart).Milliseconds(),
-            })
+            if !s.options.MetricsDisabled {
+                p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
+                    IP:         node.IP,
+                    Address:    node.String(),
+                    Keys:       0,
+                    Success:    false,
+                    Error:      err.Error(),
+                    DurationMS: time.Since(callStart).Milliseconds(),
+                })
+            }
             return
         }

@@ -976,14 +986,16 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node,
             }

             // record successful RPC per-node (returned may be 0). Success is true when no error.
-            p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
-                IP:         node.IP,
-                Address:    node.String(),
-                Keys:       returned,
-                Success:    true,
-                Error:      "",
-                DurationMS: time.Since(callStart).Milliseconds(),
-            })
+            if !s.options.MetricsDisabled {
+                p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
+                    IP:         node.IP,
+                    Address:    node.String(),
+                    Keys:       returned,
+                    Success:    true,
+                    Error:      "",
+                    DurationMS: time.Since(callStart).Milliseconds(),
+                })
+            }
         }(node, nodeID)
     }

@@ -1713,14 +1725,16 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
         }

         // Emit per-node store RPC call via metrics bridge (no P2P API coupling)
-        p2pmetrics.RecordStore(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
-            IP:         nodeIP,
-            Address:    nodeAddr,
-            Keys:       response.KeysCount,
-            Success:    errMsg == "" && response.Error == nil,
-            Error:      errMsg,
-            DurationMS: response.DurationMS,
-        })
+        if !s.options.MetricsDisabled {
+            p2pmetrics.RecordStore(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
+                IP:         nodeIP,
+                Address:    nodeAddr,
+                Keys:       response.KeysCount,
+                Success:    errMsg == "" && response.Error == nil,
+                Error:      errMsg,
+                DurationMS: response.DurationMS,
+            })
+        }
     }

 }
diff --git a/p2p/p2p.go b/p2p/p2p.go
index e3d6b40a..006c469a 100644
--- a/p2p/p2p.go
+++ b/p2p/p2p.go
@@ -40,14 +40,15 @@ type P2P interface {

 // p2p structure to implements interface
 type p2p struct {
-    store        kademlia.Store // the store for kademlia network
-    metaStore    kademlia.MetaStore
-    dht          *kademlia.DHT // the kademlia network
-    config       *Config       // the service configuration
-    running      bool          // if the kademlia network is ready
-    lumeraClient lumera.Client
-    keyring      keyring.Keyring // Add the keyring field
-    rqstore      rqstore.Store
+    store           kademlia.Store // the store for kademlia network
+    metaStore       kademlia.MetaStore
+    dht             *kademlia.DHT // the kademlia network
+    config          *Config       // the service configuration
+    running         bool          // if the kademlia network is ready
+    lumeraClient    lumera.Client
+    keyring         keyring.Keyring // Add the keyring field
+    rqstore         rqstore.Store
+    metricsDisabled bool
 }

 // Run the kademlia network
@@ -226,14 +227,15 @@ func (s *p2p) NClosestNodesWithIncludingNodeList(ctx context.Context, n int, key
 // configure the distributed hash table for p2p service
 func (s *p2p) configure(ctx context.Context) error {
     // new the queries storage
-    kadOpts := &kademlia.Options{
-        LumeraClient:   s.lumeraClient,
-        Keyring:        s.keyring, // Pass the keyring
-        BootstrapNodes: []*kademlia.Node{},
-        IP:             s.config.ListenAddress,
-        Port:           s.config.Port,
-        ID:             []byte(s.config.ID),
-    }
+    kadOpts := &kademlia.Options{
+        LumeraClient:    s.lumeraClient,
+        Keyring:         s.keyring, // Pass the keyring
+        BootstrapNodes:  []*kademlia.Node{},
+        IP:              s.config.ListenAddress,
+        Port:            s.config.Port,
+        ID:              []byte(s.config.ID),
+        MetricsDisabled: s.metricsDisabled,
+    }

     if len(kadOpts.ID) == 0 {
         errors.Errorf("node id is empty")
@@ -251,25 +253,26 @@
 }

 // New returns a new p2p instance.
-func New(ctx context.Context, config *Config, lumeraClient lumera.Client, kr keyring.Keyring, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (P2P, error) {
-    store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst)
-    if err != nil {
-        return nil, errors.Errorf("new kademlia store: %w", err)
-    }
+func New(ctx context.Context, config *Config, lumeraClient lumera.Client, kr keyring.Keyring, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore, metricsDisabled bool) (P2P, error) {
+    store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst)
+    if err != nil {
+        return nil, errors.Errorf("new kademlia store: %w", err)
+    }

     meta, err := meta.NewStore(ctx, config.DataDir)
     if err != nil {
         return nil, errors.Errorf("new kademlia meta store: %w", err)
     }

-    return &p2p{
-        store:        store,
-        metaStore:    meta,
-        config:       config,
-        lumeraClient: lumeraClient,
-        keyring:      kr, // Store the keyring
-        rqstore:      rqstore,
-    }, nil
+    return &p2p{
+        store:           store,
+        metaStore:       meta,
+        config:          config,
+        lumeraClient:    lumeraClient,
+        keyring:         kr, // Store the keyring
+        rqstore:         rqstore,
+        metricsDisabled: metricsDisabled,
+    }, nil
 }

 // LocalStore store data into the kademlia network
diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go
index 92ccc700..eaf1339e 100644
--- a/supernode/cmd/start.go
+++ b/supernode/cmd/start.go
@@ -84,11 +84,14 @@ The supernode will connect to the Lumera network and begin participating in the
         logtrace.Fatal(ctx, "Failed to initialize RaptorQ store", logtrace.Fields{"error": err.Error()})
     }

-    // Initialize P2P service
-    p2pService, err := initP2PService(ctx, appConfig, lumeraClient, kr, rqStore, nil, nil)
-    if err != nil {
-        logtrace.Fatal(ctx, "Failed to initialize P2P service", logtrace.Fields{"error": err.Error()})
-    }
+    // Manually set the disable flag at the highest level
+    disableMetrics := true
+
+    // Initialize P2P service with explicit disable flag
+    p2pService, err := initP2PService(ctx, appConfig, lumeraClient, kr, rqStore, nil, nil, disableMetrics)
+    if err != nil {
+        logtrace.Fatal(ctx, "Failed to initialize P2P service", logtrace.Fields{"error": err.Error()})
+    }

     // Initialize the supernode
     supernodeInstance, err := NewSupernode(ctx, appConfig, kr, p2pService, rqStore, lumeraClient)
@@ -97,18 +100,19 @@ The supernode will connect to the Lumera network and begin participating in the
     }

     // Configure cascade service
-    cService := cascadeService.NewCascadeService(
-        &cascadeService.Config{
-            Config: common.Config{
-                SupernodeAccountAddress: appConfig.SupernodeConfig.Identity,
-            },
-            RqFilesDir: appConfig.GetRaptorQFilesDir(),
-        },
-        lumeraClient,
-        *p2pService,
-        codec.NewRaptorQCodec(appConfig.GetRaptorQFilesDir()),
-        rqStore,
-    )
+    cService := cascadeService.NewCascadeService(
+        &cascadeService.Config{
+            Config: common.Config{
+                SupernodeAccountAddress: appConfig.SupernodeConfig.Identity,
+            },
+            RqFilesDir:      appConfig.GetRaptorQFilesDir(),
+            MetricsDisabled: disableMetrics,
+        },
+        lumeraClient,
+        *p2pService,
+        codec.NewRaptorQCodec(appConfig.GetRaptorQFilesDir()),
+        rqStore,
+    )

     // Create cascade action server
     cascadeActionServer := cascade.NewCascadeActionServer(cService)
@@ -190,7 +194,7 @@ func init() {
 }

 // initP2PService initializes the P2P service
-func initP2PService(ctx context.Context, config *config.Config, lumeraClient lumera.Client, kr cKeyring.Keyring, rqStore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (*p2p.P2P, error) {
+func initP2PService(ctx context.Context, config *config.Config, lumeraClient lumera.Client, kr cKeyring.Keyring, rqStore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore, metricsDisabled bool) (*p2p.P2P, error) {
     // Get the supernode address from the keyring
     keyInfo, err := kr.Key(config.SupernodeConfig.KeyName)
     if err != nil {
@@ -206,7 +210,7 @@ func initP2PService(ctx context.Context, config *config.Config, lumeraClient lum

     logtrace.Info(ctx, "Initializing P2P service", logtrace.Fields{"address": p2pConfig.ListenAddress, "port": p2pConfig.Port, "data_dir": p2pConfig.DataDir, "supernode_id": address.String()})

-    p2pService, err := p2p.New(ctx, p2pConfig, lumeraClient, kr, rqStore, cloud, mst)
+    p2pService, err := p2p.New(ctx, p2pConfig, lumeraClient, kr, rqStore, cloud, mst, metricsDisabled)
     if err != nil {
         return nil, fmt.Errorf("failed to initialize p2p service: %w", err)
     }
diff --git a/supernode/services/cascade/adaptors/p2p.go b/supernode/services/cascade/adaptors/p2p.go
index 116d6810..a29e2b99 100644
--- a/supernode/services/cascade/adaptors/p2p.go
+++ b/supernode/services/cascade/adaptors/p2p.go
@@ -39,13 +39,14 @@ type P2PService interface {

 // p2pImpl is the default implementation of the P2PService interface.
 type p2pImpl struct {
-    p2p     p2p.Client
-    rqStore rqstore.Store
+    p2p             p2p.Client
+    rqStore         rqstore.Store
+    metricsDisabled bool
 }

 // NewP2PService returns a concrete implementation of P2PService.
-func NewP2PService(client p2p.Client, store rqstore.Store) P2PService {
-    return &p2pImpl{p2p: client, rqStore: store}
+func NewP2PService(client p2p.Client, store rqstore.Store, metricsDisabled bool) P2PService {
+    return &p2pImpl{p2p: client, rqStore: store, metricsDisabled: metricsDisabled}
 }

 type StoreArtefactsRequest struct {
@@ -58,9 +59,11 @@ type StoreArtefactsRequest struct {
 func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error {
     logtrace.Info(ctx, "About to store artefacts (metadata + symbols)", logtrace.Fields{"taskID": req.TaskID, "id_files": len(req.IDFiles)})

-    // Enable per-node store RPC capture for this task
-    cm.StartStoreCapture(req.TaskID)
-    defer cm.StopStoreCapture(req.TaskID)
+    // Optionally enable per-node store RPC capture for this task
+    if !p.metricsDisabled {
+        cm.StartStoreCapture(req.TaskID)
+        defer cm.StopStoreCapture(req.TaskID)
+    }

     start := time.Now()
     firstPassSymbols, totalSymbols, err := p.storeCascadeSymbolsAndData(ctx, req.TaskID, req.ActionID, req.SymbolsDir, req.IDFiles)
diff --git a/supernode/services/cascade/config.go b/supernode/services/cascade/config.go
index 7a0f1ef2..01401d41 100644
--- a/supernode/services/cascade/config.go
+++ b/supernode/services/cascade/config.go
@@ -8,6 +8,8 @@ import (
 type Config struct {
     common.Config `mapstructure:",squash" json:"-"`

-    RaptorQServiceAddress string `mapstructure:"-" json:"-"`
-    RqFilesDir            string `mapstructure:"rq_files_dir" json:"rq_files_dir,omitempty"`
+    RaptorQServiceAddress string `mapstructure:"-" json:"-"`
+    RqFilesDir            string `mapstructure:"rq_files_dir" json:"rq_files_dir,omitempty"`
+    // MetricsDisabled toggles upload/download metrics for cascade service
+    MetricsDisabled bool `mapstructure:"-" json:"-"`
 }
diff --git a/supernode/services/cascade/download.go b/supernode/services/cascade/download.go
index 363834bc..9da3dc1e 100644
--- a/supernode/services/cascade/download.go
+++ b/supernode/services/cascade/download.go
@@ -165,9 +165,11 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
     }
     logtrace.Info(ctx, "Retrieving target-required symbols for decode", fields)

-    // Enable retrieve metrics capture for this action
-    cm.StartRetrieveCapture(actionID)
-    defer cm.StopRetrieveCapture(actionID)
+
+    if !task.config.MetricsDisabled {
+        cm.StartRetrieveCapture(actionID)
+        defer cm.StopRetrieveCapture(actionID)
+    }

     // Measure symbols batch retrieve duration
     retrieveStart := time.Now()
@@ -201,17 +203,22 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
     }
     decodeMS := time.Since(decodeStart).Milliseconds()

-    // Set minimal retrieve summary and emit event strictly from internal collector
-    cm.SetRetrieveSummary(actionID, retrieveMS, decodeMS)
-    payload := cm.BuildDownloadEventPayloadFromCollector(actionID)
-    if retrieve, ok := payload["retrieve"].(map[string]any); ok {
-        retrieve["target_required_percent"] = targetRequiredPercent
-        retrieve["target_required_count"] = targetRequiredCount
-        retrieve["total_symbols"] = totalSymbols
-    }
-    if b, err := json.MarshalIndent(payload, "", " "); err == nil {
-        task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), "", "", send)
-    }
+    // Set minimal retrieve summary and emit event strictly from internal collector
+    if !task.config.MetricsDisabled {
+        cm.SetRetrieveSummary(actionID, retrieveMS, decodeMS)
+        payload := cm.BuildDownloadEventPayloadFromCollector(actionID)
+        if retrieve, ok := payload["retrieve"].(map[string]any); ok {
+            retrieve["target_required_percent"] = targetRequiredPercent
+            retrieve["target_required_count"] = targetRequiredCount
+            retrieve["total_symbols"] = totalSymbols
+        }
+        if b, err := json.MarshalIndent(payload, "", " "); err == nil {
+            task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), "", "", send)
+        }
+    } else {
+        // Send minimal hardcoded event when metrics disabled
+        task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, "Download completed (metrics disabled)", "", "", send)
+    }

     fileHash, err := crypto.HashFileIncrementally(decodeInfo.FilePath, 0)
     if err != nil {
diff --git a/supernode/services/cascade/register.go b/supernode/services/cascade/register.go
index dd6e1e77..4739e0d9 100644
--- a/supernode/services/cascade/register.go
+++ b/supernode/services/cascade/register.go
@@ -1,11 +1,11 @@
 package cascade

 import (
-    "context"
-    "os"
+    "context"
+    "os"

-    "github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
-    "github.com/LumeraProtocol/supernode/v2/supernode/services/common"
+    "github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
+    "github.com/LumeraProtocol/supernode/v2/supernode/services/common"
 )

 // RegisterRequest contains parameters for upload request
@@ -162,8 +162,10 @@ func (task *CascadeRegistrationTask) Register(
     if err := task.storeArtefacts(ctx, action.ActionID, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields); err != nil {
         return err
     }
-    // Emit compact analytics payload from centralized metrics collector
-    task.emitArtefactsStored(ctx, fields, encResp.Metadata, send)
+    // Emit compact analytics payload from centralized metrics collector (optional)
+    if !task.config.MetricsDisabled {
+        task.emitArtefactsStored(ctx, fields, encResp.Metadata, send)
+    }

     resp, err := task.LumeraClient.FinalizeAction(ctx, action.ActionID, rqidResp.RQIDs)
     if err != nil {
diff --git a/supernode/services/cascade/service.go b/supernode/services/cascade/service.go
index a1d9898b..b5b2870a 100644
--- a/supernode/services/cascade/service.go
+++ b/supernode/services/cascade/service.go
@@ -56,11 +56,11 @@ func (service *CascadeService) GetRunningTasks() []string {

 // NewCascadeService returns a new CascadeService instance
 func NewCascadeService(config *Config, lumera lumera.Client, p2pClient p2p.Client, codec codec.Codec, rqstore rqstore.Store) *CascadeService {
-    return &CascadeService{
-        config:           config,
-        SuperNodeService: base.NewSuperNodeService(p2pClient),
-        LumeraClient:     adaptors.NewLumeraClient(lumera),
-        P2P:              adaptors.NewP2PService(p2pClient, rqstore),
-        RQ:               adaptors.NewCodecService(codec),
-    }
+    return &CascadeService{
+        config:           config,
+        SuperNodeService: base.NewSuperNodeService(p2pClient),
+        LumeraClient:     adaptors.NewLumeraClient(lumera),
+        P2P:              adaptors.NewP2PService(p2pClient, rqstore, config.MetricsDisabled),
+        RQ:               adaptors.NewCodecService(codec),
+    }
 }
diff --git a/tests/integration/p2p/p2p_integration_test.go b/tests/integration/p2p/p2p_integration_test.go
index bce71f58..d5df6dc2 100644
--- a/tests/integration/p2p/p2p_integration_test.go
+++ b/tests/integration/p2p/p2p_integration_test.go
@@ -108,7 +108,7 @@ func TestP2PBasicIntegration(t *testing.T) {
     // Add debug logging
     log.Printf("Storing batch with keys: %v", expectedKeys)

-    err := services[0].StoreBatch(ctx, batchData, 0, taskID)
+    err := services[0].StoreBatch(ctx, batchData, 0, taskID)
     require.NoError(t, err)

     // Add immediate verification
@@ -203,7 +203,8 @@ func SetupTestP2PNodes(t *testing.T, ctx context.Context) ([]p2p.Client, []*rqst
         require.NoError(t, err, "failed to create rqstore for node %d: %v", i, err)
         rqStores = append(rqStores, rqStore)

-        service, err := p2p.New(ctx, p2pConfig, mockClient, kr, rqStore, nil, nil)
+        // Disable metrics in integration tests by default
+        service, err := p2p.New(ctx, p2pConfig, mockClient, kr, rqStore, nil, nil, true)
         require.NoError(t, err, "failed to create p2p service for node %d: %v", i, err)

         // Start P2P service
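
Usage note (editor addendum, not part of the diff above): the flag is threaded top-down. supernode/cmd/start.go decides once; p2p.New carries the value into kademlia.Options.MetricsDisabled, which gates the p2pmetrics calls in dht.go; and cascadeService.Config.MetricsDisabled gates the cascade-side capture and analytics events. Below is a minimal Go sketch of that wiring, reusing only call sites that appear in this patch; it is a fragment, not a full program, and assumes ctx, appConfig, lumeraClient, kr, rqStore, and the imports of start.go are in scope.

    // Decide once at startup; everything downstream only reads the flag.
    disableMetrics := true // hardcoded by this patch; a config-driven knob would go here

    // P2P layer: p2p.New threads the flag into kademlia.Options.MetricsDisabled,
    // which gates RecordStore/RecordRetrieve/ReportFoundLocal emission in dht.go.
    p2pService, err := initP2PService(ctx, appConfig, lumeraClient, kr, rqStore, nil, nil, disableMetrics)
    if err != nil {
        logtrace.Fatal(ctx, "Failed to initialize P2P service", logtrace.Fields{"error": err.Error()})
    }

    // Cascade layer: Config.MetricsDisabled reaches adaptors.NewP2PService and the
    // register/download tasks, gating capture and the artefact analytics events.
    cService := cascadeService.NewCascadeService(
        &cascadeService.Config{
            Config:          common.Config{SupernodeAccountAddress: appConfig.SupernodeConfig.Identity},
            RqFilesDir:      appConfig.GetRaptorQFilesDir(),
            MetricsDisabled: disableMetrics,
        },
        lumeraClient,
        *p2pService,
        codec.NewRaptorQCodec(appConfig.GetRaptorQFilesDir()),
        rqStore,
    )

Because the flag is fixed at construction time, a node cannot toggle metrics at runtime; flipping it requires rebuilding the service graph.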