Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,16 +1395,16 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node {
if node.IP == "" || node.IP == "0.0.0.0" || (!isIntegrationTest && node.IP == "127.0.0.1") {
logtrace.Info(ctx, "Rejecting node: invalid IP", logtrace.Fields{
logtrace.FieldModule: "p2p",
"ip": node.IP,
"node": node.String(),
"integration_test": isIntegrationTest,
"ip": node.IP,
"node": node.String(),
"integration_test": isIntegrationTest,
})
return nil
}
if bytes.Equal(node.ID, s.ht.self.ID) {
logtrace.Info(ctx, "Rejecting node: is self", logtrace.Fields{
logtrace.FieldModule: "p2p",
"node": node.String(),
"node": node.String(),
})
return nil
}
Expand Down
33 changes: 14 additions & 19 deletions sdk/task/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,8 @@ func (t *CascadeTask) Run(ctx context.Context) error {
return err
}

// Initial concurrent balance filter (one-time)
supernodes = t.filterByMinBalance(ctx, supernodes)

// Rank by available free RAM (descending). Unknown RAM stays after known.
supernodes = t.orderByFreeRAM(ctx, supernodes)
t.LogEvent(ctx, event.SDKSupernodesFound, "Supernodes found.", event.EventData{event.KeyCount: len(supernodes)})
// Log available candidates; streaming will happen within registration
t.LogEvent(ctx, event.SDKSupernodesFound, "Supernodes fetched", event.EventData{event.KeyCount: len(supernodes)})

// 2 - Register with the supernodes
if err := t.registerWithSupernodes(ctx, supernodes); err != nil {
Expand All @@ -77,15 +73,18 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
TaskId: t.TaskID,
}

// Strict XOR-first qualification and attempts
fileSize := getFileSizeBytes(t.filePath)
var minRam uint64
if fileSize > 0 {
minRam = uint64(fileSize) * uploadRAMMultiplier
}
ordered := t.orderByXORDistance(ctx, supernodes)

var lastErr error
attempted := 0
// Work on a copy; re-rank by free RAM between attempts
remaining := append(lumera.Supernodes(nil), supernodes...)
for len(remaining) > 0 {
// Re-rank remaining nodes by available RAM (descending)
remaining = t.orderByFreeRAM(ctx, remaining)
sn := remaining[0]
iteration := attempted + 1
for i, sn := range ordered {
iteration := i + 1

t.LogEvent(ctx, event.SDKRegistrationAttempt, "attempting registration with supernode", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
Expand All @@ -94,10 +93,8 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
})

// Re-check serving status just-in-time to avoid calling a node that became down/underpeered
if !t.isServing(ctx, sn) {
t.logger.Info(ctx, "skip supernode: not serving", "supernode", sn.GrpcEndpoint, "sn-address", sn.CosmosAddress, "iteration", iteration)
// Drop this node and retry with the rest
remaining = remaining[1:]
// Ensure node qualifies before attempt
if !t.nodeQualifies(ctx, sn, minStorageThresholdBytes, minRam) {
continue
}

Expand All @@ -110,8 +107,6 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
event.KeyError: err.Error(),
})
lastErr = err
// Drop this node and retry with the rest (re-ranked next loop)
remaining = remaining[1:]
continue
}

Expand Down
38 changes: 9 additions & 29 deletions sdk/task/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,8 @@ func (t *CascadeDownloadTask) Run(ctx context.Context) error {
t.LogEvent(ctx, event.SDKTaskFailed, "task failed", event.EventData{event.KeyError: err.Error()})
return err
}
// Initial concurrent balance filter (one-time)
supernodes = t.filterByMinBalance(ctx, supernodes)

// Rank by available free RAM (descending). Unknown RAM stays after known.
supernodes = t.orderByFreeRAM(ctx, supernodes)
t.LogEvent(ctx, event.SDKSupernodesFound, "super-nodes found", event.EventData{event.KeyCount: len(supernodes)})
// Log available candidates; streaming will happen within download phase
t.LogEvent(ctx, event.SDKSupernodesFound, "super-nodes fetched", event.EventData{event.KeyCount: len(supernodes)})

