diff --git a/p2p/client.go b/p2p/client.go
index 5d4a44be..9ed9b9d9 100644
--- a/p2p/client.go
+++ b/p2p/client.go
@@ -18,6 +18,9 @@ type Client interface {
 	// reqCount is the minimum number of keys that are actually required by the caller
 	// to successfully perform the required operation
 	BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error)
+
+	// BatchRetrieveStream retrieves keys local-first and then from the network,
+	// handing each symbol to onSymbol as soon as it is fetched; it stops once
+	// 'required' symbols have been written.
+	BatchRetrieveStream(ctx context.Context, keys []string, required int32, txID string, onSymbol func(base58Key string, data []byte) error, localOnly ...bool) (written int32, err error)
+
 	// Store stores data to the network, which will trigger the iterative store message
 	// - the base58 encoded identifier will be returned
 	Store(ctx context.Context, data []byte, typ int) (string, error)
diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go
index 13615deb..61fe0eeb 100644
--- a/p2p/kademlia/dht.go
+++ b/p2p/kademlia/dht.go
@@ -41,14 +41,15 @@ const (
 	defaultDeleteDataInterval = 11 * time.Hour
 	delKeysCountThreshold     = 10
 	lowSpaceThreshold         = 50 // GB
-	batchRetrieveSize         = 1000
+	batchRetrieveSize         = 1500
 
-	storeSameSymbolsBatchConcurrency = 3
-	fetchSymbolsBatchConcurrency     = 6
+	storeSameSymbolsBatchConcurrency = 4
+	fetchSymbolsBatchConcurrency     = 8
 
 	minimumDataStoreSuccessRate = 75.0
+	perNodeRequestCap           = 600 // ~36 MB per RPC at 60KB/symbol
 
 	maxIterations = 4
-	macConcurrentNetworkStoreCalls = 16
+	maxConcurrentNetworkStoreCalls = 16
 )
 
 // DHT represents the state of the queries node in the distributed hash table
@@ -1045,6 +1046,450 @@ func (s *DHT) doBatchGetValuesCall(ctx context.Context, node *Node, requestKeys
 	return resp.Data, nil
 }
 
+func (s *DHT) BatchRetrieveStream(
+	ctx context.Context,
+	keys []string,
+	required int32,
+	txID string,
+	onSymbol func(base58Key string, data []byte) error, // write-to-disk sink
+	localOnly ...bool,
+) (written int32, err error) {
+	logtrace.Debug(ctx, "DHT BatchRetrieveStream begin", logtrace.Fields{"txid": txID, "keys": len(keys), "required": required})
+	if required <= 0 || len(keys) == 0 {
+		return 0, nil
+	}
+
+	hexKeys := make([]string, len(keys))
+	hashes := make([][]byte, len(keys))
+	for i, key := range keys {
+		decoded := base58.Decode(key)
+		if len(decoded) != B/8 {
+			return 0, fmt.Errorf("invalid key: %v", key)
+		}
+		hashes[i] = decoded
+		hexKeys[i] = hex.EncodeToString(decoded)
+	}
+
+	// resSeen tracks which symbols we already persisted (hexKey -> struct{})
+	var resSeen sync.Map
+
+	// 1) Local-first (BATCHED)
+	foundLocalCount, lerr := s.fetchAndWriteLocalKeysBatched(ctx, hexKeys, &resSeen, required, onSymbol)
+	if lerr != nil {
+		return 0, fmt.Errorf("fetch&write local (batched): %w", lerr)
+	}
+	if foundLocalCount >= required {
+		logtrace.Debug(ctx, "DHT BatchRetrieveStream satisfied from local storage", logtrace.Fields{
+			"txid": txID, "found_local": foundLocalCount, "required": required,
+		})
+		return foundLocalCount, nil
+	}
+	if len(localOnly) > 0 && localOnly[0] {
+		logtrace.Debug(ctx, "DHT BatchRetrieveStream local-only mode, insufficient keys", logtrace.Fields{
+			"txid": txID, "found_local": foundLocalCount, "required": required,
+		})
+		return foundLocalCount, fmt.Errorf("local-only: found %d, required %d", foundLocalCount, required)
+	}
+
+	// Effective network requirement (avoid over-fetch)
+	needNetwork := required - foundLocalCount
+	if needNetwork <= 0 {
+		return foundLocalCount, nil
+	}
+
+	// 2) Network phase: breadth-first
+	supernodeAddr, addrErr := s.getSupernodeAddress(ctx)
+	if addrErr != nil {
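+		// Non-fatal: log and continue; the self node below is built from
+		// whatever host parseSupernodeAddress yields for the (possibly empty) address.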
logtrace.Warn(ctx, "Failed to get supernode address", logtrace.Fields{ + logtrace.FieldModule: "dht", logtrace.FieldError: addrErr.Error(), + }) + } + hostIP := parseSupernodeAddress(supernodeAddr) + self := &Node{ID: s.ht.self.ID, IP: hostIP, Port: s.ht.self.Port} + self.SetHashedID() + + knownNodes := make(map[string]*Node) + var knownMu sync.Mutex + for _, n := range s.ht.nodes() { + nn := &Node{ID: n.ID, IP: n.IP, Port: n.Port} + nn.SetHashedID() + knownNodes[string(nn.ID)] = nn + } + delete(knownNodes, string(self.ID)) + + ignoreList := s.ignorelist.ToNodeList() + globalClosestContacts := make(map[string]*NodeList) + var closestMu sync.RWMutex + + // Build top-k once for unsatisfied keys + for i := range keys { + if _, found := resSeen.Load(hexKeys[i]); found { + continue + } + topK := s.ht.closestContactsWithIncludingNode(Alpha, hashes[i], ignoreList, nil) + closestMu.Lock() + globalClosestContacts[keys[i]] = topK + closestMu.Unlock() + s.addKnownNodesSafe(ctx, topK.Nodes, knownNodes, &knownMu) + } + + var networkFound int32 + totalFound := func() int32 { return foundLocalCount + atomic.LoadInt32(&networkFound) } + + batchSize := batchRetrieveSize + totalBatches := int(math.Ceil(float64(len(keys)) / float64(batchSize))) + parallelBatches := int(math.Min(float64(totalBatches), float64(fetchSymbolsBatchConcurrency))) + + sem := semaphore.NewWeighted(int64(parallelBatches)) + var wg sync.WaitGroup + gctx, cancel := context.WithCancel(ctx) + defer cancel() + + netStart := time.Now() + for start := 0; start < len(keys); start += batchSize { + end := start + batchSize + if end > len(keys) { + end = len(keys) + } + if totalFound() >= required { + break + } + + wg.Add(1) + go func(start, end int) { + defer wg.Done() + if err := sem.Acquire(gctx, 1); err != nil { + return + } + defer sem.Release(1) + if totalFound() >= required { + return + } + // IMPORTANT: pass needNetwork (not total required) into the batch + s.processBatchStream( + gctx, + keys[start:end], + hexKeys[start:end], + globalClosestContacts, &closestMu, + knownNodes, &knownMu, + &resSeen, + needNetwork, + &networkFound, + onSymbol, + cancel, + txID, + ) + }(start, end) + } + wg.Wait() + + netFound := int(atomic.LoadInt32(&networkFound)) + { + f := logtrace.Fields{"txid": txID, "found_local": foundLocalCount, "found_network": netFound, "required": required, "ms": time.Since(netStart).Milliseconds(), logtrace.FieldRole: "client"} + if o := logtrace.OriginFromContext(ctx); o != "" { + f[logtrace.FieldOrigin] = o + } + logtrace.Info(ctx, "dht: batch retrieve stream summary", f) + } + s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Since(netStart)) + return foundLocalCount + int32(netFound), nil +} + +func (s *DHT) processBatchStream( + ctx context.Context, + batchKeys []string, + batchHexKeys []string, + globalClosestContacts map[string]*NodeList, + closestMu *sync.RWMutex, + knownNodes map[string]*Node, + knownMu *sync.Mutex, + resSeen *sync.Map, + needNetwork int32, // << only network requirement + networkFound *int32, + onSymbol func(base58Key string, data []byte) error, + cancel context.CancelFunc, + txID string, +) { + select { + case <-ctx.Done(): + return + default: + } + + // Snapshot contacts for this batch once + closestMu.RLock() + localContacts := make(map[string]*NodeList, len(batchKeys)) + for _, key := range batchKeys { + localContacts[key] = globalClosestContacts[key] + } + closestMu.RUnlock() + + // Determine max K across keys in this batch + maxK := 0 + for _, key := 
+
+	// Determine max K across keys in this batch
+	maxK := 0
+	for _, key := range batchKeys {
+		if nl := localContacts[key]; nl != nil && len(nl.Nodes) > maxK {
+			maxK = len(nl.Nodes)
+		}
+	}
+	if maxK == 0 {
+		return
+	}
+
+	remainingNeeded := func() int32 { return needNetwork - atomic.LoadInt32(networkFound) }
+
+	// Up to K waves; stop early if we've met the network target
+	for wave := 0; wave < maxK; wave++ {
+		if remainingNeeded() <= 0 || ctx.Err() != nil {
+			break
+		}
+
+		// Build node->indices for this wave using the wave's provider
+		fetchMap := make(map[string][]int)
+		for idx, key := range batchKeys {
+			hexK := batchHexKeys[idx]
+			if _, seen := resSeen.Load(hexK); seen {
+				continue
+			}
+			nl := localContacts[key]
+			if nl == nil || len(nl.Nodes) == 0 {
+				continue
+			}
+			k := len(nl.Nodes)
+			base := providerIndexForKey(key, txID, k)
+			provider := nl.Nodes[(base+wave)%k] // rotate per wave
+
+			nodeID := string(provider.ID)
+			fetchMap[nodeID] = append(fetchMap[nodeID], idx)
+		}
+
+		// Per-node request cap to bound payload sizes
+		if perNodeRequestCap > 0 {
+			for nodeID, idxs := range fetchMap {
+				if len(idxs) > perNodeRequestCap {
+					fetchMap[nodeID] = idxs[:perNodeRequestCap]
+				}
+			}
+		}
+
+		foundCount, _ := s.iterateBatchGetValuesStream(
+			ctx,
+			knownNodes,
+			batchKeys,    // base58 for sink
+			batchHexKeys, // hex for dedup
+			fetchMap,
+			resSeen,
+			needNetwork, // << pass network-only target
+			atomic.LoadInt32(networkFound),
+			onSymbol,
+		)
+		if foundCount > 0 {
+			atomic.AddInt32(networkFound, int32(foundCount))
+		}
+		if atomic.LoadInt32(networkFound) >= needNetwork {
+			cancel()
+			break
+		}
+	}
+}
+
+func (s *DHT) iterateBatchGetValuesStream(
+	ctx context.Context,
+	nodes map[string]*Node,
+	base58Keys []string,
+	hexKeys []string,
+	fetchMap map[string][]int,
+	resSeen *sync.Map, // hexKey -> struct{}
+	needNetwork int32, // << network-only target for this call
+	alreadyFound int32,
+	onSymbol func(base58Key string, data []byte) error,
+) (int, error) {
+	sem := semaphore.NewWeighted(int64(storeSameSymbolsBatchConcurrency))
+	var wg sync.WaitGroup
+	var firstErr error
+	var mu sync.Mutex
+	foundCount := int32(0)
+
+	gctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	for nodeID := range fetchMap {
+		node, ok := nodes[nodeID]
+		if !ok {
+			continue
+		}
+		if s.ignorelist.Banned(node) {
+			logtrace.Debug(ctx, "Ignore banned node in iterate batch get values (stream)", logtrace.Fields{
+				logtrace.FieldModule: "dht", "node": node.String(),
+			})
+			continue
+		}
+
+		wg.Add(1)
+		go func(node *Node, nodeID string) {
+			defer wg.Done()
+
+			if err := sem.Acquire(gctx, 1); err != nil {
+				return
+			}
+			defer sem.Release(1)
+
+			if atomic.LoadInt32(&foundCount) >= int32(needNetwork-alreadyFound) {
+				return
+			}
+
+			// Prepare request set for this node (only for keys not yet seen)
+			indices := fetchMap[nodeID]
+			requestKeys := make(map[string]KeyValWithClosest) // hex -> payload placeholder
+			for _, idx := range indices {
+				if idx < len(hexKeys) {
+					if _, loaded := resSeen.Load(hexKeys[idx]); !loaded {
+						requestKeys[hexKeys[idx]] = KeyValWithClosest{}
+					}
+				}
+			}
+			if len(requestKeys) == 0 {
+				return
+			}
+
+			decompressedData, err := s.doBatchGetValuesCall(gctx, node, requestKeys)
+			if err != nil {
+				mu.Lock()
+				if firstErr == nil {
+					firstErr = err
+				}
+				mu.Unlock()
+				return
+			}
+
+			// Build hex->base58 for this set
+			hexToB58 := make(map[string]string, len(indices))
+			for _, idx := range indices {
+				if idx < len(hexKeys) {
+					hexToB58[hexKeys[idx]] = base58Keys[idx]
+				}
+			}
+
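+			// Claim-then-write: LoadOrStore marks a key as taken before the sink
+			// runs; on a sink failure the claim is released (resSeen.Delete below)
+			// so the symbol can be retried from another node.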
+			for hexK, v := range decompressedData {
+				if len(v.Value) == 0 {
+					continue
+				}
+				if _, loaded := resSeen.LoadOrStore(hexK, struct{}{}); loaded {
+					continue
+				}
+				b58, ok := hexToB58[hexK]
+				if !ok {
+					continue
+				}
+				if err := onSymbol(b58, v.Value); err != nil {
+					// allow retry from another node
+					resSeen.Delete(hexK)
+					mu.Lock()
+					if firstErr == nil {
+						firstErr = err
+					}
+					mu.Unlock()
+					continue
+				}
+				atomic.AddInt32(&foundCount, 1)
+				if atomic.LoadInt32(&foundCount) >= int32(needNetwork-alreadyFound) {
+					cancel()
+					break
+				}
+			}
+		}(node, nodeID)
+	}
+
+	wg.Wait()
+
+	logtrace.Debug(ctx, "Iterate batch get values (stream) done", logtrace.Fields{
+		logtrace.FieldModule: "dht", "found_count": atomic.LoadInt32(&foundCount),
+	})
+	if firstErr != nil {
+		logtrace.Error(ctx, "Encountered error in iterate batch get values (stream)", logtrace.Fields{
+			logtrace.FieldModule: "dht", "found_count": atomic.LoadInt32(&foundCount), logtrace.FieldError: firstErr.Error(),
+		})
+	}
+	return int(foundCount), firstErr
+}
+
+func (s *DHT) fetchAndWriteLocalKeysBatched(
+	ctx context.Context,
+	hexKeys []string,
+	resSeen *sync.Map, // hexKey -> struct{}
+	req int32,
+	onSymbol func(string, []byte) error, // base58 key -> data
+) (int32, error) {
+	const batchSize = 5000
+	var count int32
+
+	for start := 0; start < len(hexKeys); start += batchSize {
+		end := start + batchSize
+		if end > len(hexKeys) {
+			end = len(hexKeys)
+		}
+		batchHexKeys := hexKeys[start:end]
+
+		logtrace.Debug(ctx, "Processing batch of local keys (stream)", logtrace.Fields{
+			logtrace.FieldModule: "dht", "batch_size": len(batchHexKeys), "total_keys": len(hexKeys),
+		})
+
+		localValues, _, batchErr := s.store.RetrieveBatchValues(ctx, batchHexKeys, false)
+		if batchErr != nil {
+			logtrace.Error(ctx, "Failed to retrieve local batch values", logtrace.Fields{
+				logtrace.FieldModule: "dht", logtrace.FieldError: batchErr.Error(),
+			})
+			// Continue to the next batch (same behavior as fetchAndAddLocalKeys)
+			continue
+		}
+
+		for i, val := range localValues {
+			if len(val) == 0 {
+				continue
+			}
+			hk := batchHexKeys[i]
+			if _, loaded := resSeen.LoadOrStore(hk, struct{}{}); loaded {
+				continue
+			}
+			raw, err := hex.DecodeString(hk)
+			if err != nil {
+				// Skip malformed key; very unlikely because hk came from our own decode
+				continue
+			}
+			b58 := base58.Encode(raw)
+			if err := onSymbol(b58, val); err != nil {
+				// allow retry later
+				resSeen.Delete(hk)
+				continue
+			}
+			count++
+			if count >= req {
+				return count, nil
+			}
+		}
+	}
+	return count, nil
+}
+
+func providerIndexForKey(key string, txID string, k int) int {
+	// cheap FNV-1a; any stable hash works
+	const (
+		offset64 = 14695981039346656037
+		prime64  = 1099511628211
+	)
+	h := uint64(offset64)
+	for i := 0; i < len(key); i++ {
+		h ^= uint64(key[i])
+		h *= prime64
+	}
+	for i := 0; i < len(txID); i++ {
+		h ^= uint64(txID[i])
+		h *= prime64
+	}
+	if k <= 0 {
+		return 0
+	}
+	return int(h % uint64(k))
+}
+
 // Iterate does an iterative search through the kademlia network
 // - IterativeStore - used to store new information in the kademlia network
 // - IterativeFindNode - used to bootstrap the network
@@ -1762,7 +2207,7 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
 func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[string]*Node, storageMap map[string][]int, typ int) chan *MessageWithError {
 	responses := make(chan *MessageWithError, len(nodes))
 
-	maxStore := macConcurrentNetworkStoreCalls
+	maxStore := maxConcurrentNetworkStoreCalls
 	if ln := len(nodes); ln < maxStore {
 		maxStore = ln
 	}
diff --git a/p2p/p2p.go b/p2p/p2p.go
index f9a5f74e..c613a32d 100644
--- a/p2p/p2p.go
+++ b/p2p/p2p.go
@@ -146,6 +146,16 @@ func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, tx
 	return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), txID, localOnly...)
 }
 
+func (s *p2p) BatchRetrieveStream(ctx context.Context, keys []string, required int32, txID string,
+	onSymbol func(base58Key string, data []byte) error, localOnly ...bool) (written int32, err error) {
+
+	if !s.running {
+		return 0, errors.New("p2p service is not running")
+	}
+
+	return s.dht.BatchRetrieveStream(ctx, keys, required, txID, onSymbol, localOnly...)
+}
+
 // Delete deletes key in queries node
 func (s *p2p) Delete(ctx context.Context, key string) error {
diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go
index 73c31a2a..ded96b7b 100644
--- a/pkg/codec/codec.go
+++ b/pkg/codec/codec.go
@@ -47,4 +47,12 @@ type Codec interface {
 	Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error)
 	// without generating RaptorQ symbols.
 	CreateMetadata(ctx context.Context, req CreateMetadataRequest) (CreateMetadataResponse, error)
+	// PrepareDecode sets up the on-disk workspace for a streamed decode and
+	// returns a per-symbol writer plus a cleanup function.
+	PrepareDecode(ctx context.Context, actionID string, layout Layout) (
+		blockPaths []string,
+		Write func(block int, symbolID string, data []byte) (string, error),
+		Cleanup func() error,
+		ws *Workspace,
+		err error,
+	)
+	// DecodeFromPrepared decodes from a workspace populated via PrepareDecode.
+	DecodeFromPrepared(ctx context.Context, ws *Workspace, layout Layout) (DecodeResponse, error)
 }
diff --git a/supernode/adaptors/rq.go b/supernode/adaptors/rq.go
index b8efa1dd..ebf5e315 100644
--- a/supernode/adaptors/rq.go
+++ b/supernode/adaptors/rq.go
@@ -11,6 +11,16 @@ import (
 type CodecService interface {
 	EncodeInput(ctx context.Context, actionID string, filePath string) (EncodeResult, error)
 	Decode(ctx context.Context, req DecodeRequest) (DecodeResult, error)
+
+	PrepareDecode(ctx context.Context, actionID string, layout codec.Layout) (
+		blockPaths []string,
+		Write func(block int, symbolID string, data []byte) (string, error),
+		Cleanup func() error,
+		ws *codec.Workspace,
+		err error,
+	)
+
+	DecodeFromPrepared(ctx context.Context, ws *codec.Workspace, layout codec.Layout) (DecodeResult, error)
 }
 
 type EncodeResult struct {
@@ -52,3 +62,23 @@ func (c *codecImpl) Decode(ctx context.Context, req DecodeRequest) (DecodeResult
 	}
 	return DecodeResult{FilePath: res.FilePath, DecodeTmpDir: res.DecodeTmpDir}, nil
 }
+
+func (c *codecImpl) PrepareDecode(
+	ctx context.Context,
+	actionID string,
+	layout codec.Layout,
+) ([]string, func(block int, symbolID string, data []byte) (string, error), func() error, *codec.Workspace, error) {
+	return c.codec.PrepareDecode(ctx, actionID, layout)
+}
+
+func (c *codecImpl) DecodeFromPrepared(
+	ctx context.Context,
+	ws *codec.Workspace,
+	layout codec.Layout,
+) (DecodeResult, error) {
+	res, err := c.codec.DecodeFromPrepared(ctx, ws, layout)
+	if err != nil {
+		return DecodeResult{}, err
+	}
+	return DecodeResult{FilePath: res.FilePath, DecodeTmpDir: res.DecodeTmpDir}, nil
+}
diff --git a/supernode/cascade/download.go b/supernode/cascade/download.go
index 7378f920..aa61a82a 100644
--- a/supernode/cascade/download.go
+++ b/supernode/cascade/download.go
@@ -6,6 +6,9 @@ import (
 	"fmt"
 	"os"
 	"sort"
+	"strings"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	actiontypes "github.com/LumeraProtocol/lumera/x/action/v1/types"
@@ -170,7 +173,7 @@ func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, acti
 	return task.restoreFileFromLayout(ctx, layout, metadata.DataHash, actionID, send)
 }
 
-func (task *CascadeRegistrationTask) restoreFileFromLayout(ctx context.Context, layout codec.Layout, dataHash string, actionID string, send func(resp *DownloadResponse) error) (string, string, error) {
+// Deprecated: superseded by the streaming restoreFileFromLayout below.
+func (task *CascadeRegistrationTask) restoreFileFromLayoutDeprecated(ctx context.Context, layout codec.Layout, dataHash string, actionID string, send func(resp *DownloadResponse) error) (string, string, error) {
 	fields := logtrace.Fields{logtrace.FieldActionID: actionID}
 	symSet := make(map[string]struct{})
 	for _, block := range layout.Blocks {
@@ -245,3 +248,160 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(ctx context.Context,
 	}
 	return decodeInfo.FilePath, decodeInfo.DecodeTmpDir, nil
 }
+
+func (task *CascadeRegistrationTask) restoreFileFromLayout(
+	ctx context.Context,
+	layout codec.Layout,
+	dataHash string,
+	actionID string,
+	send func(resp *DownloadResponse) error,
+) (string, string, error) {
+
+	fields := logtrace.Fields{logtrace.FieldActionID: actionID}
+
+	// Unique symbols
+	symSet := make(map[string]struct{}, 1024)
+	for _, block := range layout.Blocks {
+		for _, s := range block.Symbols {
+			symSet[s] = struct{}{}
+		}
+	}
+	allSymbols := make([]string, 0, len(symSet))
+	for s := range symSet {
+		allSymbols = append(allSymbols, s)
+	}
+	sort.Strings(allSymbols)
+	totalSymbols := len(allSymbols)
+	fields["totalSymbols"] = totalSymbols
+
+	targetRequiredCount := (totalSymbols*targetRequiredPercent + 99) / 100
+	if targetRequiredCount < 1 && totalSymbols > 0 {
+		targetRequiredCount = 1
+	}
+	if targetRequiredCount > totalSymbols {
+		targetRequiredCount = totalSymbols
+	}
+	logtrace.Info(ctx, "download: plan symbols",
+		logtrace.Fields{"total_symbols": totalSymbols, "target_required_percent": targetRequiredPercent, "target_required_count": targetRequiredCount})
+
+	if totalSymbols == 0 {
+		return "", "", errors.New("no symbols present in layout")
+	}
+
+	// Prepare RQ workspace once; stream symbols directly into it
+	logtrace.Info(ctx, "download: prepare RQ workspace", logtrace.Fields{"action_id": actionID})
+	_, writeSymbol, cleanup, ws, perr := task.RQ.PrepareDecode(ctx, actionID, layout)
+	if perr != nil {
+		fields[logtrace.FieldError] = perr.Error()
+		logtrace.Error(ctx, "rq prepare-decode failed", fields)
+		return "", "", fmt.Errorf("prepare decode: %w", perr)
+	}
+	success := false
+	defer func() {
+		if !success && cleanup != nil {
+			_ = cleanup()
+		}
+	}()
+
+	// Track exactly which symbol IDs we wrote (base58 IDs)
+	var written int32
+	var writtenSet sync.Map // b58 symbol id -> struct{}
+	onSymbol := func(symbolID string, data []byte) error {
+		if _, err := writeSymbol(-1, symbolID, data); err != nil {
+			return err
+		}
+		writtenSet.Store(symbolID, struct{}{})
+		atomic.AddInt32(&written, 1)
+		return nil
+	}
+
+	// 1) Local batched streaming
+	retrieveStart := time.Now()
+	logtrace.Info(ctx, "download: local scan start", logtrace.Fields{"action_id": actionID, "requested": targetRequiredCount, "total_candidates": totalSymbols})
+	localFound, lerr := task.P2PClient.BatchRetrieveStream(ctx, allSymbols, int32(targetRequiredCount), actionID, onSymbol, true)
+	if lerr != nil && !strings.Contains(strings.ToLower(lerr.Error()), "local-only") {
+		fields[logtrace.FieldError] = lerr.Error()
+		logtrace.Error(ctx, "local batch retrieve stream failed", fields)
+		return "", ws.SymbolsDir, fmt.Errorf("local batch retrieve stream: %w", lerr)
+	}
+
+	// If needed, compute the remaining keys that were NOT written in pass 1
+	if int(localFound) < targetRequiredCount {
+		remaining := int32(targetRequiredCount) - localFound
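+		// The network call below re-runs a (cheap) local pass as well, but only
+		// over keys the first pass missed; already-written symbols are excluded
+		// up front via writtenSet, so nothing is handed to writeSymbol twice.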
+
+		// Build a compact slice of only the symbols not written by the local pass
+		remainingKeys := make([]string, 0, len(allSymbols))
+		for _, k := range allSymbols {
+			if _, ok := writtenSet.Load(k); !ok {
+				remainingKeys = append(remainingKeys, k)
+			}
+		}
+
+		logtrace.Info(ctx, "download: network retrieve start", logtrace.Fields{
+			"action_id": actionID, "remaining": remaining, "candidate_keys": len(remainingKeys),
+		})
+
+		if len(remainingKeys) == 0 {
+			logtrace.Warn(ctx, "no remaining keys after local pass but remaining > 0; proceeding with allSymbols as fallback",
+				logtrace.Fields{"action_id": actionID, "remaining": remaining})
+			remainingKeys = allSymbols
+		}
+
+		// Network phase on only the remaining keys; avoids duplicate writes
+		if _, nerr := task.P2PClient.BatchRetrieveStream(ctx, remainingKeys, remaining, actionID, onSymbol /* network allowed */); nerr != nil {
+			fields[logtrace.FieldError] = nerr.Error()
+			logtrace.Error(ctx, "network batch retrieve stream failed", fields)
+			return "", ws.SymbolsDir, fmt.Errorf("network batch retrieve stream: %w", nerr)
+		}
+	}
+
+	retrieveMS := time.Since(retrieveStart).Milliseconds()
+	logtrace.Info(ctx, "download: batch retrieve (stream) ok", logtrace.Fields{
+		"action_id": actionID, "received": atomic.LoadInt32(&written), "retrieve_ms": retrieveMS,
+	})
+
+	// 2) Decode from prepared workspace
+	decodeStart := time.Now()
+	logtrace.Info(ctx, "download: decode start", logtrace.Fields{"action_id": actionID})
+	decodeInfo, derr := task.RQ.DecodeFromPrepared(ctx, ws, layout)
+	if derr != nil {
+		fields[logtrace.FieldError] = derr.Error()
+		logtrace.Error(ctx, "decode failed", fields)
+		return "", ws.SymbolsDir, fmt.Errorf("decode RaptorQ: %w", derr)
+	}
+	decodeMS := time.Since(decodeStart).Milliseconds()
+	logtrace.Info(ctx, "download: decode ok", logtrace.Fields{
+		"action_id": actionID, "ms": decodeMS, "tmp_dir": decodeInfo.DecodeTmpDir, "file_path": decodeInfo.FilePath,
+	})
+	logtrace.Debug(ctx, "download: timing", logtrace.Fields{"action_id": actionID, "retrieve_ms": retrieveMS, "decode_ms": decodeMS})
+
+	// 3) Verify hash
+	fileHash, herr := utils.Blake3HashFile(decodeInfo.FilePath)
+	if herr != nil {
+		fields[logtrace.FieldError] = herr.Error()
+		logtrace.Error(ctx, "failed to hash file", fields)
+		return "", ws.SymbolsDir, fmt.Errorf("hash file: %w", herr)
+	}
+	if fileHash == nil {
+		fields[logtrace.FieldError] = "file hash is nil"
+		logtrace.Error(ctx, "failed to hash file", fields)
+		return "", ws.SymbolsDir, errors.New("file hash is nil")
+	}
+	if verr := cascadekit.VerifyB64DataHash(fileHash, dataHash); verr != nil {
+		fields[logtrace.FieldError] = verr.Error()
+		logtrace.Error(ctx, "failed to verify hash", fields)
+		return "", decodeInfo.DecodeTmpDir, verr
+	}
+
+	logtrace.Debug(ctx, "request data-hash has been matched with the action data-hash", fields)
+	logtrace.Info(ctx, "download: file verified", fields)
+
+	// Event
+	info := map[string]interface{}{"action_id": actionID, "found_symbols": int(atomic.LoadInt32(&written)), "target_percent": targetRequiredPercent}
+	if b, err := json.Marshal(info); err == nil {
+		task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), decodeInfo.FilePath, decodeInfo.DecodeTmpDir, send)
+	}
+
+	success = true
+	return decodeInfo.FilePath, decodeInfo.DecodeTmpDir, nil
+}
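
A minimal, self-contained sketch of the per-wave provider rotation that processBatchStream relies on. providerIndexForKey matches the helper added in p2p/kademlia/dht.go (with the full FNV-1a offset basis); the provider names, symbol keys, and txID below are invented purely for illustration.

package main

import "fmt"

func providerIndexForKey(key string, txID string, k int) int {
	const (
		offset64 uint64 = 14695981039346656037 // FNV-1a 64-bit offset basis
		prime64  uint64 = 1099511628211        // FNV-1a 64-bit prime
	)
	h := offset64
	for i := 0; i < len(key); i++ {
		h ^= uint64(key[i])
		h *= prime64
	}
	for i := 0; i < len(txID); i++ {
		h ^= uint64(txID[i])
		h *= prime64
	}
	if k <= 0 {
		return 0
	}
	return int(h % uint64(k))
}

func main() {
	// Hypothetical closest contacts (k = 3) and symbol keys.
	providers := []string{"node-a", "node-b", "node-c"}
	keys := []string{"sym-1", "sym-2", "sym-3", "sym-4"}
	k := len(providers)

	// Wave w asks each key's ((base+w) mod k)-th closest provider, so a batch
	// fans out across contacts and retries rotate to different nodes.
	for wave := 0; wave < k; wave++ {
		fmt.Printf("wave %d:\n", wave)
		for _, key := range keys {
			base := providerIndexForKey(key, "tx-42", k)
			fmt.Printf("  %s -> %s\n", key, providers[(base+wave)%k])
		}
	}
}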