diff --git a/p2p/kademlia/node.go b/p2p/kademlia/node.go index 0011c8be..23ec7b8f 100644 --- a/p2p/kademlia/node.go +++ b/p2p/kademlia/node.go @@ -23,8 +23,8 @@ type Node struct { // port of the node Port uint16 `json:"port,omitempty"` - // Version of the supernode binary (advertised to peers; may be used by min-version gating) - Version string `json:"version,omitempty"` + // Version of the supernode binary (advertised to peers; may be used by min-version gating) + Version string `json:"version,omitempty"` HashedID []byte } diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index 68184ea7..c10614d3 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -37,8 +37,8 @@ func (t *CascadeTask) Run(ctx context.Context) error { return err } - // 1 - Fetch the supernodes (single-pass probe: sanitize + load snapshot) - supernodes, loads, err := t.fetchSupernodesWithLoads(ctx, t.Action.Height) + // 1 - Fetch the supernodes + supernodes, err := t.fetchSupernodes(ctx, t.Action.Height) if err != nil { t.LogEvent(ctx, event.SDKSupernodesUnavailable, "Supernodes unavailable", event.EventData{event.KeyError: err.Error()}) @@ -46,8 +46,11 @@ func (t *CascadeTask) Run(ctx context.Context) error { return err } - // Rank by current load snapshot (fewest first), tie-break deterministically - supernodes = t.orderByLoadSnapshotThenDeterministic(supernodes, loads) + // Initial concurrent balance filter (one-time) + supernodes = t.filterByMinBalance(ctx, supernodes) + + // Rank by available free RAM (descending). Unknown RAM stays after known. + supernodes = t.orderByFreeRAM(ctx, supernodes) t.LogEvent(ctx, event.SDKSupernodesFound, "Supernodes found.", event.EventData{event.KeyCount: len(supernodes)}) // 2 - Register with the supernodes @@ -76,11 +79,11 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum var lastErr error attempted := 0 - // Work on a copy and re-rank between attempts to avoid stale ordering + // Work on a copy; re-rank by free RAM between attempts remaining := append(lumera.Supernodes(nil), supernodes...) for len(remaining) > 0 { - // Refresh load-aware ordering for remaining candidates - remaining = t.orderByLoadThenDeterministic(ctx, remaining) + // Re-rank remaining nodes by available RAM (descending) + remaining = t.orderByFreeRAM(ctx, remaining) sn := remaining[0] iteration := attempted + 1 diff --git a/sdk/task/download.go b/sdk/task/download.go index eb9ad8eb..ed9d98ef 100644 --- a/sdk/task/download.go +++ b/sdk/task/download.go @@ -36,15 +36,18 @@ func NewCascadeDownloadTask(base BaseTask, actionId string, outputPath string, s func (t *CascadeDownloadTask) Run(ctx context.Context) error { t.LogEvent(ctx, event.SDKTaskStarted, "Running cascade download task", nil) - // 1 – fetch super-nodes (single-pass probe: sanitize + load snapshot) - supernodes, loads, err := t.fetchSupernodesWithLoads(ctx, t.Action.Height) + // 1 – fetch super-nodes (plain) + supernodes, err := t.fetchSupernodes(ctx, t.Action.Height) if err != nil { t.LogEvent(ctx, event.SDKSupernodesUnavailable, "super-nodes unavailable", event.EventData{event.KeyError: err.Error()}) t.LogEvent(ctx, event.SDKTaskFailed, "task failed", event.EventData{event.KeyError: err.Error()}) return err } - // Rank by current load snapshot (fewest first), tie-break deterministically - supernodes = t.orderByLoadSnapshotThenDeterministic(supernodes, loads) + // Initial concurrent balance filter (one-time) + supernodes = t.filterByMinBalance(ctx, supernodes) + + // Rank by available free RAM (descending). 
Unknown RAM stays after known. + supernodes = t.orderByFreeRAM(ctx, supernodes) t.LogEvent(ctx, event.SDKSupernodesFound, "super-nodes found", event.EventData{event.KeyCount: len(supernodes)}) // 2 – download from super-nodes @@ -83,7 +86,8 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern remaining := append(lumera.Supernodes(nil), supernodes...) attempted := 0 for len(remaining) > 0 { - remaining = t.orderByLoadThenDeterministic(ctx, remaining) + // Re-rank remaining nodes by available RAM (descending) + remaining = t.orderByFreeRAM(ctx, remaining) sn := remaining[0] iteration := attempted + 1 diff --git a/sdk/task/task.go b/sdk/task/task.go index 1779a93a..bce7cf1b 100644 --- a/sdk/task/task.go +++ b/sdk/task/task.go @@ -8,7 +8,6 @@ import ( "sync" sdkmath "cosmossdk.io/math" - "github.com/LumeraProtocol/supernode/v2/pkg/errgroup" txmod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx" "github.com/LumeraProtocol/supernode/v2/sdk/adapters/lumera" "github.com/LumeraProtocol/supernode/v2/sdk/config" @@ -77,8 +76,6 @@ func (t *BaseTask) LogEvent(ctx context.Context, evt event.EventType, msg string t.emitEvent(ctx, evt, additionalInfo) } -// (removed) fetchSupernodes: replaced by fetchSupernodesWithLoads single-pass probe - // isServing pings the super-node once with a short timeout. func (t *BaseTask) isServing(parent context.Context, sn lumera.Supernode) bool { ctx, cancel := context.WithTimeout(parent, connectionTimeout) @@ -104,266 +101,95 @@ func (t *BaseTask) isServing(parent context.Context, sn lumera.Supernode) bool { t.logger.Info(ctx, "reject supernode: health not SERVING", "error", err, "status", statusStr) return false } - - // Then check P2P peers count via status - status, err := client.GetSupernodeStatus(ctx) - if err != nil { - t.logger.Info(ctx, "reject supernode: status fetch failed", "error", err) - return false - } - if status.Network.PeersCount <= 1 { - t.logger.Info(ctx, "reject supernode: insufficient peers", "peers_count", status.Network.PeersCount) - return false - } - - denom := txmod.DefaultFeeDenom // base denom (micro), e.g., "ulume" - bal, err := t.client.GetBalance(ctx, sn.CosmosAddress, denom) - if err != nil || bal == nil || bal.Balance == nil { - t.logger.Info(ctx, "reject supernode: balance fetch failed or empty", "error", err) - return false - } - // Require at least 1 LUME = 10^6 micro (ulume) - min := sdkmath.NewInt(1_000_000) - if bal.Balance.Amount.LT(min) { - t.logger.Info(ctx, "reject supernode: insufficient balance", "amount", bal.Balance.Amount.String(), "min", min.String()) - return false - } - return true } -// fetchSupernodesWithLoads performs a single-pass probe that both sanitizes candidates -// and captures their current running task load for initial ranking. -// Returns the healthy supernodes and a map of node-key -> load. -func (t *BaseTask) fetchSupernodesWithLoads(ctx context.Context, height int64) (lumera.Supernodes, map[string]int, error) { +// No health, status, balance or load checks are done here. 
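+// Callers are expected to filter (filterByMinBalance) and rank (orderByFreeRAM) the returned list.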
+func (t *BaseTask) fetchSupernodes(ctx context.Context, height int64) (lumera.Supernodes, error) { sns, err := t.client.GetSupernodes(ctx, height) if err != nil { - return nil, nil, fmt.Errorf("fetch supernodes: %w", err) + return nil, fmt.Errorf("fetch supernodes: %w", err) } if len(sns) == 0 { - return nil, nil, errors.New("no supernodes found") + return nil, errors.New("no supernodes found") } - - healthy := make(lumera.Supernodes, 0, len(sns)) - loads := make(map[string]int, len(sns)) - mu := sync.Mutex{} - - eg, ctx := errgroup.WithContext(ctx) - for _, sn := range sns { - sn := sn - eg.Go(func() error { - cctx, cancel := context.WithTimeout(ctx, connectionTimeout) - defer cancel() - - client, err := net.NewClientFactory(cctx, t.logger, t.keyring, t.client, net.FactoryConfig{ - LocalCosmosAddress: t.config.Account.LocalCosmosAddress, - PeerType: t.config.Account.PeerType, - }).CreateClient(cctx, sn) - if err != nil { - t.logger.Info(cctx, "reject supernode: client create failed", "reason", err.Error(), "endpoint", sn.GrpcEndpoint, "cosmos", sn.CosmosAddress) - return nil - } - defer client.Close(cctx) - - // Health - resp, err := client.HealthCheck(cctx) - if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { - statusStr := "nil" - if resp != nil { - statusStr = resp.Status.String() - } - t.logger.Info(cctx, "reject supernode: health not SERVING", "error", err, "status", statusStr) - return nil - } - - // Status (for peers + load) - status, err := client.GetSupernodeStatus(cctx) - if err != nil { - t.logger.Info(cctx, "reject supernode: status fetch failed", "error", err) - return nil - } - // Removed SDK-level version gating; rely on network/node policies instead. - - // Compute load from running tasks (sum of task_count across services) - total := 0 - for _, st := range status.GetRunningTasks() { - if st == nil { - continue - } - if c := int(st.GetTaskCount()); c > 0 { - total += c - } else if ids := st.GetTaskIds(); len(ids) > 0 { - total += len(ids) - } - } - - // Balance - denom := txmod.DefaultFeeDenom - bal, err := t.client.GetBalance(cctx, sn.CosmosAddress, denom) - if err != nil || bal == nil || bal.Balance == nil { - t.logger.Info(cctx, "reject supernode: balance fetch failed or empty", "error", err) - return nil - } - min := sdkmath.NewInt(1_000_000) - if bal.Balance.Amount.LT(min) { - t.logger.Info(cctx, "reject supernode: insufficient balance", "amount", bal.Balance.Amount.String(), "min", min.String()) - return nil - } - - // Accept - mu.Lock() - healthy = append(healthy, sn) - key := sn.CosmosAddress - if key == "" { - key = sn.GrpcEndpoint - } - loads[key] = total - mu.Unlock() - return nil - }) - } - if err := eg.Wait(); err != nil { - return nil, nil, fmt.Errorf("health-check goroutines: %w", err) - } - if len(healthy) == 0 { - return nil, nil, errors.New("no healthy supernodes found") - } - return healthy, loads, nil + return sns, nil } -// orderByLoadSnapshotThenDeterministic sorts using a provided load snapshot; nodes missing -// in the snapshot are considered unknown-load and placed after known-load nodes. 
-func (t *BaseTask) orderByLoadSnapshotThenDeterministic(sns lumera.Supernodes, loads map[string]int) lumera.Supernodes {
+func (t *BaseTask) orderByFreeRAM(parent context.Context, sns lumera.Supernodes) lumera.Supernodes {
 	if len(sns) <= 1 {
 		return sns
 	}
-	det := orderSupernodesByDeterministicDistance(t.ActionID, append(lumera.Supernodes(nil), sns...))
-	idx := make(map[string]int, len(det))
-	for i, sn := range det {
-		key := sn.CosmosAddress
-		if key == "" {
-			key = sn.GrpcEndpoint
-		}
-		idx[key] = i
-	}
-	type scored struct {
-		sn lumera.Supernode
-		load int
-		loadKnown bool
-		tieIdx int
-	}
-	arr := make([]scored, 0, len(sns))
-	for _, sn := range sns {
-		key := sn.CosmosAddress
-		if key == "" {
-			key = sn.GrpcEndpoint
-		}
-		l, ok := loads[key]
-		arr = append(arr, scored{sn: sn, load: l, loadKnown: ok, tieIdx: idx[key]})
-	}
-
-	sort.Slice(arr, func(i, j int) bool {
-		ai, aj := arr[i], arr[j]
-		if ai.loadKnown != aj.loadKnown {
-			return ai.loadKnown
-		}
-		if ai.loadKnown && aj.loadKnown && ai.load != aj.load {
-			return ai.load < aj.load
-		}
-		return ai.tieIdx < aj.tieIdx
-	})
-
-	out := make(lumera.Supernodes, len(arr))
-	for i := range arr {
-		out[i] = arr[i].sn
+	type scored struct {
+		idx int
+		sn lumera.Supernode
+		ramGb float64
+		known bool
 	}
-	return out
-}
-// orderByLoadThenDeterministic ranks supernodes by their current running task count (ascending).
-// Ties are broken deterministically using orderSupernodesByDeterministicDistance with ActionID as seed.
-func (t *BaseTask) orderByLoadThenDeterministic(parent context.Context, sns lumera.Supernodes) lumera.Supernodes {
-	if len(sns) <= 1 {
-		return sns
+	out := make([]scored, len(sns))
+	// Best-effort parallel status fetch; do not filter or fail.
+	// We intentionally avoid health/peer/balance checks here.
+	for i, sn := range sns {
+		out[i] = scored{idx: i, sn: sn, ramGb: 0, known: false}
 	}
-	// Precompute deterministic tie-break order index per node
-	det := orderSupernodesByDeterministicDistance(t.ActionID, append(lumera.Supernodes(nil), sns...))
-	idx := make(map[string]int, len(det))
-	for i, sn := range det {
-		key := sn.CosmosAddress
-		if key == "" {
-			key = sn.GrpcEndpoint
-		}
-		idx[key] = i
+	// Query in parallel with a short timeout to avoid blocking too long per node
+	// Reuse the connectionTimeout constant for symmetry with health probes.
+	type result struct {
+		i int
+		ram float64
+		ok bool
 	}
-
-	type scored struct {
-		sn lumera.Supernode
-		load int
-		loadKnown bool
-		tieIdx int
-	}
-
-	out := make([]scored, len(sns))
-
-	// Collect loads in parallel under the same short connection timeout.
- eg, ctx := errgroup.WithContext(parent) + ch := make(chan result, len(sns)) for i, sn := range sns { i, sn := i, sn - out[i] = scored{sn: sn, load: 0, loadKnown: false, tieIdx: func() int { - k := sn.CosmosAddress - if k == "" { - k = sn.GrpcEndpoint - } - return idx[k] - }()} - eg.Go(func() error { - cctx, cancel := context.WithTimeout(ctx, connectionTimeout) + go func() { + cctx, cancel := context.WithTimeout(parent, connectionTimeout) defer cancel() client, err := net.NewClientFactory(cctx, t.logger, t.keyring, t.client, net.FactoryConfig{ LocalCosmosAddress: t.config.Account.LocalCosmosAddress, PeerType: t.config.Account.PeerType, }).CreateClient(cctx, sn) if err != nil { - return nil // unknown load; keep candidate + ch <- result{i: i, ram: 0, ok: false} + return } defer client.Close(cctx) + status, err := client.GetSupernodeStatus(cctx) if err != nil || status == nil { - return nil + ch <- result{i: i, ram: 0, ok: false} + return } - // Sum total running tasks across services - total := 0 - for _, st := range status.GetRunningTasks() { - if st == nil { - continue - } - if c := int(st.GetTaskCount()); c > 0 { - total += c - } else if ids := st.GetTaskIds(); len(ids) > 0 { - total += len(ids) - } + res := status.GetResources() + if res == nil || res.GetMemory() == nil { + ch <- result{i: i, ram: 0, ok: false} + return } - out[i].load = total - out[i].loadKnown = true - return nil - }) + ch <- result{i: i, ram: res.GetMemory().GetAvailableGb(), ok: true} + }() + } + // Collect results with a cap bounded by len(sns) + for k := 0; k < len(sns); k++ { + r := <-ch + if r.ok { + out[r.i].ramGb = r.ram + out[r.i].known = true + } } - _ = eg.Wait() // best-effort; unknown loads are placed after known ones below - sort.Slice(out, func(i, j int) bool { + // Known RAM first, then by RAM desc. For ties and unknowns, preserve original order. + sort.SliceStable(out, func(i, j int) bool { ai, aj := out[i], out[j] - if ai.loadKnown != aj.loadKnown { - return ai.loadKnown // known loads first + if ai.known != aj.known { + return ai.known } - if ai.loadKnown && aj.loadKnown && ai.load != aj.load { - return ai.load < aj.load + if ai.known && aj.known && ai.ramGb != aj.ramGb { + return ai.ramGb > aj.ramGb } - // Tie-break deterministically - return ai.tieIdx < aj.tieIdx + return ai.idx < aj.idx }) res := make(lumera.Supernodes, len(out)) @@ -372,3 +198,46 @@ func (t *BaseTask) orderByLoadThenDeterministic(parent context.Context, sns lume } return res } + +// filterByMinBalance filters supernodes by requiring at least a minimum balance +// in the default fee denom. This runs concurrently and is intended to be used +// once during initial discovery only. +func (t *BaseTask) filterByMinBalance(parent context.Context, sns lumera.Supernodes) lumera.Supernodes { + if len(sns) == 0 { + return sns + } + // Require at least 1 LUME = 10^6 ulume by default. 
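+	// txmod.DefaultFeeDenom is the base micro denom (e.g. "ulume"), so the threshold is expressed in micro units.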
+ min := sdkmath.NewInt(1_000_000) + denom := txmod.DefaultFeeDenom + + keep := make([]bool, len(sns)) + var wg sync.WaitGroup + wg.Add(len(sns)) + for i, sn := range sns { + i, sn := i, sn + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(parent, connectionTimeout) + defer cancel() + bal, err := t.client.GetBalance(ctx, sn.CosmosAddress, denom) + if err != nil || bal == nil || bal.Balance == nil { + t.logger.Info(ctx, "reject supernode: balance fetch failed or empty", "error", err, "address", sn.CosmosAddress) + return + } + if bal.Balance.Amount.LT(min) { + t.logger.Info(ctx, "reject supernode: insufficient balance", "amount", bal.Balance.Amount.String(), "min", min.String(), "address", sn.CosmosAddress) + return + } + keep[i] = true + }() + } + wg.Wait() + + out := make(lumera.Supernodes, 0, len(sns)) + for i, sn := range sns { + if keep[i] { + out = append(out, sn) + } + } + return out +}
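
// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch above): the ranking rule that
// orderByFreeRAM implements, reproduced as a standalone Go program. Nodes with
// a known RAM reading sort first, by available RAM descending; unknown
// readings and ties keep their original order because the sort is stable,
// which matches the idx tie-break in the patch. The candidate type and the
// sample data below are hypothetical, for illustration only.
// ---------------------------------------------------------------------------
package main

import (
	"fmt"
	"sort"
)

type candidate struct {
	id    string
	ramGb float64
	known bool
}

func rankByFreeRAM(cands []candidate) []candidate {
	out := append([]candidate(nil), cands...)
	sort.SliceStable(out, func(i, j int) bool {
		ai, aj := out[i], out[j]
		if ai.known != aj.known {
			return ai.known // known readings before unknown ones
		}
		if ai.known && aj.known && ai.ramGb != aj.ramGb {
			return ai.ramGb > aj.ramGb // more free RAM first
		}
		return false // stable sort keeps the original order otherwise
	})
	return out
}

func main() {
	cands := []candidate{
		{id: "sn-a", ramGb: 4, known: true},
		{id: "sn-b"}, // status fetch failed: RAM unknown
		{id: "sn-c", ramGb: 16, known: true},
	}
	for _, c := range rankByFreeRAM(cands) {
		fmt.Printf("%s known=%v ram=%.0fGB\n", c.id, c.known, c.ramGb)
	}
	// Prints sn-c, sn-a, sn-b: unknown readings sort last, keeping their
	// original relative order.
}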