// 2 – download from super-nodes
if err := t.downloadFromSupernodes(ctx, supernodes); err != nil {
Expand Down Expand Up @@ -81,15 +77,13 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern
}
}

// Try supernodes sequentially with re-ranking between attempts
// Strict XOR-first qualification and attempts (downloads: storage-only threshold)
ordered := t.orderByXORDistance(ctx, supernodes)

var lastErr error
remaining := append(lumera.Supernodes(nil), supernodes...)
attempted := 0
for len(remaining) > 0 {
// Re-rank remaining nodes by available RAM (descending)
remaining = t.orderByFreeRAM(ctx, remaining)
sn := remaining[0]
iteration := attempted + 1
for i, sn := range ordered {
iteration := i + 1

// Log download attempt
t.LogEvent(ctx, event.SDKDownloadAttempt, "attempting download from super-node", event.EventData{
Expand All @@ -98,10 +92,8 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern
event.KeyIteration: iteration,
})

// Re-check serving status just-in-time to avoid calling a node that became down/underpeered
if !t.isServing(ctx, sn) {
t.logger.Info(ctx, "skip supernode: not serving", "supernode", sn.GrpcEndpoint, "sn-address", sn.CosmosAddress, "iteration", iteration)
remaining = remaining[1:]
// Ensure node qualifies before attempt
if !t.nodeQualifies(ctx, sn, minStorageThresholdBytes, 0) {
continue
}

Expand All @@ -115,7 +107,6 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern
event.KeyError: err.Error(),
})
lastErr = err
remaining = remaining[1:]
continue
}

Expand All @@ -135,17 +126,6 @@ func (t *CascadeDownloadTask) attemptDownload(
factory *net.ClientFactory,
req *supernodeservice.CascadeSupernodeDownloadRequest,
) error {
// Recheck liveness/busyness just before attempting download to handle delays
if !t.isServing(parent, sn) {
// Emit a concise event; detailed rejection reasons are logged inside isServing
t.LogEvent(parent, event.SDKDownloadFailure, "precheck: supernode not serving/busy", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
event.KeySupernodeAddress: sn.CosmosAddress,
event.KeyReason: "precheck_not_serving_or_busy",
})
return fmt.Errorf("precheck: supernode not serving/busy")
}

ctx, cancel := context.WithTimeout(parent, downloadTimeout)
defer cancel()

Expand Down
22 changes: 2 additions & 20 deletions sdk/task/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (
"fmt"
"math/big"
"os"
"path/filepath"
"sort"
"strings"

"github.com/LumeraProtocol/supernode/v2/pkg/utils"
"github.com/LumeraProtocol/supernode/v2/sdk/adapters/lumera"
Expand Down Expand Up @@ -50,7 +48,7 @@ func (m *ManagerImpl) validateAction(ctx context.Context, actionID string) (lume
}

// validateSignature verifies the authenticity of a signature against an action's data hash.
//

// This function performs the following steps:
// 1. Decodes the CASCADE metadata from the provided Lumera action
// 2. Extracts the base64-encoded data hash from the metadata
Expand Down Expand Up @@ -103,7 +101,7 @@ func (m *ManagerImpl) validateSignature(ctx context.Context, action lumera.Actio
return nil
}

// (Removed) Peers connectivity preflight is now enforced during discovery in isServing.
//

func (m *ManagerImpl) validateDownloadAction(ctx context.Context, actionID string) (lumera.Action, error) {
action, err := m.lumeraClient.GetAction(ctx, actionID)
Expand All @@ -124,22 +122,6 @@ func (m *ManagerImpl) validateDownloadAction(ctx context.Context, actionID strin
return action, nil
}

// Helper function to ensure output path has the correct filename
func ensureOutputPathWithFilename(outputPath, filename string) string {
// If outputPath is empty, just return the filename
if outputPath == "" {
return filename
}

// Check if the path already ends with the filename
if strings.HasSuffix(outputPath, filename) {
return outputPath
}

// Otherwise, append the filename to the path
return filepath.Join(outputPath, filename)
}

func orderSupernodesByDeterministicDistance(seed string, sns lumera.Supernodes) lumera.Supernodes {
if len(sns) == 0 || seed == "" {
return sns
Expand Down
Loading
Loading