From b108e944e4dff2def90296acfe77158a4e28533d Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Thu, 23 Oct 2025 14:24:34 +0500 Subject: [PATCH] xor-rank --- p2p/kademlia/dht.go | 8 +- sdk/task/cascade.go | 33 +++--- sdk/task/download.go | 38 ++----- sdk/task/helpers.go | 22 +--- sdk/task/task.go | 234 ++++++++++++++++++------------------------- 5 files changed, 125 insertions(+), 210 deletions(-) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index bf7a7fd0..13615deb 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -1395,16 +1395,16 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node { if node.IP == "" || node.IP == "0.0.0.0" || (!isIntegrationTest && node.IP == "127.0.0.1") { logtrace.Info(ctx, "Rejecting node: invalid IP", logtrace.Fields{ logtrace.FieldModule: "p2p", - "ip": node.IP, - "node": node.String(), - "integration_test": isIntegrationTest, + "ip": node.IP, + "node": node.String(), + "integration_test": isIntegrationTest, }) return nil } if bytes.Equal(node.ID, s.ht.self.ID) { logtrace.Info(ctx, "Rejecting node: is self", logtrace.Fields{ logtrace.FieldModule: "p2p", - "node": node.String(), + "node": node.String(), }) return nil } diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index c10614d3..a2cdcd3a 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -46,12 +46,8 @@ func (t *CascadeTask) Run(ctx context.Context) error { return err } - // Initial concurrent balance filter (one-time) - supernodes = t.filterByMinBalance(ctx, supernodes) - - // Rank by available free RAM (descending). Unknown RAM stays after known. - supernodes = t.orderByFreeRAM(ctx, supernodes) - t.LogEvent(ctx, event.SDKSupernodesFound, "Supernodes found.", event.EventData{event.KeyCount: len(supernodes)}) + // Log available candidates; streaming will happen within registration + t.LogEvent(ctx, event.SDKSupernodesFound, "Supernodes fetched", event.EventData{event.KeyCount: len(supernodes)}) // 2 - Register with the supernodes if err := t.registerWithSupernodes(ctx, supernodes); err != nil { @@ -77,15 +73,18 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum TaskId: t.TaskID, } + // Strict XOR-first qualification and attempts + fileSize := getFileSizeBytes(t.filePath) + var minRam uint64 + if fileSize > 0 { + minRam = uint64(fileSize) * uploadRAMMultiplier + } + ordered := t.orderByXORDistance(ctx, supernodes) + var lastErr error attempted := 0 - // Work on a copy; re-rank by free RAM between attempts - remaining := append(lumera.Supernodes(nil), supernodes...) 
- for len(remaining) > 0 { - // Re-rank remaining nodes by available RAM (descending) - remaining = t.orderByFreeRAM(ctx, remaining) - sn := remaining[0] - iteration := attempted + 1 + for i, sn := range ordered { + iteration := i + 1 t.LogEvent(ctx, event.SDKRegistrationAttempt, "attempting registration with supernode", event.EventData{ event.KeySupernode: sn.GrpcEndpoint, @@ -94,10 +93,8 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum }) // Re-check serving status just-in-time to avoid calling a node that became down/underpeered - if !t.isServing(ctx, sn) { - t.logger.Info(ctx, "skip supernode: not serving", "supernode", sn.GrpcEndpoint, "sn-address", sn.CosmosAddress, "iteration", iteration) - // Drop this node and retry with the rest - remaining = remaining[1:] + // Ensure node qualifies before attempt + if !t.nodeQualifies(ctx, sn, minStorageThresholdBytes, minRam) { continue } @@ -110,8 +107,6 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum event.KeyError: err.Error(), }) lastErr = err - // Drop this node and retry with the rest (re-ranked next loop) - remaining = remaining[1:] continue } diff --git a/sdk/task/download.go b/sdk/task/download.go index ed9d98ef..d9b2d800 100644 --- a/sdk/task/download.go +++ b/sdk/task/download.go @@ -43,12 +43,8 @@ func (t *CascadeDownloadTask) Run(ctx context.Context) error { t.LogEvent(ctx, event.SDKTaskFailed, "task failed", event.EventData{event.KeyError: err.Error()}) return err } - // Initial concurrent balance filter (one-time) - supernodes = t.filterByMinBalance(ctx, supernodes) - - // Rank by available free RAM (descending). Unknown RAM stays after known. - supernodes = t.orderByFreeRAM(ctx, supernodes) - t.LogEvent(ctx, event.SDKSupernodesFound, "super-nodes found", event.EventData{event.KeyCount: len(supernodes)}) + // Log available candidates; streaming will happen within download phase + t.LogEvent(ctx, event.SDKSupernodesFound, "super-nodes fetched", event.EventData{event.KeyCount: len(supernodes)}) // 2 – download from super-nodes if err := t.downloadFromSupernodes(ctx, supernodes); err != nil { @@ -81,15 +77,13 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern } } - // Try supernodes sequentially with re-ranking between attempts + // Strict XOR-first qualification and attempts (downloads: storage-only threshold) + ordered := t.orderByXORDistance(ctx, supernodes) + var lastErr error - remaining := append(lumera.Supernodes(nil), supernodes...) 
attempted := 0 - for len(remaining) > 0 { - // Re-rank remaining nodes by available RAM (descending) - remaining = t.orderByFreeRAM(ctx, remaining) - sn := remaining[0] - iteration := attempted + 1 + for i, sn := range ordered { + iteration := i + 1 // Log download attempt t.LogEvent(ctx, event.SDKDownloadAttempt, "attempting download from super-node", event.EventData{ @@ -98,10 +92,8 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern event.KeyIteration: iteration, }) - // Re-check serving status just-in-time to avoid calling a node that became down/underpeered - if !t.isServing(ctx, sn) { - t.logger.Info(ctx, "skip supernode: not serving", "supernode", sn.GrpcEndpoint, "sn-address", sn.CosmosAddress, "iteration", iteration) - remaining = remaining[1:] + // Ensure node qualifies before attempt + if !t.nodeQualifies(ctx, sn, minStorageThresholdBytes, 0) { continue } @@ -115,7 +107,6 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern event.KeyError: err.Error(), }) lastErr = err - remaining = remaining[1:] continue } @@ -135,17 +126,6 @@ func (t *CascadeDownloadTask) attemptDownload( factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeDownloadRequest, ) error { - // Recheck liveness/busyness just before attempting download to handle delays - if !t.isServing(parent, sn) { - // Emit a concise event; detailed rejection reasons are logged inside isServing - t.LogEvent(parent, event.SDKDownloadFailure, "precheck: supernode not serving/busy", event.EventData{ - event.KeySupernode: sn.GrpcEndpoint, - event.KeySupernodeAddress: sn.CosmosAddress, - event.KeyReason: "precheck_not_serving_or_busy", - }) - return fmt.Errorf("precheck: supernode not serving/busy") - } - ctx, cancel := context.WithTimeout(parent, downloadTimeout) defer cancel() diff --git a/sdk/task/helpers.go b/sdk/task/helpers.go index 1612f12d..2e9ee4c3 100644 --- a/sdk/task/helpers.go +++ b/sdk/task/helpers.go @@ -6,9 +6,7 @@ import ( "fmt" "math/big" "os" - "path/filepath" "sort" - "strings" "github.com/LumeraProtocol/supernode/v2/pkg/utils" "github.com/LumeraProtocol/supernode/v2/sdk/adapters/lumera" @@ -50,7 +48,7 @@ func (m *ManagerImpl) validateAction(ctx context.Context, actionID string) (lume } // validateSignature verifies the authenticity of a signature against an action's data hash. -// + // This function performs the following steps: // 1. Decodes the CASCADE metadata from the provided Lumera action // 2. Extracts the base64-encoded data hash from the metadata @@ -103,7 +101,7 @@ func (m *ManagerImpl) validateSignature(ctx context.Context, action lumera.Actio return nil } -// (Removed) Peers connectivity preflight is now enforced during discovery in isServing. 
+// func (m *ManagerImpl) validateDownloadAction(ctx context.Context, actionID string) (lumera.Action, error) { action, err := m.lumeraClient.GetAction(ctx, actionID) @@ -124,22 +122,6 @@ func (m *ManagerImpl) validateDownloadAction(ctx context.Context, actionID strin return action, nil } -// Helper function to ensure output path has the correct filename -func ensureOutputPathWithFilename(outputPath, filename string) string { - // If outputPath is empty, just return the filename - if outputPath == "" { - return filename - } - - // Check if the path already ends with the filename - if strings.HasSuffix(outputPath, filename) { - return outputPath - } - - // Otherwise, append the filename to the path - return filepath.Join(outputPath, filename) -} - func orderSupernodesByDeterministicDistance(seed string, sns lumera.Supernodes) lumera.Supernodes { if len(sns) == 0 || seed == "" { return sns diff --git a/sdk/task/task.go b/sdk/task/task.go index bce7cf1b..8588c3f3 100644 --- a/sdk/task/task.go +++ b/sdk/task/task.go @@ -4,8 +4,7 @@ import ( "context" "errors" "fmt" - "sort" - "sync" + "os" sdkmath "cosmossdk.io/math" txmod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx" @@ -26,6 +25,14 @@ const ( TaskTypeCascade TaskType = "CASCADE" ) +// Package-level thresholds and tuning +const ( + // Minimum available storage required on any volume (bytes) + minStorageThresholdBytes uint64 = 50 * 1024 * 1024 * 1024 // 50 GB + // Upload requires free RAM to be at least 8x the file size + uploadRAMMultiplier uint64 = 8 +) + // EventCallback is a function that processes events from tasks type EventCallback func(ctx context.Context, e event.Event) @@ -76,35 +83,6 @@ func (t *BaseTask) LogEvent(ctx context.Context, evt event.EventType, msg string t.emitEvent(ctx, evt, additionalInfo) } -// isServing pings the super-node once with a short timeout. -func (t *BaseTask) isServing(parent context.Context, sn lumera.Supernode) bool { - ctx, cancel := context.WithTimeout(parent, connectionTimeout) - defer cancel() - - client, err := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, net.FactoryConfig{ - LocalCosmosAddress: t.config.Account.LocalCosmosAddress, - PeerType: t.config.Account.PeerType, - }).CreateClient(ctx, sn) - if err != nil { - t.logger.Info(ctx, "reject supernode: client create failed", "reason", err.Error(), "endpoint", sn.GrpcEndpoint, "cosmos", sn.CosmosAddress) - return false - } - defer client.Close(ctx) - - // First check gRPC health - resp, err := client.HealthCheck(ctx) - if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { - statusStr := "nil" - if resp != nil { - statusStr = resp.Status.String() - } - t.logger.Info(ctx, "reject supernode: health not SERVING", "error", err, "status", statusStr) - return false - } - return true -} - -// No health, status, balance or load checks are done here. func (t *BaseTask) fetchSupernodes(ctx context.Context, height int64) (lumera.Supernodes, error) { sns, err := t.client.GetSupernodes(ctx, height) if err != nil { @@ -116,128 +94,108 @@ func (t *BaseTask) fetchSupernodes(ctx context.Context, height int64) (lumera.Su return sns, nil } -func (t *BaseTask) orderByFreeRAM(parent context.Context, sns lumera.Supernodes) lumera.Supernodes { +// orderByXORDistance ranks supernodes by XOR distance to the action's data hash. +// If decoding metadata fails, falls back to using the action ID as the seed. 
+func (t *BaseTask) orderByXORDistance(ctx context.Context, sns lumera.Supernodes) lumera.Supernodes {
 	if len(sns) <= 1 {
 		return sns
 	}
-
-	type scored struct {
-		idx   int
-		sn    lumera.Supernode
-		ramGb float64
-		known bool
-	}
-
-	out := make([]scored, len(sns))
-	// Best-effort parallel status fetch; do not filter or fail.
-	// We intentionally avoid health/peer/balance checks here.
-	for i, sn := range sns {
-		out[i] = scored{idx: i, sn: sn, ramGb: 0, known: false}
-	}
-
-	// Query in parallel with a short timeout to avoid blocking too long per node
-	// Reuse the connectionTimeout constant for symmetry with health probes.
-	type result struct {
-		i   int
-		ram float64
-		ok  bool
-	}
-	ch := make(chan result, len(sns))
-	for i, sn := range sns {
-		i, sn := i, sn
-		go func() {
-			cctx, cancel := context.WithTimeout(parent, connectionTimeout)
-			defer cancel()
-			client, err := net.NewClientFactory(cctx, t.logger, t.keyring, t.client, net.FactoryConfig{
-				LocalCosmosAddress: t.config.Account.LocalCosmosAddress,
-				PeerType:           t.config.Account.PeerType,
-			}).CreateClient(cctx, sn)
-			if err != nil {
-				ch <- result{i: i, ram: 0, ok: false}
-				return
-			}
-			defer client.Close(cctx)
-
-			status, err := client.GetSupernodeStatus(cctx)
-			if err != nil || status == nil {
-				ch <- result{i: i, ram: 0, ok: false}
-				return
-			}
-			res := status.GetResources()
-			if res == nil || res.GetMemory() == nil {
-				ch <- result{i: i, ram: 0, ok: false}
-				return
-			}
-			ch <- result{i: i, ram: res.GetMemory().GetAvailableGb(), ok: true}
-		}()
-	}
-	// Collect results with a cap bounded by len(sns)
-	for k := 0; k < len(sns); k++ {
-		r := <-ch
-		if r.ok {
-			out[r.i].ramGb = r.ram
-			out[r.i].known = true
+	// Try to decode the action metadata to get the Cascade data hash as seed
+	seed := t.ActionID
+	if t.client != nil && (t.Action.Metadata != nil || t.Action.ActionType != "") {
+		if meta, err := t.client.DecodeCascadeMetadata(ctx, t.Action); err == nil && meta.DataHash != "" {
+			seed = meta.DataHash
 		}
 	}
+	return orderSupernodesByDeterministicDistance(seed, sns)
+}
 
-	// Known RAM first, then by RAM desc. For ties and unknowns, preserve original order.
-	sort.SliceStable(out, func(i, j int) bool {
-		ai, aj := out[i], out[j]
-		if ai.known != aj.known {
-			return ai.known
-		}
-		if ai.known && aj.known && ai.ramGb != aj.ramGb {
-			return ai.ramGb > aj.ramGb
-		}
-		return ai.idx < aj.idx
-	})
+// Resource qualification (see nodeQualifies) enforces two thresholds per attempt:
+// - minStorageBytes: minimum available storage on any volume (bytes)
+// - minFreeRamBytes: minimum free RAM (bytes); if 0, the RAM check is skipped.
+// Both are evaluated against the supernode's reported status just before use.
 
-	res := make(lumera.Supernodes, len(out))
-	for i := range out {
-		res[i] = out[i].sn
+// getFileSizeBytes returns the size of the file at p in bytes, or 0 on error.
+func getFileSizeBytes(p string) int64 {
+	fi, err := os.Stat(p)
+	if err != nil {
+		return 0
 	}
-	return res
+	return fi.Size()
 }
 
-// filterByMinBalance filters supernodes by requiring at least a minimum balance
-// in the default fee denom. This runs concurrently and is intended to be used
-// once during initial discovery only.
-func (t *BaseTask) filterByMinBalance(parent context.Context, sns lumera.Supernodes) lumera.Supernodes {
-	if len(sns) == 0 {
-		return sns
+// nodeQualifies performs balance, health, and resource checks for a supernode.
+func (t *BaseTask) nodeQualifies(parent context.Context, sn lumera.Supernode, minStorageBytes uint64, minFreeRamBytes uint64) bool { + // 1) Balance check (require at least 1 LUME) + if !t.balanceOK(parent, sn) { + return false } - // Require at least 1 LUME = 10^6 ulume by default. - min := sdkmath.NewInt(1_000_000) + + // 2) Health + resources via a single client session + ctx, cancel := context.WithTimeout(parent, connectionTimeout) + defer cancel() + client, err := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, net.FactoryConfig{ + LocalCosmosAddress: t.config.Account.LocalCosmosAddress, + PeerType: t.config.Account.PeerType, + }).CreateClient(ctx, sn) + if err != nil { + return false + } + defer client.Close(ctx) + + // Health check + h, err := client.HealthCheck(ctx) + if err != nil || h == nil || h.Status != grpc_health_v1.HealthCheckResponse_SERVING { + return false + } + + // Resource thresholds + return t.resourcesOK(ctx, client, sn, minStorageBytes, minFreeRamBytes) +} + +func (t *BaseTask) balanceOK(parent context.Context, sn lumera.Supernode) bool { + ctx, cancel := context.WithTimeout(parent, connectionTimeout) + defer cancel() + min := sdkmath.NewInt(1_000_000) // 1 LUME in ulume denom := txmod.DefaultFeeDenom + bal, err := t.client.GetBalance(ctx, sn.CosmosAddress, denom) + if err != nil || bal == nil || bal.Balance == nil { + return false + } + if bal.Balance.Amount.LT(min) { + return false + } + return true +} - keep := make([]bool, len(sns)) - var wg sync.WaitGroup - wg.Add(len(sns)) - for i, sn := range sns { - i, sn := i, sn - go func() { - defer wg.Done() - ctx, cancel := context.WithTimeout(parent, connectionTimeout) - defer cancel() - bal, err := t.client.GetBalance(ctx, sn.CosmosAddress, denom) - if err != nil || bal == nil || bal.Balance == nil { - t.logger.Info(ctx, "reject supernode: balance fetch failed or empty", "error", err, "address", sn.CosmosAddress) - return - } - if bal.Balance.Amount.LT(min) { - t.logger.Info(ctx, "reject supernode: insufficient balance", "amount", bal.Balance.Amount.String(), "min", min.String(), "address", sn.CosmosAddress) - return +func (t *BaseTask) resourcesOK(ctx context.Context, client net.SupernodeClient, sn lumera.Supernode, minStorageBytes uint64, minFreeRamBytes uint64) bool { + status, err := client.GetSupernodeStatus(ctx) + if err != nil || status == nil || status.Resources == nil { + return false + } + // Storage: any volume must satisfy available >= minStorageBytes + if minStorageBytes > 0 { + ok := false + for _, vol := range status.Resources.StorageVolumes { + if vol != nil && vol.AvailableBytes >= minStorageBytes { + ok = true + break } - keep[i] = true - }() + } + if !ok { + return false + } } - wg.Wait() - - out := make(lumera.Supernodes, 0, len(sns)) - for i, sn := range sns { - if keep[i] { - out = append(out, sn) + // RAM: available_gb must be >= required GiB + if minFreeRamBytes > 0 { + mem := status.Resources.Memory + if mem == nil { + return false + } + requiredGiB := float64(minFreeRamBytes) / (1024.0 * 1024.0 * 1024.0) + if mem.AvailableGb < requiredGiB { + return false } } - return out + return true }
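
The new orderByXORDistance delegates the actual ranking to orderSupernodesByDeterministicDistance(seed, sns), whose body is not part of this diff (only its opening guard appears as context in sdk/task/helpers.go). As an illustration of the general technique, below is a minimal, self-contained sketch that ranks nodes by the XOR distance between sha256(seed) and sha256(node identity); the hash choice, the use of CosmosAddress as the identity, and the sample addresses are assumptions and may not match the real helper.

package main

import (
	"crypto/sha256"
	"fmt"
	"math/big"
	"sort"
)

// Supernode is a stand-in for lumera.Supernode, reduced to the fields this
// sketch needs.
type Supernode struct {
	CosmosAddress string
	GrpcEndpoint  string
}

// orderByXORDistanceSketch ranks nodes by XOR distance between sha256(seed)
// and sha256(CosmosAddress), smallest distance first (assumed scheme).
func orderByXORDistanceSketch(seed string, nodes []Supernode) []Supernode {
	if len(nodes) == 0 || seed == "" {
		return nodes
	}
	seedHash := sha256.Sum256([]byte(seed))
	seedInt := new(big.Int).SetBytes(seedHash[:])

	type ranked struct {
		node Supernode
		dist *big.Int
	}
	out := make([]ranked, 0, len(nodes))
	for _, n := range nodes {
		nodeHash := sha256.Sum256([]byte(n.CosmosAddress))
		nodeInt := new(big.Int).SetBytes(nodeHash[:])
		// XOR in the same hash space gives a deterministic distance metric.
		out = append(out, ranked{node: n, dist: new(big.Int).Xor(seedInt, nodeInt)})
	}
	// Stable sort preserves the incoming order on (unlikely) ties.
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].dist.Cmp(out[j].dist) < 0
	})
	res := make([]Supernode, len(out))
	for i := range out {
		res[i] = out[i].node
	}
	return res
}

func main() {
	// Illustrative nodes; real addresses and endpoints will differ.
	nodes := []Supernode{
		{CosmosAddress: "lumera1aaa", GrpcEndpoint: "sn-a:4444"},
		{CosmosAddress: "lumera1bbb", GrpcEndpoint: "sn-b:4444"},
		{CosmosAddress: "lumera1ccc", GrpcEndpoint: "sn-c:4444"},
	}
	// The tasks seed the ranking with the Cascade data hash when it decodes,
	// falling back to the action ID otherwise.
	for _, n := range orderByXORDistanceSketch("example-data-hash", nodes) {
		fmt.Println(n.GrpcEndpoint)
	}
}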
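
For the upload path, the RAM requirement passed to nodeQualifies is uploadRAMMultiplier (8) times the file size, and resourcesOK converts that byte count to GiB before comparing it with the reported AvailableGb. A small sketch of that arithmetic, using an assumed 1.5 GiB file:

package main

import "fmt"

// uploadRAMMultiplier mirrors the constant introduced in sdk/task/task.go.
const uploadRAMMultiplier uint64 = 8

func main() {
	// Assumed example: a 1.5 GiB upload.
	fileSize := uint64(1610612736) // 1.5 * 1024 * 1024 * 1024 bytes
	minRAMBytes := fileSize * uploadRAMMultiplier
	requiredGiB := float64(minRAMBytes) / (1024.0 * 1024.0 * 1024.0)
	// A supernode qualifies only if its reported AvailableGb >= requiredGiB
	// (and some storage volume offers at least 50 GiB, per minStorageThresholdBytes).
	fmt.Printf("file: %d bytes, required free RAM: %d bytes (%.1f GiB)\n",
		fileSize, minRAMBytes, requiredGiB)
	// Prints: file: 1610612736 bytes, required free RAM: 12884901888 bytes (12.0 GiB)
}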