diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 61fe0ee..2c744f2 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -684,6 +684,7 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result } func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, localOnly ...bool) (result map[string][]byte, err error) { + start := time.Now() logtrace.Debug(ctx, "DHT BatchRetrieve begin", logtrace.Fields{"txid": txID, "keys": len(keys), "required": required}) result = make(map[string][]byte) var resMap sync.Map @@ -731,16 +732,20 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, // Found locally count is logged via summary below; no external metrics if foundLocalCount >= required { - logtrace.Debug(ctx, "DHT BatchRetrieve satisfied from local storage", logtrace.Fields{ - "txid": txID, "found_local": foundLocalCount, "required": required, - }) + f := logtrace.Fields{"txid": txID, "found_local": foundLocalCount, "found_network": 0, "required": required, "ms": time.Since(start).Milliseconds(), logtrace.FieldRole: "client"} + if o := logtrace.OriginFromContext(ctx); o != "" { + f[logtrace.FieldOrigin] = o + } + logtrace.Info(ctx, "dht: batch retrieve summary", f) return result, nil } if len(localOnly) > 0 && localOnly[0] { - logtrace.Debug(ctx, "DHT BatchRetrieve local-only mode, insufficient keys", logtrace.Fields{ - "txid": txID, "found_local": foundLocalCount, "required": required, - }) + f := logtrace.Fields{"txid": txID, "found_local": foundLocalCount, "found_network": 0, "required": required, "ms": time.Since(start).Milliseconds(), logtrace.FieldRole: "client"} + if o := logtrace.OriginFromContext(ctx); o != "" { + f[logtrace.FieldOrigin] = o + } + logtrace.Info(ctx, "dht: batch retrieve summary", f) return result, fmt.Errorf("local-only: found %d, required %d", foundLocalCount, required) } @@ -839,13 +844,13 @@ func (s *DHT) BatchRetrieve(ctx context.Context, 
keys []string, required int32, wg.Wait() netFound := int(atomic.LoadInt32(&networkFound)) - { - f := logtrace.Fields{"txid": txID, "found_local": foundLocalCount, "found_network": netFound, "required": required, "ms": time.Since(netStart).Milliseconds(), logtrace.FieldRole: "client"} - if o := logtrace.OriginFromContext(ctx); o != "" { - f[logtrace.FieldOrigin] = o - } - logtrace.Info(ctx, "dht: batch retrieve summary", f) + + f := logtrace.Fields{"txid": txID, "found_local": foundLocalCount, "found_network": netFound, "required": required, "ms": time.Since(start).Milliseconds(), logtrace.FieldRole: "client"} + if o := logtrace.OriginFromContext(ctx); o != "" { + f[logtrace.FieldOrigin] = o } + logtrace.Info(ctx, "dht: batch retrieve summary", f) + // Record batch retrieve stats for internal DHT snapshot window (network phase only) s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Since(netStart)) // No per-task metrics collector updates diff --git a/sdk/adapters/lumera/adapter.go b/sdk/adapters/lumera/adapter.go index 0317e16..1a3dc72 100644 --- a/sdk/adapters/lumera/adapter.go +++ b/sdk/adapters/lumera/adapter.go @@ -289,7 +289,8 @@ func (a *Adapter) GetSupernodes(ctx context.Context, height int64) ([]Supernode, resp, err := a.client.SuperNode().GetTopSuperNodesForBlock(ctx, &sntypes.QueryGetTopSuperNodesForBlockRequest{ BlockHeight: int32(blockHeight), - //TODO : Update after hotfix on chain + State: string(SUPERNODE_STATE_ACTIVE), + Limit: 10, }) if err != nil { a.logger.Error(ctx, "Failed to get supernodes", "height", height, "error", err) @@ -377,7 +378,9 @@ func toSdkAction(resp *actiontypes.QueryGetActionResponse) Action { } } +// func toSdkSupernodes(resp *sntypes.QueryListSuperNodesResponse) []Supernode { func toSdkSupernodes(resp *sntypes.QueryGetTopSuperNodesForBlockResponse) []Supernode { + var result []Supernode for _, sn := range resp.Supernodes { ipAddress, err := getLatestIP(sn) diff --git 
a/sdk/task/timeouts.go b/sdk/task/timeouts.go index 4498fda..9d8e537 100644 --- a/sdk/task/timeouts.go +++ b/sdk/task/timeouts.go @@ -4,4 +4,4 @@ import "time" // connectionTimeout bounds supernode health/connection probing. -// Keep this short to preserve snappy discovery without impacting long uploads. -const connectionTimeout = 10 * time.Second +// Long enough to ride out slow supernodes, while still bounding discovery probes. +const connectionTimeout = 30 * time.Second