From cd2ba4dfb35f151305654cb08c0dd199f4d72c0a Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 18 May 2026 12:15:15 +0200 Subject: [PATCH 1/7] feat(smoke): add topology health probe for download failure diagnostics --- pkg/bee/api/node.go | 20 +- pkg/bee/api/status.go | 10 + pkg/bee/client.go | 10 +- pkg/check/smoke/metrics.go | 117 +++++++++-- pkg/check/smoke/smoke.go | 96 ++++++++- pkg/orchestration/cluster.go | 1 + pkg/orchestration/k8s/cluster.go | 57 +++++ pkg/topohealth/cluster.go | 93 +++++++++ pkg/topohealth/health.go | 344 +++++++++++++++++++++++++++++++ pkg/topohealth/health_test.go | 119 +++++++++++ pkg/topohealth/log.go | 56 +++++ pkg/topohealth/thresholds.go | 29 +++ 12 files changed, 927 insertions(+), 25 deletions(-) create mode 100644 pkg/topohealth/cluster.go create mode 100644 pkg/topohealth/health.go create mode 100644 pkg/topohealth/health_test.go create mode 100644 pkg/topohealth/log.go create mode 100644 pkg/topohealth/thresholds.go diff --git a/pkg/bee/api/node.go b/pkg/bee/api/node.go index 789eeccc3..e808a83ff 100644 --- a/pkg/bee/api/node.go +++ b/pkg/bee/api/node.go @@ -78,7 +78,11 @@ func (n *NodeService) Balances(ctx context.Context) (resp Balances, err error) { return resp, err } -// HasChunk returns true/false if node has a chunk +// HasChunk returns true/false if node has a chunk. +// +// NOTE: This uses GET /chunks/{addr}, which on bee falls back to network +// retrieval if the chunk is not present locally. It is therefore NOT a clean +// local-only check. Use LocalHasChunk for that. func (n *NodeService) HasChunk(ctx context.Context, a swarm.Address) (bool, error) { resp := struct { Message string `json:"message,omitempty"` @@ -95,6 +99,20 @@ func (n *NodeService) HasChunk(ctx context.Context, a swarm.Address) (bool, erro return true, nil } +// LocalHasChunk reports whether the chunk is stored locally on the node +// without triggering a network retrieval. Uses HEAD /chunks/{addr} which +// maps to storer.ChunkStore().Has on the bee side. +func (n *NodeService) LocalHasChunk(ctx context.Context, a swarm.Address) (bool, error) { + err := n.client.request(ctx, http.MethodHead, "/chunks/"+a.String(), nil, nil) + if IsHTTPStatusErrorCode(err, http.StatusNotFound) { + return false, nil + } + if err != nil { + return false, err + } + return true, nil +} + // Health represents node's health type Health struct { Status string `json:"status"` diff --git a/pkg/bee/api/status.go b/pkg/bee/api/status.go index 32e98a163..44906c3d3 100644 --- a/pkg/bee/api/status.go +++ b/pkg/bee/api/status.go @@ -7,6 +7,15 @@ import ( type StatusService service +// Canonical values for the BeeMode field of StatusResponse, mirroring the +// strings emitted by bee/v2/pkg/api.BeeNodeMode.String(). +const ( + BeeModeFull = "full" + BeeModeLight = "light" + BeeModeUltraLight = "ultra-light" + BeeModeUnknown = "unknown" +) + type StatusResponse struct { Overlay string `json:"overlay"` Proximity uint `json:"proximity"` @@ -22,6 +31,7 @@ type StatusResponse struct { IsReachable bool `json:"isReachable"` LastSyncedBlock uint64 `json:"lastSyncedBlock"` CommittedDepth uint8 `json:"committedDepth"` + IsWarmingUp bool `json:"isWarmingUp"` } // Ping pings given node diff --git a/pkg/bee/client.go b/pkg/bee/client.go index 711ffeb65..a1c0d93f4 100644 --- a/pkg/bee/client.go +++ b/pkg/bee/client.go @@ -265,11 +265,19 @@ func (c *Client) DownloadActFile(ctx context.Context, a swarm.Address, opts *api return size, h.Sum(nil), nil } -// HasChunk returns true/false if node has a chunk +// HasChunk returns true/false if node has a chunk. +// +// NOTE: Backed by GET /chunks/{addr}, which falls back to network retrieval on +// a local miss. Use LocalHasChunk for a strict local-only check. func (c *Client) HasChunk(ctx context.Context, a swarm.Address) (bool, error) { return c.api.Node.HasChunk(ctx, a) } +// LocalHasChunk is a strict local-only check; see api.NodeService.LocalHasChunk. +func (c *Client) LocalHasChunk(ctx context.Context, a swarm.Address) (bool, error) { + return c.api.Node.LocalHasChunk(ctx, a) +} + func (c *Client) HasChunks(ctx context.Context, a []swarm.Address) (has []bool, count int, err error) { has = make([]bool, len(a)) for i, addr := range a { diff --git a/pkg/check/smoke/metrics.go b/pkg/check/smoke/metrics.go index 41c72b2fc..144e296a4 100644 --- a/pkg/check/smoke/metrics.go +++ b/pkg/check/smoke/metrics.go @@ -6,27 +6,37 @@ import ( ) type metrics struct { - BatchCreateErrors prometheus.Counter - BatchCreateAttempts prometheus.Counter - UploadErrors *prometheus.CounterVec - UploadAttempts *prometheus.CounterVec - UploadSuccess *prometheus.CounterVec - DownloadErrors *prometheus.CounterVec - DownloadMismatch *prometheus.CounterVec - DownloadAttempts *prometheus.CounterVec - DownloadSuccess *prometheus.CounterVec - UploadDuration *prometheus.HistogramVec - DownloadDuration *prometheus.HistogramVec - UploadThroughput *prometheus.GaugeVec - DownloadThroughput *prometheus.GaugeVec - UploadedBytes *prometheus.CounterVec - DownloadedBytes *prometheus.CounterVec + BatchCreateErrors prometheus.Counter + BatchCreateAttempts prometheus.Counter + UploadErrors *prometheus.CounterVec + UploadAttempts *prometheus.CounterVec + UploadSuccess *prometheus.CounterVec + DownloadErrors *prometheus.CounterVec + DownloadEOFErrors *prometheus.CounterVec + DownloadMismatch *prometheus.CounterVec + DownloadAttempts *prometheus.CounterVec + DownloadSuccess *prometheus.CounterVec + UploadDuration *prometheus.HistogramVec + DownloadDuration *prometheus.HistogramVec + UploadThroughput *prometheus.GaugeVec + DownloadThroughput *prometheus.GaugeVec + UploadedBytes *prometheus.CounterVec + DownloadedBytes *prometheus.CounterVec + NodeHealthVerdict *prometheus.GaugeVec + ClusterFullNodeCount prometheus.Gauge + ClusterLightNodeCount prometheus.Gauge + ChunkReplicaCount prometheus.Histogram + ChunkPresentOnStorer *prometheus.CounterVec + ChunkAbsentFromStorer *prometheus.CounterVec + UnhealthyAbortsPreUp prometheus.Counter + UnhealthyAbortsPreDown prometheus.Counter } const ( labelSizeBytes = "size_bytes" labelNodeName = "node_name" labelRedundancyLevel = "redundancy_level" + labelPhase = "phase" ) func newMetrics(subsystem string) metrics { @@ -166,6 +176,83 @@ func newMetrics(subsystem string) metrics { }, []string{labelNodeName, labelRedundancyLevel}, ), + DownloadEOFErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "download_eof_errors_count", + Help: "Download errors classified as unexpected EOF, which indicate the chunk is likely missing from the cluster.", + }, + []string{labelSizeBytes, labelNodeName, labelRedundancyLevel}, + ), + NodeHealthVerdict: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "node_health_verdict", + Help: "Topology health verdict for a node: 0=unknown, 1=unhealthy, 2=degraded, 3=healthy. Sampled per phase (pre_upload, pre_download, on_failure).", + }, + []string{labelNodeName, labelPhase}, + ), + ClusterFullNodeCount: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "cluster_full_node_count", + Help: "Number of full (non-bootnode) nodes in the cluster.", + }, + ), + ClusterLightNodeCount: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "cluster_light_node_count", + Help: "Number of light nodes in the cluster.", + }, + ), + ChunkReplicaCount: prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "chunk_replica_count", + Help: "On download failure, number of intended-storer full nodes that locally hold the chunk address (HEAD /chunks/{addr}).", + Buckets: []float64{0, 1, 2, 3, 4, 5}, + }, + ), + ChunkPresentOnStorer: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "chunk_present_on_storer", + Help: "Count of HEAD /chunks/{addr} positive responses on an intended storer at on_failure phase.", + }, + []string{labelNodeName}, + ), + ChunkAbsentFromStorer: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "chunk_absent_from_storer", + Help: "Count of HEAD /chunks/{addr} negative responses on an intended storer at on_failure phase.", + }, + []string{labelNodeName}, + ), + UnhealthyAbortsPreUp: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "unhealthy_aborts_pre_upload", + Help: "Iterations aborted because the uploader was UNHEALTHY before upload.", + }, + ), + UnhealthyAbortsPreDown: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "unhealthy_aborts_pre_download", + Help: "Iterations skipped because the downloader was UNHEALTHY before download.", + }, + ), } } diff --git a/pkg/check/smoke/smoke.go b/pkg/check/smoke/smoke.go index 179e7b33f..87bdfab78 100644 --- a/pkg/check/smoke/smoke.go +++ b/pkg/check/smoke/smoke.go @@ -6,21 +6,29 @@ import ( "crypto/rand" "errors" "fmt" + "io" "strconv" + "sync" "time" "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/bee" "github.com/ethersphere/beekeeper/pkg/beekeeper" "github.com/ethersphere/beekeeper/pkg/logging" "github.com/ethersphere/beekeeper/pkg/orchestration" "github.com/ethersphere/beekeeper/pkg/random" "github.com/ethersphere/beekeeper/pkg/scheduler" "github.com/ethersphere/beekeeper/pkg/test" + "github.com/ethersphere/beekeeper/pkg/topohealth" "github.com/prometheus/client_golang/prometheus" ) +// onFailureStorerProbeCount is the number of intended storers to probe on a +// download failure (closest by XOR distance to chunk address). +const onFailureStorerProbeCount = 3 + // Options represents smoke test options type Options struct { ContentSize int64 @@ -108,6 +116,13 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option return fmt.Errorf("smoke check requires at least 2 full nodes, got %d", len(fullNodeClients)) } + shape := topohealth.ClusterShape(cluster) + c.metrics.ClusterFullNodeCount.Set(float64(shape.FullNodes)) + c.metrics.ClusterLightNodeCount.Set(float64(shape.LightNodes)) + c.logger.Infof("cluster shape: full=%d light=%d bootnodes=%d", shape.FullNodes, shape.LightNodes, shape.Bootnodes) + + thresholds := topohealth.DefaultThresholds() + test := test.NewTest(c.logger) for i := 0; true; i++ { @@ -176,6 +191,12 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option continue } + if c.probe(ctx, topohealth.PhasePreUpload, uploader, thresholds) == topohealth.StatusUnhealthy { + c.metrics.UnhealthyAbortsPreUp.Inc() + c.logger.Errorf("aborting iteration: uploader %s is UNHEALTHY pre-upload", uploader.Name()) + continue + } + var ( txCtx context.Context txCancel context.CancelFunc = func() {} @@ -222,6 +243,11 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option time.Sleep(o.NodesSyncWait) + if c.probe(ctx, topohealth.PhasePreDownload, downloader, thresholds) == topohealth.StatusUnhealthy { + c.metrics.UnhealthyAbortsPreDown.Inc() + c.logger.Warningf("downloader %s is UNHEALTHY pre-download; attempting anyway", downloader.Name()) + } + var ( rxCtx context.Context rxCancel context.CancelFunc = func() {} @@ -242,6 +268,9 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option rxData, rxDuration, err = test.Download(rxCtx, downloader, address, rLevel) if err != nil { c.metrics.DownloadErrors.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() + if errors.Is(err, io.ErrUnexpectedEOF) { + c.metrics.DownloadEOFErrors.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() + } c.logger.Errorf("download failed for size %d: %v", contentSize, err) c.logger.Infof("retrying in: %v", o.RxOnErrWait) continue @@ -280,14 +309,8 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option rxCancel() if !downloaded { - c.logger.Errorf("all download attempts failed for size %d, fetching downloader topology", contentSize) - top, topErr := downloader.Topology(ctx) - if topErr != nil { - c.logger.Errorf("failed to get downloader topology: %v", topErr) - } else { - c.logger.Infof("downloader %s topology: depth=%d, connected=%d, population=%d, reachability=%s, bins=%s", - downloader.Name(), top.Depth, top.Connected, top.Population, top.Reachability, top.Bins.String()) - } + c.logger.Errorf("all download attempts failed for size %d, dumping topology health for uploader, downloader, and intended storers", contentSize) + c.onFailureDump(ctx, cluster, uploader, downloader, address, thresholds) } c.logger.Infof("completed testing file size: %d bytes", contentSize) @@ -314,3 +337,60 @@ func redundancyLevelLabel(rLevel *redundancy.Level) string { } return strconv.Itoa(int(*rLevel)) } + +// probe runs the per-node topology health probe, emits structured log + metric, +// and returns the resulting Status. A probe error yields StatusUnknown so +// callers can distinguish a transient API failure from a genuinely unhealthy +// node when deciding whether to abort. +func (c *Check) probe(ctx context.Context, phase topohealth.Phase, client *bee.Client, thresholds topohealth.Thresholds) topohealth.Status { + v, err := topohealth.Probe(ctx, client, thresholds) + if err != nil { + c.logger.Errorf("probe %s on %s failed: %v", phase, client.Name(), err) + c.metrics.NodeHealthVerdict.WithLabelValues(client.Name(), string(phase)).Set(float64(topohealth.StatusUnknown)) + return topohealth.StatusUnknown + } + c.metrics.NodeHealthVerdict.WithLabelValues(client.Name(), string(phase)).Set(float64(v.Status)) + topohealth.LogVerdict(c.logger, phase, v) + return v.Status +} + +// onFailureDump fans out probes for uploader, downloader, and the N closest +// intended storers concurrently, logs a per-node snapshot, and records replica +// count + per-node has/absent counters. +func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster, uploader, downloader *bee.Client, chunkAddr swarm.Address, thresholds topohealth.Thresholds) { + var ( + wg sync.WaitGroup + storers []topohealth.StorerResult + storErr error + ) + wg.Add(3) + go func() { + defer wg.Done() + c.probe(ctx, topohealth.PhaseOnFailure, uploader, thresholds) + }() + go func() { + defer wg.Done() + c.probe(ctx, topohealth.PhaseOnFailure, downloader, thresholds) + }() + go func() { + defer wg.Done() + storers, storErr = topohealth.IntendedStorers(ctx, cluster, chunkAddr, onFailureStorerProbeCount, thresholds) + }() + wg.Wait() + + if storErr != nil { + c.logger.Errorf("on_failure intended storers probe failed: %v", storErr) + return + } + for i, r := range storers { + topohealth.LogStorerResult(c.logger, chunkAddr.String(), string(topohealth.PhaseOnFailure), i, r) + if r.HasChunk { + c.metrics.ChunkPresentOnStorer.WithLabelValues(r.Verdict.Node).Inc() + } else { + c.metrics.ChunkAbsentFromStorer.WithLabelValues(r.Verdict.Node).Inc() + } + } + replicas := topohealth.ReplicaCount(storers) + c.metrics.ChunkReplicaCount.Observe(float64(replicas)) + c.logger.Infof("on_failure chunk %s: %d/%d intended storers locally hold the chunk", chunkAddr, replicas, len(storers)) +} diff --git a/pkg/orchestration/cluster.go b/pkg/orchestration/cluster.go index 446c64860..98e10a09e 100644 --- a/pkg/orchestration/cluster.go +++ b/pkg/orchestration/cluster.go @@ -38,6 +38,7 @@ type Cluster interface { Size() (size int) Topologies(ctx context.Context) (topologies ClusterTopologies, err error) ClosestFullNodeClient(ctx context.Context, s *bee.Client) (*bee.Client, error) + FullNodeClientsByDistance(ctx context.Context, chunkAddr swarm.Address) (ClientList, error) } // ClusterOptions represents Bee cluster options diff --git a/pkg/orchestration/k8s/cluster.go b/pkg/orchestration/k8s/cluster.go index 1fbffc9ce..2db6accd5 100644 --- a/pkg/orchestration/k8s/cluster.go +++ b/pkg/orchestration/k8s/cluster.go @@ -1,6 +1,7 @@ package k8s import ( + "bytes" "context" "crypto/tls" "fmt" @@ -17,6 +18,7 @@ import ( "github.com/ethersphere/beekeeper/pkg/orchestration" "github.com/ethersphere/beekeeper/pkg/orchestration/notset" "github.com/ethersphere/beekeeper/pkg/swap" + "golang.org/x/sync/errgroup" ) // compile check whether client implements interface @@ -440,6 +442,61 @@ func (c *Cluster) FlattenTopologies(ctx context.Context) (topologies map[string] return topologies, err } +// FullNodeClientsByDistance returns full node clients sorted by ascending XOR +// distance from the given chunk address. Bootnodes and light nodes are excluded. +// This is the basis for identifying the intended storers of a chunk. +func (c *Cluster) FullNodeClientsByDistance(ctx context.Context, chunkAddr swarm.Address) (orchestration.ClientList, error) { + type entry struct { + client *bee.Client + // dist is the precomputed XOR distance to chunkAddr in big-endian + // bytes. Storing it once lets the sort comparator be pure (no API + // calls, no error side-channel) and avoids O(n log n) length checks. + dist []byte + } + + type job struct { + client *bee.Client + idx int + } + var jobs []job + for _, n := range c.Nodes() { + cfg := n.Config() + if !cfg.FullNode || cfg.BootnodeMode { + continue + } + jobs = append(jobs, job{client: n.Client()}) + } + + entries := make([]entry, len(jobs)) + g, gctx := errgroup.WithContext(ctx) + for i, j := range jobs { + g.Go(func() error { + addrs, err := j.client.Addresses(gctx) + if err != nil { + return fmt.Errorf("get overlay for %s: %w", j.client.Name(), err) + } + dist, err := swarm.DistanceRaw(chunkAddr, addrs.Overlay) + if err != nil { + return fmt.Errorf("distance for %s: %w", j.client.Name(), err) + } + entries[i] = entry{client: j.client, dist: dist} + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + + slices.SortFunc(entries, func(a, b entry) int { + return bytes.Compare(a.dist, b.dist) + }) + out := make(orchestration.ClientList, 0, len(entries)) + for _, e := range entries { + out = append(out, e.client) + } + return out, nil +} + // ClosestFullNodeClient returns the closest full node client to the supplied client. func (c *Cluster) ClosestFullNodeClient(ctx context.Context, s *bee.Client) (*bee.Client, error) { addrToNode := make(map[string]orchestration.Node) diff --git a/pkg/topohealth/cluster.go b/pkg/topohealth/cluster.go new file mode 100644 index 000000000..6329609da --- /dev/null +++ b/pkg/topohealth/cluster.go @@ -0,0 +1,93 @@ +package topohealth + +import ( + "context" + "fmt" + "sync" + + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/orchestration" +) + +type ClusterCounts struct { + FullNodes int `json:"fullNodes"` + LightNodes int `json:"lightNodes"` + Bootnodes int `json:"bootnodes"` +} + +// ClusterShape inspects node configs only; no API calls. +func ClusterShape(c orchestration.Cluster) ClusterCounts { + var cc ClusterCounts + for _, n := range c.Nodes() { + cfg := n.Config() + switch { + case cfg.BootnodeMode: + cc.Bootnodes++ + case cfg.FullNode: + cc.FullNodes++ + default: + cc.LightNodes++ + } + } + return cc +} + +type StorerResult struct { + Verdict Verdict `json:"verdict"` + HasChunk bool `json:"hasChunk"` + // HasError is set if HEAD /chunks/{addr} or the topology probe failed. + // A probe error leaves Verdict.Status == StatusUnknown (not Unhealthy); + // callers can distinguish "probe failed" from "node is genuinely unhealthy". + HasError string `json:"hasError,omitempty"` +} + +// IntendedStorers returns probe results for the top-n full nodes closest to +// chunkAddr, including a local HEAD /chunks/{addr} check on each. Probes run +// concurrently. +func IntendedStorers(ctx context.Context, c orchestration.Cluster, chunkAddr swarm.Address, n int, t Thresholds) ([]StorerResult, error) { + clients, err := c.FullNodeClientsByDistance(ctx, chunkAddr) + if err != nil { + return nil, fmt.Errorf("rank full nodes by distance: %w", err) + } + if n > len(clients) { + n = len(clients) + } + results := make([]StorerResult, n) + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func(idx int, cl *bee.Client) { + defer wg.Done() + results[idx] = probeStorer(ctx, cl, chunkAddr, t) + }(i, clients[i]) + } + wg.Wait() + return results, nil +} + +func probeStorer(ctx context.Context, cl *bee.Client, chunkAddr swarm.Address, t Thresholds) StorerResult { + v, err := Probe(ctx, cl, t) + if err != nil { + return StorerResult{ + Verdict: Verdict{Node: cl.Name(), Status: StatusUnknown}, + HasError: "probe: " + err.Error(), + } + } + has, herr := cl.LocalHasChunk(ctx, chunkAddr) + r := StorerResult{Verdict: v, HasChunk: has} + if herr != nil { + r.HasError = "has_chunk: " + herr.Error() + } + return r +} + +func ReplicaCount(rs []StorerResult) int { + n := 0 + for _, r := range rs { + if r.HasChunk { + n++ + } + } + return n +} diff --git a/pkg/topohealth/health.go b/pkg/topohealth/health.go new file mode 100644 index 000000000..9cce587c8 --- /dev/null +++ b/pkg/topohealth/health.go @@ -0,0 +1,344 @@ +// Package topohealth implements a topology health probe over a Bee cluster. +// It calls /status and /topology on individual nodes, computes a set of +// signals, and produces a Verdict (healthy/degraded/unhealthy) used by the +// smoke tests to diagnose retrieval failures. +package topohealth + +import ( + "context" + "encoding/json" + "fmt" + "sort" + "strconv" + "strings" + "sync" + + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/bee/api" +) + +// Status is the rolled-up verdict for a node. The zero value is StatusUnknown +// so a default-initialized Verdict is never silently read as healthy/unhealthy. +type Status int + +const ( + StatusUnknown Status = iota + StatusUnhealthy + StatusDegraded + StatusHealthy +) + +func (s Status) String() string { + switch s { + case StatusHealthy: + return "HEALTHY" + case StatusDegraded: + return "DEGRADED" + case StatusUnhealthy: + return "UNHEALTHY" + default: + return "UNKNOWN" + } +} + +func (s Status) MarshalJSON() ([]byte, error) { + return fmt.Appendf(nil, `%q`, s.String()), nil +} + +func (s *Status) UnmarshalJSON(b []byte) error { + var v string + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch strings.ToUpper(v) { + case "HEALTHY": + *s = StatusHealthy + case "DEGRADED": + *s = StatusDegraded + case "UNHEALTHY": + *s = StatusUnhealthy + default: + *s = StatusUnknown + } + return nil +} + +type Signals struct { + DepthOK bool `json:"depthOK"` + BelowDepthFilled bool `json:"belowDepthFilled"` + BelowDepthSaturation float64 `json:"belowDepthSaturation"` + NeighborhoodSizeOK bool `json:"neighborhoodSizeOK"` + Reachable bool `json:"reachable"` + WarmedUp bool `json:"warmedUp"` + DialRatioOK bool `json:"dialRatioOK"` + // ReserveAligned/ReserveNonEmpty are only meaningful for full nodes. + ReserveAligned bool `json:"reserveAligned"` + ReserveNonEmpty bool `json:"reserveNonEmpty"` +} + +// BinView is a per-bin snapshot. Unlike bee.BinMap.String it includes empty +// bins, which is the signal we need for "is there a gap below depth?". +type BinView struct { + Index int `json:"index"` + Connected int `json:"connected"` + Disconnected int `json:"disconnected"` + Population int `json:"population"` + Empty bool `json:"empty"` +} + +type Raw struct { + Depth int `json:"depth"` + NnLowWatermark int `json:"nnLowWatermark"` + Connected int `json:"connected"` + Population int `json:"population"` + NeighborhoodSize int `json:"neighborhoodSize"` + EmptyBinsBelowDepth int `json:"emptyBinsBelowDepth"` + DialRatio float64 `json:"dialRatio"` + BeeMode string `json:"beeMode"` + IsReachable bool `json:"isReachable"` + IsWarmingUp bool `json:"isWarmingUp"` + Reachability string `json:"reachability"` + NetworkAvailability string `json:"networkAvailability"` + ReserveSize uint64 `json:"reserveSize"` + ReserveSizeWithinRadius uint64 `json:"reserveSizeWithinRadius"` + StorageRadius uint8 `json:"storageRadius"` + CommittedDepth uint8 `json:"committedDepth"` + PullsyncRate float64 `json:"pullsyncRate"` +} + +type Verdict struct { + Node string `json:"node"` + Overlay swarm.Address `json:"overlay"` + Status Status `json:"status"` + Signals Signals `json:"signals"` + Raw Raw `json:"raw"` + Bins []BinView `json:"bins"` + FailureReasons []string `json:"failureReasons,omitempty"` +} + +// Probe runs the single-node probe: /status and /topology in parallel. +func Probe(ctx context.Context, c *bee.Client, t Thresholds) (Verdict, error) { + var ( + wg sync.WaitGroup + st *api.StatusResponse + top bee.Topology + stErr error + topErr error + ) + wg.Add(2) + go func() { + defer wg.Done() + st, stErr = c.Status(ctx) + }() + go func() { + defer wg.Done() + top, topErr = c.Topology(ctx) + }() + wg.Wait() + if stErr != nil { + return Verdict{}, fmt.Errorf("status: %w", stErr) + } + if topErr != nil { + return Verdict{}, fmt.Errorf("topology: %w", topErr) + } + return Evaluate(c.Name(), st, top, t), nil +} + +// Evaluate is the pure rule-table portion of Probe, exported so the rule logic +// can be tested without a live bee node. +func Evaluate(nodeName string, st *api.StatusResponse, top bee.Topology, t Thresholds) Verdict { + bins, walk := walkBins(top, t) + signals := computeSignals(st, top, walk, t) + status, reasons := decideStatus(st, top, signals, walk) + + return Verdict{ + Node: nodeName, + Overlay: top.Overlay, + Status: status, + Signals: signals, + Bins: bins, + Raw: Raw{ + Depth: top.Depth, + NnLowWatermark: top.NnLowWatermark, + Connected: top.Connected, + Population: top.Population, + NeighborhoodSize: int(st.NeighborhoodSize), + EmptyBinsBelowDepth: walk.emptyBelow, + DialRatio: walk.dialRatio, + BeeMode: st.BeeMode, + IsReachable: st.IsReachable, + IsWarmingUp: st.IsWarmingUp, + Reachability: top.Reachability, + NetworkAvailability: top.NetworkAvailability, + ReserveSize: st.ReserveSize, + ReserveSizeWithinRadius: st.ReserveSizeWithinRadius, + StorageRadius: st.StorageRadius, + CommittedDepth: st.CommittedDepth, + PullsyncRate: st.PullsyncRate, + }, + FailureReasons: reasons, + } +} + +type binWalkResult struct { + emptyBelow int + minBelowSat float64 + belowFilled bool + neighborhoodPeers int + dialRatio float64 +} + +// walkBins iterates bins from 0..maxBin once and produces both the per-bin +// view and every aggregate the verdict rules need, including saturation +// (which depends on the threshold). +func walkBins(top bee.Topology, t Thresholds) ([]BinView, binWalkResult) { + maxBin := top.Depth + for k := range top.Bins { + idx, ok := parseBinIndex(k) + if !ok { + continue + } + if idx > maxBin { + maxBin = idx + } + } + + bins := make([]BinView, 0, maxBin+1) + r := binWalkResult{ + minBelowSat: -1, + belowFilled: true, + } + totalConnected, totalDisconnected := 0, 0 + + for i := 0; i <= maxBin; i++ { + b := top.Bins[binKey(i)] + disc := len(b.DisconnectedPeers) + bins = append(bins, BinView{ + Index: i, + Connected: b.Connected, + Disconnected: disc, + Population: b.Population, + Empty: b.Connected == 0 && b.Population == 0, + }) + totalConnected += b.Connected + totalDisconnected += disc + + if i < top.Depth { + if b.Connected == 0 { + r.belowFilled = false + r.emptyBelow++ + } + sat := float64(b.Connected) / float64(t.SaturationPeers) + if r.minBelowSat < 0 || sat < r.minBelowSat { + r.minBelowSat = sat + } + } else { + r.neighborhoodPeers += b.Connected + } + } + if r.minBelowSat < 0 { + r.minBelowSat = 1.0 + } + + r.dialRatio = 1.0 + if totalConnected+totalDisconnected > 0 { + r.dialRatio = float64(totalConnected) / float64(totalConnected+totalDisconnected) + } + return bins, r +} + +func binKey(i int) string { + return "bin_" + strconv.Itoa(i) +} + +func parseBinIndex(k string) (int, bool) { + rest, ok := strings.CutPrefix(k, "bin_") + if !ok { + return 0, false + } + idx, err := strconv.Atoi(rest) + if err != nil { + return 0, false + } + return idx, true +} + +func computeSignals(st *api.StatusResponse, top bee.Topology, w binWalkResult, t Thresholds) Signals { + reachable := st.IsReachable + if t.RequirePublicReachable { + reachable = reachable && strings.EqualFold(top.Reachability, "Public") + } + + sig := Signals{ + DepthOK: top.Depth > 0, + BelowDepthFilled: w.belowFilled, + BelowDepthSaturation: w.minBelowSat, + NeighborhoodSizeOK: w.neighborhoodPeers >= max(t.MinNeighborhoodSize, top.NnLowWatermark), + Reachable: reachable, + WarmedUp: !st.IsWarmingUp, + DialRatioOK: w.dialRatio >= t.MinDialRatio, + } + + if isFullNode(st.BeeMode) { + sig.ReserveAligned = sig.WarmedUp && st.StorageRadius == st.CommittedDepth + sig.ReserveNonEmpty = st.ReserveSize > 0 + } else { + // Light/ultra-light: marked satisfied so the verdict isn't tripped by + // fields that don't apply to that mode. + sig.ReserveAligned = true + sig.ReserveNonEmpty = true + } + return sig +} + +func isFullNode(beeMode string) bool { + return strings.EqualFold(beeMode, api.BeeModeFull) +} + +// decideStatus applies the rule table: +// +// UNHEALTHY: any of DepthOK / BelowDepthFilled / NeighborhoodSizeOK / +// Reachable / WarmedUp is false. +// DEGRADED: BelowDepthSaturation < 1.0 OR DialRatioOK false OR +// (full node && !ReserveAligned). +// HEALTHY: otherwise. +func decideStatus(st *api.StatusResponse, top bee.Topology, sig Signals, w binWalkResult) (Status, []string) { + var hard, soft []string + if !sig.DepthOK { + hard = append(hard, "depth_not_ok") + } + if !sig.BelowDepthFilled { + hard = append(hard, fmt.Sprintf("empty_bins_below_depth=%d", w.emptyBelow)) + } + if !sig.NeighborhoodSizeOK { + hard = append(hard, fmt.Sprintf("neighborhood_size=%d 0: + reasons := append(hard, soft...) + sort.Strings(reasons) + return StatusUnhealthy, reasons + case len(soft) > 0: + sort.Strings(soft) + return StatusDegraded, soft + default: + return StatusHealthy, nil + } +} diff --git a/pkg/topohealth/health_test.go b/pkg/topohealth/health_test.go new file mode 100644 index 000000000..cd1249978 --- /dev/null +++ b/pkg/topohealth/health_test.go @@ -0,0 +1,119 @@ +package topohealth + +import ( + "testing" + + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/bee/api" +) + +func healthyStatus() *api.StatusResponse { + return &api.StatusResponse{ + BeeMode: api.BeeModeFull, + IsReachable: true, + IsWarmingUp: false, + ReserveSize: 1000, + StorageRadius: 3, + CommittedDepth: 3, + } +} + +func healthyTopology() bee.Topology { + bins := bee.BinMap{} + // bins below depth 3 each have 4 connected (saturated), 0 disconnected. + for i := range 3 { + bins[binKey(i)] = bee.Bin{Connected: 4, Population: 4} + } + // neighborhood (bin >= depth) + bins[binKey(3)] = bee.Bin{Connected: 3, Population: 3} + return bee.Topology{ + Depth: 3, + Connected: 15, + Population: 15, + Reachability: "Public", + Bins: bins, + } +} + +func TestEvaluate_Healthy(t *testing.T) { + v := Evaluate("n1", healthyStatus(), healthyTopology(), DefaultThresholds()) + if v.Status != StatusHealthy { + t.Fatalf("expected HEALTHY, got %s, reasons=%v", v.Status, v.FailureReasons) + } +} + +func TestEvaluate_EmptyBinBelowDepth_Unhealthy(t *testing.T) { + top := healthyTopology() + top.Bins[binKey(1)] = bee.Bin{Connected: 0, Population: 0} + v := Evaluate("n1", healthyStatus(), top, DefaultThresholds()) + if v.Status != StatusUnhealthy { + t.Fatalf("expected UNHEALTHY for missing bin below depth, got %s", v.Status) + } + if v.Raw.EmptyBinsBelowDepth != 1 { + t.Errorf("EmptyBinsBelowDepth = %d, want 1", v.Raw.EmptyBinsBelowDepth) + } +} + +func TestEvaluate_LowSaturation_Degraded(t *testing.T) { + top := healthyTopology() + top.Bins[binKey(0)] = bee.Bin{Connected: 2, Population: 2} + v := Evaluate("n1", healthyStatus(), top, DefaultThresholds()) + if v.Status != StatusDegraded { + t.Fatalf("expected DEGRADED for under-saturated bin, got %s, reasons=%v", v.Status, v.FailureReasons) + } +} + +func TestEvaluate_WarmingUp_Unhealthy(t *testing.T) { + st := healthyStatus() + st.IsWarmingUp = true + v := Evaluate("n1", st, healthyTopology(), DefaultThresholds()) + if v.Status != StatusUnhealthy { + t.Fatalf("expected UNHEALTHY while warming up, got %s", v.Status) + } +} + +func TestEvaluate_NotPublic_RequiresPublic(t *testing.T) { + top := healthyTopology() + top.Reachability = "Private" + v := Evaluate("n1", healthyStatus(), top, DefaultThresholds()) + if v.Status != StatusUnhealthy { + t.Fatalf("expected UNHEALTHY for non-Public reachability when RequirePublicReachable=true, got %s", v.Status) + } + // And HEALTHY when RequirePublicReachable=false. + thr := DefaultThresholds() + thr.RequirePublicReachable = false + v = Evaluate("n1", healthyStatus(), top, thr) + if v.Status != StatusHealthy { + t.Fatalf("expected HEALTHY with RequirePublicReachable=false, got %s, reasons=%v", v.Status, v.FailureReasons) + } +} + +func TestEvaluate_BinsViewIncludesEmpty(t *testing.T) { + top := healthyTopology() + delete(top.Bins, binKey(2)) // simulate bin_2 entirely missing from response + v := Evaluate("n1", healthyStatus(), top, DefaultThresholds()) + var found bool + for _, b := range v.Bins { + if b.Index == 2 { + found = true + if !b.Empty || b.Connected != 0 { + t.Errorf("bin_2 should be empty with 0 connected, got %+v", b) + } + } + } + if !found { + t.Fatalf("expected bin_2 in the view even though missing from response") + } +} + +func TestEvaluate_LightNodeNotPenalizedForReserve(t *testing.T) { + st := healthyStatus() + st.BeeMode = api.BeeModeLight + st.ReserveSize = 0 + st.StorageRadius = 0 + st.CommittedDepth = 0 + v := Evaluate("n1", st, healthyTopology(), DefaultThresholds()) + if v.Status != StatusHealthy { + t.Fatalf("light node with empty reserve should be HEALTHY, got %s, reasons=%v", v.Status, v.FailureReasons) + } +} diff --git a/pkg/topohealth/log.go b/pkg/topohealth/log.go new file mode 100644 index 000000000..8de635c45 --- /dev/null +++ b/pkg/topohealth/log.go @@ -0,0 +1,56 @@ +package topohealth + +import ( + "github.com/ethersphere/beekeeper/pkg/logging" + "github.com/sirupsen/logrus" +) + +// Phase tags when in a check's lifecycle a probe was taken. +type Phase string + +const ( + PhasePreUpload Phase = "pre_upload" + PhasePostUpload Phase = "post_upload" + PhasePreDownload Phase = "pre_download" + PhaseOnFailure Phase = "on_failure" +) + +// LogVerdict emits a structured log line for a per-node verdict. All probe +// data is attached as logrus fields; the message is a stable event name that +// CI log greps can pin on. +func LogVerdict(logger logging.Logger, phase Phase, v Verdict) { + logger.WithFields(logrus.Fields{ + "event": "topohealth.verdict", + "phase": string(phase), + "node": v.Node, + "status": v.Status.String(), + "overlay": v.Overlay.String(), + "depth": v.Raw.Depth, + "connected": v.Raw.Connected, + "population": v.Raw.Population, + "empty_below_depth": v.Raw.EmptyBinsBelowDepth, + "dial_ratio": v.Raw.DialRatio, + "reachability": v.Raw.Reachability, + "warming_up": v.Raw.IsWarmingUp, + "reserve_size": v.Raw.ReserveSize, + "storage_radius": v.Raw.StorageRadius, + "committed_depth": v.Raw.CommittedDepth, + "failure_reasons": v.FailureReasons, + }).Info("topohealth.verdict") +} + +// LogStorerResult emits one structured line per intended-storer probe, +// including the HEAD /chunks/{addr} ground-truth. +func LogStorerResult(logger logging.Logger, chunkAddr, phase string, idx int, r StorerResult) { + logger.WithFields(logrus.Fields{ + "event": "topohealth.storer", + "phase": phase, + "storer_rank": idx, + "chunk_addr": chunkAddr, + "node": r.Verdict.Node, + "status": r.Verdict.Status.String(), + "has_chunk": r.HasChunk, + "has_error": r.HasError, + "failure_reasons": r.Verdict.FailureReasons, + }).Info("topohealth.storer") +} diff --git a/pkg/topohealth/thresholds.go b/pkg/topohealth/thresholds.go new file mode 100644 index 000000000..1c593df83 --- /dev/null +++ b/pkg/topohealth/thresholds.go @@ -0,0 +1,29 @@ +package topohealth + +// Thresholds tunes the rules used to derive a per-node Verdict. +// Defaults match a production Swarm cluster; small testnets may need lower +// SaturationPeers. +type Thresholds struct { + // SaturationPeers is the minimum number of connected peers per bin below + // the node's depth for that bin to be considered saturated. + SaturationPeers int + // MinNeighborhoodSize is the minimum number of peers within the node's + // neighborhood (bins >= depth) for the node to be considered usable. + MinNeighborhoodSize int + // MinDialRatio is the minimum connected / (connected + disconnected) ratio + // across all bins under which the node is flagged as having dial issues. + MinDialRatio float64 + // RequirePublicReachable, when true, requires reachability == "Public" + // (instead of just isReachable == true). + RequirePublicReachable bool +} + +// DefaultThresholds returns the production defaults. +func DefaultThresholds() Thresholds { + return Thresholds{ + SaturationPeers: 4, + MinNeighborhoodSize: 2, + MinDialRatio: 0.8, + RequirePublicReachable: true, + } +} From da0a7592216a66174d5c73c3d6d2259d7d618a2a Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 18 May 2026 14:12:05 +0200 Subject: [PATCH 2/7] feat(smoke): add chunk-level diagnostics for download failures --- pkg/check/smoke/metrics.go | 74 +++++--- pkg/check/smoke/smoke.go | 111 +++++++++--- pkg/topohealth/chunkwalk.go | 281 +++++++++++++++++++++++++++++++ pkg/topohealth/chunkwalk_test.go | 83 +++++++++ pkg/topohealth/log.go | 19 +++ 5 files changed, 526 insertions(+), 42 deletions(-) create mode 100644 pkg/topohealth/chunkwalk.go create mode 100644 pkg/topohealth/chunkwalk_test.go diff --git a/pkg/check/smoke/metrics.go b/pkg/check/smoke/metrics.go index 144e296a4..9233017f6 100644 --- a/pkg/check/smoke/metrics.go +++ b/pkg/check/smoke/metrics.go @@ -25,11 +25,15 @@ type metrics struct { NodeHealthVerdict *prometheus.GaugeVec ClusterFullNodeCount prometheus.Gauge ClusterLightNodeCount prometheus.Gauge - ChunkReplicaCount prometheus.Histogram - ChunkPresentOnStorer *prometheus.CounterVec - ChunkAbsentFromStorer *prometheus.CounterVec UnhealthyAbortsPreUp prometheus.Counter UnhealthyAbortsPreDown prometheus.Counter + // Chunk walk: per-chunk presence check across the full upload tree. + ChunksChecked prometheus.Counter + ChunksMissingTotal *prometheus.CounterVec // {position} + ChunksMissingOutOfAOR *prometheus.CounterVec // {position} — bug 1 fingerprint (out-of-depth storing) + ChunksMissingInAOR *prometheus.CounterVec // {position} — bug 2/3 fingerprint (in-depth but not stored) + ChunksPresentOutOfAOR *prometheus.CounterVec // {position} — bug 1 confirmed (chunk exists outside its AOR) + FilesWithLoss prometheus.Counter } const ( @@ -37,6 +41,7 @@ const ( labelNodeName = "node_name" labelRedundancyLevel = "redundancy_level" labelPhase = "phase" + labelPosition = "position" ) func newMetrics(subsystem string) metrics { @@ -210,47 +215,72 @@ func newMetrics(subsystem string) metrics { Help: "Number of light nodes in the cluster.", }, ), - ChunkReplicaCount: prometheus.NewHistogram( - prometheus.HistogramOpts{ + UnhealthyAbortsPreUp: prometheus.NewCounter( + prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "chunk_replica_count", - Help: "On download failure, number of intended-storer full nodes that locally hold the chunk address (HEAD /chunks/{addr}).", - Buckets: []float64{0, 1, 2, 3, 4, 5}, + Name: "unhealthy_aborts_pre_upload", + Help: "Iterations aborted because the uploader was UNHEALTHY before upload.", }, ), - ChunkPresentOnStorer: prometheus.NewCounterVec( + UnhealthyAbortsPreDown: prometheus.NewCounter( prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "chunk_present_on_storer", - Help: "Count of HEAD /chunks/{addr} positive responses on an intended storer at on_failure phase.", + Name: "unhealthy_aborts_pre_download", + Help: "Iterations skipped because the downloader was UNHEALTHY before download.", }, - []string{labelNodeName}, ), - ChunkAbsentFromStorer: prometheus.NewCounterVec( + ChunksChecked: prometheus.NewCounter( prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "chunk_absent_from_storer", - Help: "Count of HEAD /chunks/{addr} negative responses on an intended storer at on_failure phase.", + Name: "chunks_checked_total", + Help: "Total chunks inspected by the on-failure chunk walk (denominator for chunks_missing_* rates).", }, - []string{labelNodeName}, ), - UnhealthyAbortsPreUp: prometheus.NewCounter( + ChunksMissingTotal: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "unhealthy_aborts_pre_upload", - Help: "Iterations aborted because the uploader was UNHEALTHY before upload.", + Name: "chunks_missing_total", + Help: "Chunks not found on their closest full node (HEAD /chunks/{addr} returned 404). Labelled by tree position.", }, + []string{labelPosition}, ), - UnhealthyAbortsPreDown: prometheus.NewCounter( + ChunksMissingOutOfAOR: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "unhealthy_aborts_pre_download", - Help: "Iterations skipped because the downloader was UNHEALTHY before download.", + Name: "chunks_missing_out_of_aor_total", + Help: "Missing chunks whose closest storer in the cluster still has PO(chunk, storer) < storageRadius. Indicates a cluster-coverage gap — the address falls outside every node's AOR. Common in small testnets.", + }, + []string{labelPosition}, + ), + ChunksMissingInAOR: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "chunks_missing_in_aor_total", + Help: "Missing chunks whose closest storer covers the address (PO >= storageRadius) but does not have the chunk. Bee#5400 bug-2/3 fingerprint: shallow receipt short-circuit or false ChunkSynced.", + }, + []string{labelPosition}, + ), + ChunksPresentOutOfAOR: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "chunks_present_out_of_aor_total", + Help: "Chunks held by a node whose AOR does not cover them (PO < storageRadius). Direct bee#5400 bug-1 confirmation: out-of-depth storing.", + }, + []string{labelPosition}, + ), + FilesWithLoss: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "files_with_chunk_loss_total", + Help: "Files where the on-failure chunk walk found at least one missing chunk.", }, ), } diff --git a/pkg/check/smoke/smoke.go b/pkg/check/smoke/smoke.go index 87bdfab78..8045d0094 100644 --- a/pkg/check/smoke/smoke.go +++ b/pkg/check/smoke/smoke.go @@ -25,9 +25,17 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -// onFailureStorerProbeCount is the number of intended storers to probe on a -// download failure (closest by XOR distance to chunk address). -const onFailureStorerProbeCount = 3 +const ( + // onFailureStorerProbeCount is the number of intended storers to probe on a + // download failure (closest by XOR distance to root chunk address). + onFailureStorerProbeCount = 3 + // chunkWalkParallelism caps concurrent HEAD /chunks/{addr} requests during + // the on-failure chunk walk. + chunkWalkParallelism = 32 + // chunkWalkMaxReported truncates per-category result lists so a large file + // with many missing chunks does not flood the log. + chunkWalkMaxReported = 50 +) // Options represents smoke test options type Options struct { @@ -191,6 +199,17 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option continue } + // Pre-compute the chunk address tree locally so we can pin-point a + // missing chunk if download later fails. Deterministic for the same + // (data, rLevel) input — matches what bee would produce. + splitRoot, allChunks, splitErr := topohealth.SplitChunkAddresses(ctx, txData, rLevel) + if splitErr != nil { + c.logger.Errorf("local chunk split failed for size %d: %v", contentSize, splitErr) + allChunks = nil // fall back to root-only diagnostics + } else { + c.logger.Infof("local split produced %d chunks (root=%s)", len(allChunks), splitRoot) + } + if c.probe(ctx, topohealth.PhasePreUpload, uploader, thresholds) == topohealth.StatusUnhealthy { c.metrics.UnhealthyAbortsPreUp.Inc() c.logger.Errorf("aborting iteration: uploader %s is UNHEALTHY pre-upload", uploader.Name()) @@ -309,8 +328,8 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option rxCancel() if !downloaded { - c.logger.Errorf("all download attempts failed for size %d, dumping topology health for uploader, downloader, and intended storers", contentSize) - c.onFailureDump(ctx, cluster, uploader, downloader, address, thresholds) + c.logger.Errorf("all download attempts failed for size %d, dumping topology health and walking chunk tree", contentSize) + c.onFailureDump(ctx, cluster, uploader, downloader, address, allChunks, thresholds) } c.logger.Infof("completed testing file size: %d bytes", contentSize) @@ -354,10 +373,11 @@ func (c *Check) probe(ctx context.Context, phase topohealth.Phase, client *bee.C return v.Status } -// onFailureDump fans out probes for uploader, downloader, and the N closest -// intended storers concurrently, logs a per-node snapshot, and records replica -// count + per-node has/absent counters. -func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster, uploader, downloader *bee.Client, chunkAddr swarm.Address, thresholds topohealth.Thresholds) { +// onFailureDump fans out probes for uploader, downloader, and intended storers +// concurrently, then walks every chunk of the uploaded file to identify exactly +// which chunks are missing and classify each missing chunk as out-of-AOR +// (bee#5400 bug 1) vs in-AOR-not-stored (bee#5400 bug 2/3). +func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster, uploader, downloader *bee.Client, root swarm.Address, allChunks []topohealth.ChunkInfo, thresholds topohealth.Thresholds) { var ( wg sync.WaitGroup storers []topohealth.StorerResult @@ -374,23 +394,74 @@ func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster }() go func() { defer wg.Done() - storers, storErr = topohealth.IntendedStorers(ctx, cluster, chunkAddr, onFailureStorerProbeCount, thresholds) + storers, storErr = topohealth.IntendedStorers(ctx, cluster, root, onFailureStorerProbeCount, thresholds) }() wg.Wait() if storErr != nil { c.logger.Errorf("on_failure intended storers probe failed: %v", storErr) + } else { + for i, r := range storers { + topohealth.LogStorerResult(c.logger, root.String(), string(topohealth.PhaseOnFailure), i, r) + } + } + + if len(allChunks) == 0 { + c.logger.Warningf("on_failure: no pre-computed chunk list (split failed earlier); skipping chunk walk") return } - for i, r := range storers { - topohealth.LogStorerResult(c.logger, chunkAddr.String(), string(topohealth.PhaseOnFailure), i, r) - if r.HasChunk { - c.metrics.ChunkPresentOnStorer.WithLabelValues(r.Verdict.Node).Inc() - } else { - c.metrics.ChunkAbsentFromStorer.WithLabelValues(r.Verdict.Node).Inc() - } + c.walkChunksOnFailure(ctx, cluster, root, allChunks) +} + +// walkChunksOnFailure does a HEAD /chunks/{addr} per chunk against its +// closest full node, records the per-bug counters, and logs every missing or +// out-of-AOR-present chunk. +func (c *Check) walkChunksOnFailure(ctx context.Context, cluster orchestration.Cluster, root swarm.Address, chunks []topohealth.ChunkInfo) { + storers, err := topohealth.GatherStorers(ctx, cluster) + if err != nil { + c.logger.Errorf("on_failure gather storers failed: %v", err) + return + } + res, err := topohealth.WalkChunks(ctx, storers, chunks, chunkWalkParallelism, chunkWalkMaxReported) + if err != nil { + // ctx-cancellation is reported here but we still emit partial counters. + c.logger.Warningf("on_failure chunk walk did not complete: %v", err) + } + + c.metrics.ChunksChecked.Add(float64(res.Checked)) + for pos, n := range res.MissingTotal { + c.metrics.ChunksMissingTotal.WithLabelValues(string(pos)).Add(float64(n)) + } + for pos, n := range res.MissingOutOfAOR { + c.metrics.ChunksMissingOutOfAOR.WithLabelValues(string(pos)).Add(float64(n)) + } + for pos, n := range res.MissingInAOR { + c.metrics.ChunksMissingInAOR.WithLabelValues(string(pos)).Add(float64(n)) + } + for pos, n := range res.PresentOutOfAOR { + c.metrics.ChunksPresentOutOfAOR.WithLabelValues(string(pos)).Add(float64(n)) + } + + for _, m := range res.Missing { + topohealth.LogChunkCheck(c.logger, "missing", root.String(), m) + } + for _, p := range res.OutOfAORHits { + topohealth.LogChunkCheck(c.logger, "present_out_of_aor", root.String(), p) + } + + totalMissing := sumPerPosition(res.MissingTotal) + if totalMissing > 0 { + c.metrics.FilesWithLoss.Inc() + } + c.logger.Infof("on_failure walk: root=%s checked=%d probe_errors=%d missing=%d (out_of_aor=%d, in_aor=%d) present_out_of_aor=%d", + root, res.Checked, res.ProbeErrors, totalMissing, + sumPerPosition(res.MissingOutOfAOR), sumPerPosition(res.MissingInAOR), sumPerPosition(res.PresentOutOfAOR)) +} + +func sumPerPosition(m topohealth.PerPositionCounts) int { + n := 0 + for _, v := range m { + n += v } - replicas := topohealth.ReplicaCount(storers) - c.metrics.ChunkReplicaCount.Observe(float64(replicas)) - c.logger.Infof("on_failure chunk %s: %d/%d intended storers locally hold the chunk", chunkAddr, replicas, len(storers)) + return n } diff --git a/pkg/topohealth/chunkwalk.go b/pkg/topohealth/chunkwalk.go new file mode 100644 index 000000000..4b900cc72 --- /dev/null +++ b/pkg/topohealth/chunkwalk.go @@ -0,0 +1,281 @@ +package topohealth + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + + "github.com/ethersphere/bee/v2/pkg/file/pipeline/builder" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/orchestration" + "golang.org/x/sync/errgroup" +) + +// ChunkPosition is where a chunk sits in the file's Merkle-like tree. +type ChunkPosition string + +const ( + ChunkPositionRoot ChunkPosition = "root" + ChunkPositionIntermediate ChunkPosition = "intermediate" + ChunkPositionLeaf ChunkPosition = "leaf" +) + +// ChunkInfo is one chunk's identity plus its position in the upload tree. +type ChunkInfo struct { + Address swarm.Address `json:"address"` + Position ChunkPosition `json:"position"` +} + +// SplitChunkAddresses runs the chunk-splitting pipeline locally over the same +// bytes that were (or will be) uploaded, returning the root chunk address and +// the full list of chunks bee would produce. It is deterministic for the same +// (data, redundancy level) pair, so the addresses match what bee stores. +func SplitChunkAddresses(ctx context.Context, data []byte, rLevel *redundancy.Level) (swarm.Address, []ChunkInfo, error) { + level := redundancy.NONE + if rLevel != nil { + level = *rLevel + } + // addressOnlyCollector classifies each chunk as it is produced and discards + // the chunk data immediately, so the splitter does not retain a copy of the + // payload in memory for the lifetime of the walk. + col := &addressOnlyCollector{rootPlaceholder: swarm.ZeroAddress} + pipe := builder.NewPipelineBuilder(ctx, col, false, level) + root, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data)) + if err != nil { + return swarm.ZeroAddress, nil, fmt.Errorf("split pipeline: %w", err) + } + // Promote whichever chunk matches the final root. + for i := range col.chunks { + if col.chunks[i].Address.Equal(root) { + col.chunks[i].Position = ChunkPositionRoot + } + } + return root, col.chunks, nil +} + +// addressOnlyCollector classifies a chunk by its span at Put time, stores just +// the address + position, and does not keep the chunk data. The pipeline is +// single-writer so no synchronisation is needed. +type addressOnlyCollector struct { + chunks []ChunkInfo + rootPlaceholder swarm.Address +} + +func (c *addressOnlyCollector) Put(_ context.Context, ch swarm.Chunk) error { + c.chunks = append(c.chunks, ChunkInfo{ + Address: ch.Address(), + Position: classifyBySpan(ch.Data()), + }) + return nil +} + +// classifyBySpan derives a position from the span prefix only. The final root +// promotion happens after FeedPipeline returns (we cannot know which chunk +// will be the root until the pipeline closes). +func classifyBySpan(data []byte) ChunkPosition { + if len(data) < swarm.SpanSize { + return ChunkPositionLeaf + } + span := binary.LittleEndian.Uint64(data[:swarm.SpanSize]) + if span > swarm.ChunkSize { + return ChunkPositionIntermediate + } + return ChunkPositionLeaf +} + +// StorerInfo is one full node's identity plus its current storage radius, +// gathered once at the start of a walk so per-chunk classification is local. +type StorerInfo struct { + Client *bee.Client + Name string + Overlay swarm.Address + overlayBytes []byte // cached for inner-loop comparison + StorageRadius uint8 +} + +// GatherStorers collects overlay + storage radius for every full node in the +// cluster in parallel. Bootnodes and light nodes are excluded. +func GatherStorers(ctx context.Context, cluster orchestration.Cluster) ([]StorerInfo, error) { + type job struct { + client *bee.Client + name string + } + var jobs []job + for _, n := range cluster.Nodes() { + cfg := n.Config() + if !cfg.FullNode || cfg.BootnodeMode { + continue + } + jobs = append(jobs, job{client: n.Client(), name: n.Name()}) + } + out := make([]StorerInfo, len(jobs)) + g, gctx := errgroup.WithContext(ctx) + for i, j := range jobs { + g.Go(func() error { + addrs, err := j.client.Addresses(gctx) + if err != nil { + return fmt.Errorf("addresses for %s: %w", j.name, err) + } + st, err := j.client.Status(gctx) + if err != nil { + return fmt.Errorf("status for %s: %w", j.name, err) + } + out[i] = StorerInfo{ + Client: j.client, + Name: j.name, + Overlay: addrs.Overlay, + overlayBytes: addrs.Overlay.Bytes(), + StorageRadius: st.StorageRadius, + } + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + return out, nil +} + +func closestStorer(chunkAddr swarm.Address, storers []StorerInfo) (StorerInfo, error) { + best := storers[0] + for _, s := range storers[1:] { + cmp, err := swarm.DistanceCmp(chunkAddr, s.Overlay, best.Overlay) + if err != nil { + return StorerInfo{}, fmt.Errorf("distance cmp: %w", err) + } + if cmp > 0 { + best = s + } + } + return best, nil +} + +// ChunkCheck is one chunk's presence result on its intended (closest) storer. +type ChunkCheck struct { + Address swarm.Address `json:"address"` + Position ChunkPosition `json:"position"` + StorerName string `json:"storer"` + StorerOverlay swarm.Address `json:"storerOverlay"` + Proximity uint8 `json:"proximity"` + StorageRadius uint8 `json:"storageRadius"` + // OutOfAOR: PO(chunk, storer) < storer.StorageRadius. When the storer is + // the closest in the cluster and the chunk is still out of its AOR, no + // node covers this address (cluster-coverage gap). When Present is also + // true, that is a direct bee#5400 bug-1 fingerprint: a node is holding a + // chunk outside its own AOR. + OutOfAOR bool `json:"outOfAOR"` + Present bool `json:"present"` + Error string `json:"error,omitempty"` +} + +// PerPositionCounts is per-tree-position aggregation of a walk. +type PerPositionCounts map[ChunkPosition]int + +func (c PerPositionCounts) add(p ChunkPosition) { + c[p]++ +} + +// WalkResult summarises a chunk walk. Counters are exact; slices are +// truncated for logging. +type WalkResult struct { + Checked int `json:"checked"` + Missing []ChunkCheck `json:"missing,omitempty"` + OutOfAORHits []ChunkCheck `json:"outOfAORHits,omitempty"` + MissingTotal PerPositionCounts `json:"missingTotal"` + MissingOutOfAOR PerPositionCounts `json:"missingOutOfAOR"` + MissingInAOR PerPositionCounts `json:"missingInAOR"` + PresentOutOfAOR PerPositionCounts `json:"presentOutOfAOR"` + ProbeErrors int `json:"probeErrors"` +} + +// WalkChunks HEADs each chunk address on its closest full node and produces +// per-chunk presence + AOR results. Concurrency is bounded by parallelism; +// the returned WalkResult holds exact per-position counters plus +// log-friendly truncated slices (maxReported per category). +func WalkChunks(ctx context.Context, storers []StorerInfo, chunks []ChunkInfo, parallelism, maxReported int) (WalkResult, error) { + if len(storers) == 0 { + return WalkResult{}, errors.New("no storers") + } + if parallelism <= 0 { + parallelism = 32 + } + + checks := make([]ChunkCheck, len(chunks)) + sem := make(chan struct{}, parallelism) + var wg sync.WaitGroup +dispatch: + for i, c := range chunks { + select { + case <-ctx.Done(): + break dispatch + case sem <- struct{}{}: + } + wg.Add(1) + go func(idx int, ci ChunkInfo) { + defer wg.Done() + defer func() { <-sem }() + storer, err := closestStorer(ci.Address, storers) + if err != nil { + checks[idx] = ChunkCheck{Address: ci.Address, Position: ci.Position, Error: err.Error()} + return + } + po := swarm.Proximity(ci.Address.Bytes(), storer.overlayBytes) + check := ChunkCheck{ + Address: ci.Address, + Position: ci.Position, + StorerName: storer.Name, + StorerOverlay: storer.Overlay, + Proximity: po, + StorageRadius: storer.StorageRadius, + OutOfAOR: po < storer.StorageRadius, + } + has, herr := storer.Client.LocalHasChunk(ctx, ci.Address) + if herr != nil { + check.Error = herr.Error() + } else { + check.Present = has + } + checks[idx] = check + }(i, c) + } + wg.Wait() + + res := WalkResult{ + MissingTotal: PerPositionCounts{}, + MissingOutOfAOR: PerPositionCounts{}, + MissingInAOR: PerPositionCounts{}, + PresentOutOfAOR: PerPositionCounts{}, + } + for _, c := range checks { + // Skip chunks we never managed to probe — counting them as missing + // would inflate the loss metrics with transient API errors. + if c.Error != "" { + res.ProbeErrors++ + continue + } + res.Checked++ + if !c.Present { + res.MissingTotal.add(c.Position) + if c.OutOfAOR { + res.MissingOutOfAOR.add(c.Position) + } else { + res.MissingInAOR.add(c.Position) + } + if len(res.Missing) < maxReported { + res.Missing = append(res.Missing, c) + } + } + if c.Present && c.OutOfAOR { + res.PresentOutOfAOR.add(c.Position) + if len(res.OutOfAORHits) < maxReported { + res.OutOfAORHits = append(res.OutOfAORHits, c) + } + } + } + return res, ctx.Err() +} diff --git a/pkg/topohealth/chunkwalk_test.go b/pkg/topohealth/chunkwalk_test.go new file mode 100644 index 000000000..71dcd5a4c --- /dev/null +++ b/pkg/topohealth/chunkwalk_test.go @@ -0,0 +1,83 @@ +package topohealth + +import ( + "context" + "crypto/rand" + "testing" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +func TestSplitChunkAddresses_SmallFileSingleChunk(t *testing.T) { + data := make([]byte, 1024) // < swarm.ChunkSize → fits in a single leaf chunk + if _, err := rand.Read(data); err != nil { + t.Fatal(err) + } + root, chunks, err := SplitChunkAddresses(context.Background(), data, nil) + if err != nil { + t.Fatalf("split: %v", err) + } + if root.Equal(swarm.ZeroAddress) { + t.Fatal("root address is zero") + } + if len(chunks) != 1 { + t.Fatalf("expected 1 chunk, got %d", len(chunks)) + } + if chunks[0].Position != ChunkPositionRoot { + t.Errorf("expected root position, got %s", chunks[0].Position) + } + if !chunks[0].Address.Equal(root) { + t.Errorf("single chunk address %s != root %s", chunks[0].Address, root) + } +} + +func TestSplitChunkAddresses_MultiChunkTreeHasIntermediate(t *testing.T) { + // Enough data to force at least one intermediate level: roughly + // (BranchingFactor + 1) leaves = 129 chunks * 4096 = ~528KB. + const size = 600 * 1024 + data := make([]byte, size) + if _, err := rand.Read(data); err != nil { + t.Fatal(err) + } + root, chunks, err := SplitChunkAddresses(context.Background(), data, nil) + if err != nil { + t.Fatalf("split: %v", err) + } + if len(chunks) < 130 { + t.Fatalf("expected >130 chunks for %d bytes, got %d", size, len(chunks)) + } + var roots, intermediates, leaves int + for _, c := range chunks { + switch c.Position { + case ChunkPositionRoot: + roots++ + case ChunkPositionIntermediate: + intermediates++ + case ChunkPositionLeaf: + leaves++ + } + } + if roots != 1 { + t.Errorf("expected 1 root, got %d", roots) + } + if intermediates < 1 { + t.Errorf("expected at least 1 intermediate chunk for %d bytes of data, got %d", size, intermediates) + } + if leaves < 130 { + t.Errorf("expected at least 130 leaves, got %d", leaves) + } + // All chunks must be addressable. + seen := make(map[string]bool, len(chunks)) + for _, c := range chunks { + if c.Address.Equal(swarm.ZeroAddress) { + t.Errorf("chunk has zero address") + } + if seen[c.Address.ByteString()] { + t.Errorf("duplicate chunk address %s", c.Address) + } + seen[c.Address.ByteString()] = true + } + if !seen[root.ByteString()] { + t.Errorf("root %s not in collected chunks", root) + } +} diff --git a/pkg/topohealth/log.go b/pkg/topohealth/log.go index 8de635c45..3f9241391 100644 --- a/pkg/topohealth/log.go +++ b/pkg/topohealth/log.go @@ -39,6 +39,25 @@ func LogVerdict(logger logging.Logger, phase Phase, v Verdict) { }).Info("topohealth.verdict") } +// LogChunkCheck emits one structured line per missing or out-of-AOR chunk +// from a WalkChunks result. +func LogChunkCheck(logger logging.Logger, kind, chunkAddr string, c ChunkCheck) { + logger.WithFields(logrus.Fields{ + "event": "topohealth.chunk_check", + "kind": kind, + "upload_root": chunkAddr, + "chunk_address": c.Address.String(), + "position": string(c.Position), + "storer": c.StorerName, + "storer_overlay": c.StorerOverlay.String(), + "proximity": c.Proximity, + "storage_radius": c.StorageRadius, + "out_of_aor": c.OutOfAOR, + "present": c.Present, + "head_error": c.Error, + }).Info("topohealth.chunk_check") +} + // LogStorerResult emits one structured line per intended-storer probe, // including the HEAD /chunks/{addr} ground-truth. func LogStorerResult(logger logging.Logger, chunkAddr, phase string, idx int, r StorerResult) { From b52e93aac94e86bc275635d759460a343a8c0995 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 18 May 2026 14:18:46 +0200 Subject: [PATCH 3/7] refactor(smoke): run chunk walk concurrently with on-failure probes --- pkg/check/smoke/smoke.go | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/pkg/check/smoke/smoke.go b/pkg/check/smoke/smoke.go index 8045d0094..eadd4bc04 100644 --- a/pkg/check/smoke/smoke.go +++ b/pkg/check/smoke/smoke.go @@ -373,17 +373,18 @@ func (c *Check) probe(ctx context.Context, phase topohealth.Phase, client *bee.C return v.Status } -// onFailureDump fans out probes for uploader, downloader, and intended storers -// concurrently, then walks every chunk of the uploaded file to identify exactly -// which chunks are missing and classify each missing chunk as out-of-AOR -// (bee#5400 bug 1) vs in-AOR-not-stored (bee#5400 bug 2/3). +// onFailureDump fans out four independent diagnostics concurrently: uploader +// probe, downloader probe, the 3 intended-storer probes (with HEAD on the +// root), and the full chunk walk that classifies every missing chunk as +// out-of-AOR (bee#5400 bug 1), in-AOR-not-stored (bug 2/3), or +// cluster-coverage-gap. func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster, uploader, downloader *bee.Client, root swarm.Address, allChunks []topohealth.ChunkInfo, thresholds topohealth.Thresholds) { var ( wg sync.WaitGroup storers []topohealth.StorerResult storErr error ) - wg.Add(3) + wg.Add(4) go func() { defer wg.Done() c.probe(ctx, topohealth.PhaseOnFailure, uploader, thresholds) @@ -396,21 +397,23 @@ func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster defer wg.Done() storers, storErr = topohealth.IntendedStorers(ctx, cluster, root, onFailureStorerProbeCount, thresholds) }() + go func() { + defer wg.Done() + if len(allChunks) == 0 { + c.logger.Warningf("on_failure: no pre-computed chunk list (split failed earlier); skipping chunk walk") + return + } + c.walkChunksOnFailure(ctx, cluster, root, allChunks) + }() wg.Wait() if storErr != nil { c.logger.Errorf("on_failure intended storers probe failed: %v", storErr) - } else { - for i, r := range storers { - topohealth.LogStorerResult(c.logger, root.String(), string(topohealth.PhaseOnFailure), i, r) - } - } - - if len(allChunks) == 0 { - c.logger.Warningf("on_failure: no pre-computed chunk list (split failed earlier); skipping chunk walk") return } - c.walkChunksOnFailure(ctx, cluster, root, allChunks) + for i, r := range storers { + topohealth.LogStorerResult(c.logger, root.String(), string(topohealth.PhaseOnFailure), i, r) + } } // walkChunksOnFailure does a HEAD /chunks/{addr} per chunk against its From c683a142df73c9c23ba03415ca1d7bc5beac1325 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 18 May 2026 14:51:39 +0200 Subject: [PATCH 4/7] feat(smoke): cap local chunk split --- pkg/check/smoke/smoke.go | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/pkg/check/smoke/smoke.go b/pkg/check/smoke/smoke.go index eadd4bc04..e938b5d08 100644 --- a/pkg/check/smoke/smoke.go +++ b/pkg/check/smoke/smoke.go @@ -35,6 +35,11 @@ const ( // chunkWalkMaxReported truncates per-category result lists so a large file // with many missing chunks does not flood the log. chunkWalkMaxReported = 50 + // chunkWalkMaxBytes caps the file size for which the local split + chunk + // walk runs. Larger files skip the walk: at ~200 MB the walk takes tens + // of seconds on a healthy cluster and minutes on a slow one; beyond that + // it stops being a useful per-failure diagnostic. + chunkWalkMaxBytes int64 = 200 * 1024 * 1024 ) // Options represents smoke test options @@ -201,13 +206,20 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option // Pre-compute the chunk address tree locally so we can pin-point a // missing chunk if download later fails. Deterministic for the same - // (data, rLevel) input — matches what bee would produce. - splitRoot, allChunks, splitErr := topohealth.SplitChunkAddresses(ctx, txData, rLevel) - if splitErr != nil { - c.logger.Errorf("local chunk split failed for size %d: %v", contentSize, splitErr) - allChunks = nil // fall back to root-only diagnostics + // (data, rLevel) input — matches what bee would produce. Skipped + // for files above chunkWalkMaxBytes since the walk grows linearly + // with chunk count and stops being a useful per-failure tool. + var allChunks []topohealth.ChunkInfo + if contentSize <= chunkWalkMaxBytes { + splitRoot, chunks, splitErr := topohealth.SplitChunkAddresses(ctx, txData, rLevel) + if splitErr != nil { + c.logger.Errorf("local chunk split failed for size %d: %v", contentSize, splitErr) + } else { + allChunks = chunks + c.logger.Infof("local split produced %d chunks (root=%s)", len(allChunks), splitRoot) + } } else { - c.logger.Infof("local split produced %d chunks (root=%s)", len(allChunks), splitRoot) + c.logger.Infof("file size %d > %d (chunkWalkMaxBytes); skipping local split and on-failure chunk walk", contentSize, chunkWalkMaxBytes) } if c.probe(ctx, topohealth.PhasePreUpload, uploader, thresholds) == topohealth.StatusUnhealthy { @@ -400,7 +412,8 @@ func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster go func() { defer wg.Done() if len(allChunks) == 0 { - c.logger.Warningf("on_failure: no pre-computed chunk list (split failed earlier); skipping chunk walk") + // allChunks is empty either because the file size exceeded + // chunkWalkMaxBytes or because the local split errored return } c.walkChunksOnFailure(ctx, cluster, root, allChunks) From 26b1a3f1ab2c06e3c0a0061268a94171c77b7edf Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 18 May 2026 16:47:55 +0200 Subject: [PATCH 5/7] refactor(topohealth): drop dead code and one-field wrapper structs --- pkg/orchestration/k8s/cluster.go | 20 +++++------ pkg/topohealth/chunkwalk.go | 57 ++++++++++++++------------------ pkg/topohealth/cluster.go | 10 ------ pkg/topohealth/health.go | 10 +++--- pkg/topohealth/log.go | 1 - 5 files changed, 37 insertions(+), 61 deletions(-) diff --git a/pkg/orchestration/k8s/cluster.go b/pkg/orchestration/k8s/cluster.go index 2db6accd5..22cccf6c8 100644 --- a/pkg/orchestration/k8s/cluster.go +++ b/pkg/orchestration/k8s/cluster.go @@ -454,32 +454,28 @@ func (c *Cluster) FullNodeClientsByDistance(ctx context.Context, chunkAddr swarm dist []byte } - type job struct { - client *bee.Client - idx int - } - var jobs []job + var clients []*bee.Client for _, n := range c.Nodes() { cfg := n.Config() if !cfg.FullNode || cfg.BootnodeMode { continue } - jobs = append(jobs, job{client: n.Client()}) + clients = append(clients, n.Client()) } - entries := make([]entry, len(jobs)) + entries := make([]entry, len(clients)) g, gctx := errgroup.WithContext(ctx) - for i, j := range jobs { + for i, cl := range clients { g.Go(func() error { - addrs, err := j.client.Addresses(gctx) + addrs, err := cl.Addresses(gctx) if err != nil { - return fmt.Errorf("get overlay for %s: %w", j.client.Name(), err) + return fmt.Errorf("get overlay for %s: %w", cl.Name(), err) } dist, err := swarm.DistanceRaw(chunkAddr, addrs.Overlay) if err != nil { - return fmt.Errorf("distance for %s: %w", j.client.Name(), err) + return fmt.Errorf("distance for %s: %w", cl.Name(), err) } - entries[i] = entry{client: j.client, dist: dist} + entries[i] = entry{client: cl, dist: dist} return nil }) } diff --git a/pkg/topohealth/chunkwalk.go b/pkg/topohealth/chunkwalk.go index 4b900cc72..15beaf6d2 100644 --- a/pkg/topohealth/chunkwalk.go +++ b/pkg/topohealth/chunkwalk.go @@ -43,7 +43,7 @@ func SplitChunkAddresses(ctx context.Context, data []byte, rLevel *redundancy.Le // addressOnlyCollector classifies each chunk as it is produced and discards // the chunk data immediately, so the splitter does not retain a copy of the // payload in memory for the lifetime of the walk. - col := &addressOnlyCollector{rootPlaceholder: swarm.ZeroAddress} + col := &addressOnlyCollector{} pipe := builder.NewPipelineBuilder(ctx, col, false, level) root, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data)) if err != nil { @@ -62,8 +62,7 @@ func SplitChunkAddresses(ctx context.Context, data []byte, rLevel *redundancy.Le // the address + position, and does not keep the chunk data. The pipeline is // single-writer so no synchronisation is needed. type addressOnlyCollector struct { - chunks []ChunkInfo - rootPlaceholder swarm.Address + chunks []ChunkInfo } func (c *addressOnlyCollector) Put(_ context.Context, ch swarm.Chunk) error { @@ -101,33 +100,29 @@ type StorerInfo struct { // GatherStorers collects overlay + storage radius for every full node in the // cluster in parallel. Bootnodes and light nodes are excluded. func GatherStorers(ctx context.Context, cluster orchestration.Cluster) ([]StorerInfo, error) { - type job struct { - client *bee.Client - name string - } - var jobs []job + var clients []*bee.Client for _, n := range cluster.Nodes() { cfg := n.Config() if !cfg.FullNode || cfg.BootnodeMode { continue } - jobs = append(jobs, job{client: n.Client(), name: n.Name()}) + clients = append(clients, n.Client()) } - out := make([]StorerInfo, len(jobs)) + out := make([]StorerInfo, len(clients)) g, gctx := errgroup.WithContext(ctx) - for i, j := range jobs { + for i, cl := range clients { g.Go(func() error { - addrs, err := j.client.Addresses(gctx) + addrs, err := cl.Addresses(gctx) if err != nil { - return fmt.Errorf("addresses for %s: %w", j.name, err) + return fmt.Errorf("addresses for %s: %w", cl.Name(), err) } - st, err := j.client.Status(gctx) + st, err := cl.Status(gctx) if err != nil { - return fmt.Errorf("status for %s: %w", j.name, err) + return fmt.Errorf("status for %s: %w", cl.Name(), err) } out[i] = StorerInfo{ - Client: j.client, - Name: j.name, + Client: cl, + Name: cl.Name(), Overlay: addrs.Overlay, overlayBytes: addrs.Overlay.Bytes(), StorageRadius: st.StorageRadius, @@ -176,21 +171,17 @@ type ChunkCheck struct { // PerPositionCounts is per-tree-position aggregation of a walk. type PerPositionCounts map[ChunkPosition]int -func (c PerPositionCounts) add(p ChunkPosition) { - c[p]++ -} - // WalkResult summarises a chunk walk. Counters are exact; slices are // truncated for logging. type WalkResult struct { - Checked int `json:"checked"` - Missing []ChunkCheck `json:"missing,omitempty"` - OutOfAORHits []ChunkCheck `json:"outOfAORHits,omitempty"` - MissingTotal PerPositionCounts `json:"missingTotal"` - MissingOutOfAOR PerPositionCounts `json:"missingOutOfAOR"` - MissingInAOR PerPositionCounts `json:"missingInAOR"` - PresentOutOfAOR PerPositionCounts `json:"presentOutOfAOR"` - ProbeErrors int `json:"probeErrors"` + Checked int `json:"checked"` + Missing []ChunkCheck `json:"missing,omitempty"` + OutOfAORHits []ChunkCheck `json:"outOfAORHits,omitempty"` + MissingTotal PerPositionCounts `json:"missingTotal"` + MissingOutOfAOR PerPositionCounts `json:"missingOutOfAOR"` + MissingInAOR PerPositionCounts `json:"missingInAOR"` + PresentOutOfAOR PerPositionCounts `json:"presentOutOfAOR"` + ProbeErrors int `json:"probeErrors"` } // WalkChunks HEADs each chunk address on its closest full node and produces @@ -260,18 +251,18 @@ dispatch: } res.Checked++ if !c.Present { - res.MissingTotal.add(c.Position) + res.MissingTotal[c.Position]++ if c.OutOfAOR { - res.MissingOutOfAOR.add(c.Position) + res.MissingOutOfAOR[c.Position]++ } else { - res.MissingInAOR.add(c.Position) + res.MissingInAOR[c.Position]++ } if len(res.Missing) < maxReported { res.Missing = append(res.Missing, c) } } if c.Present && c.OutOfAOR { - res.PresentOutOfAOR.add(c.Position) + res.PresentOutOfAOR[c.Position]++ if len(res.OutOfAORHits) < maxReported { res.OutOfAORHits = append(res.OutOfAORHits, c) } diff --git a/pkg/topohealth/cluster.go b/pkg/topohealth/cluster.go index 6329609da..eb5cf85e5 100644 --- a/pkg/topohealth/cluster.go +++ b/pkg/topohealth/cluster.go @@ -81,13 +81,3 @@ func probeStorer(ctx context.Context, cl *bee.Client, chunkAddr swarm.Address, t } return r } - -func ReplicaCount(rs []StorerResult) int { - n := 0 - for _, r := range rs { - if r.HasChunk { - n++ - } - } - return n -} diff --git a/pkg/topohealth/health.go b/pkg/topohealth/health.go index 9cce587c8..8aad94df2 100644 --- a/pkg/topohealth/health.go +++ b/pkg/topohealth/health.go @@ -120,11 +120,11 @@ type Verdict struct { // Probe runs the single-node probe: /status and /topology in parallel. func Probe(ctx context.Context, c *bee.Client, t Thresholds) (Verdict, error) { var ( - wg sync.WaitGroup - st *api.StatusResponse - top bee.Topology - stErr error - topErr error + wg sync.WaitGroup + st *api.StatusResponse + top bee.Topology + stErr error + topErr error ) wg.Add(2) go func() { diff --git a/pkg/topohealth/log.go b/pkg/topohealth/log.go index 3f9241391..5c4245a6d 100644 --- a/pkg/topohealth/log.go +++ b/pkg/topohealth/log.go @@ -10,7 +10,6 @@ type Phase string const ( PhasePreUpload Phase = "pre_upload" - PhasePostUpload Phase = "post_upload" PhasePreDownload Phase = "pre_download" PhaseOnFailure Phase = "on_failure" ) From b2f51feb1d1f4f9b32848aa39421c24df417e65e Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 18 May 2026 21:02:58 +0200 Subject: [PATCH 6/7] fix(smoke): skip retry-wait before first attempt; range over rLevels --- pkg/check/smoke/smoke.go | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/pkg/check/smoke/smoke.go b/pkg/check/smoke/smoke.go index e938b5d08..c5231d41b 100644 --- a/pkg/check/smoke/smoke.go +++ b/pkg/check/smoke/smoke.go @@ -170,9 +170,7 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option if len(rLevels) == 0 { rLevels = []*redundancy.Level{nil} } - rLevelIdx := 0 - for { - rLevel := rLevels[rLevelIdx] + for _, rLevel := range rLevels { for _, contentSize := range fileSizes { select { case <-ctx.Done(): @@ -233,15 +231,17 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option txCancel context.CancelFunc = func() {} ) - for range 3 { + for attempt := range 3 { txCancel() uploaded = false - select { - case <-ctx.Done(): - return nil - case <-time.After(o.TxOnErrWait): + if attempt > 0 { + select { + case <-ctx.Done(): + return nil + case <-time.After(o.TxOnErrWait): + } } txCtx, txCancel = context.WithTimeout(ctx, o.UploadTimeout) @@ -284,13 +284,15 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option rxCancel context.CancelFunc = func() {} ) - for range 3 { + for attempt := range 3 { rxCancel() - select { - case <-ctx.Done(): - return nil - case <-time.After(o.RxOnErrWait): + if attempt > 0 { + select { + case <-ctx.Done(): + return nil + case <-time.After(o.RxOnErrWait): + } } c.metrics.DownloadAttempts.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() @@ -346,10 +348,6 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option c.logger.Infof("completed testing file size: %d bytes", contentSize) } - rLevelIdx++ - if rLevelIdx >= len(rLevels) { - break - } } time.Sleep(o.IterationWait) From 855977ab2613667de97f0274d184506e1fb32b45 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Thu, 21 May 2026 15:04:15 +0200 Subject: [PATCH 7/7] feat(smoke): add EOF-with-clean-walk metric; extract iteration helpers --- pkg/check/smoke/metrics.go | 9 + pkg/check/smoke/smoke.go | 361 ++++++++++++++++++++----------------- 2 files changed, 203 insertions(+), 167 deletions(-) diff --git a/pkg/check/smoke/metrics.go b/pkg/check/smoke/metrics.go index 9233017f6..e95dd6dcf 100644 --- a/pkg/check/smoke/metrics.go +++ b/pkg/check/smoke/metrics.go @@ -34,6 +34,7 @@ type metrics struct { ChunksMissingInAOR *prometheus.CounterVec // {position} — bug 2/3 fingerprint (in-depth but not stored) ChunksPresentOutOfAOR *prometheus.CounterVec // {position} — bug 1 confirmed (chunk exists outside its AOR) FilesWithLoss prometheus.Counter + EOFWithCleanWalk prometheus.Counter } const ( @@ -283,6 +284,14 @@ func newMetrics(subsystem string) metrics { Help: "Files where the on-failure chunk walk found at least one missing chunk.", }, ), + EOFWithCleanWalk: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "eof_with_clean_walk_total", + Help: "Downloads that failed with unexpected EOF where the chunk walk found nothing missing and no out-of-AOR-present chunks. Strong signal that the EOF was NOT bee#5400 chunk loss but something else (transient retrieval, downloader networking, etc.).", + }, + ), } } diff --git a/pkg/check/smoke/smoke.go b/pkg/check/smoke/smoke.go index c5231d41b..3dda6e4f1 100644 --- a/pkg/check/smoke/smoke.go +++ b/pkg/check/smoke/smoke.go @@ -25,6 +25,25 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// testRunner is the subset of pkg/test used by the smoke iteration. Declared +// locally so we don't have to name the unexported return type of test.NewTest. +type testRunner interface { + Upload(ctx context.Context, c *bee.Client, data []byte, batchID string, rLevel *redundancy.Level) (swarm.Address, time.Duration, error) + Download(ctx context.Context, c *bee.Client, a swarm.Address, rLevel *redundancy.Level) ([]byte, time.Duration, error) +} + +// iterCtx groups the inputs that stay constant across all (contentSize, rLevel) +// pairs within a single outer iteration. Reduces runIteration's parameter count. +type iterCtx struct { + cluster orchestration.Cluster + uploader *bee.Client + downloader *bee.Client + batchID string + runner testRunner + opts Options + thresholds topohealth.Thresholds +} + const ( // onFailureStorerProbeCount is the number of intended storers to probe on a // download failure (closest by XOR distance to root chunk address). @@ -170,190 +189,192 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option if len(rLevels) == 0 { rLevels = []*redundancy.Level{nil} } + iter := iterCtx{ + cluster: cluster, + uploader: uploader, + downloader: downloader, + batchID: batchID, + runner: test, + opts: o, + thresholds: thresholds, + } for _, rLevel := range rLevels { for _, contentSize := range fileSizes { select { case <-ctx.Done(): return nil default: - if rLevel != nil { - c.logger.Infof("testing file size: %d bytes (%.2f KB), redundancy level: %d", contentSize, float64(contentSize)/1024, *rLevel) - } else { - c.logger.Infof("testing file size: %d bytes (%.2f KB), redundancy level: (not set)", contentSize, float64(contentSize)/1024) - } } + c.runIteration(ctx, iter, contentSize, rLevel) + } + } - sizeLabel := fmt.Sprintf("%d", contentSize) - rLevelLabel := redundancyLevelLabel(rLevel) - - var ( - txDuration time.Duration - rxDuration time.Duration - txData []byte - rxData []byte - address swarm.Address - uploaded bool - downloaded bool - ) - - txData = make([]byte, contentSize) - if _, err := rand.Read(txData); err != nil { - c.logger.Errorf("unable to create random content for size %d: %v", contentSize, err) - continue - } + time.Sleep(o.IterationWait) + } - // Pre-compute the chunk address tree locally so we can pin-point a - // missing chunk if download later fails. Deterministic for the same - // (data, rLevel) input — matches what bee would produce. Skipped - // for files above chunkWalkMaxBytes since the walk grows linearly - // with chunk count and stops being a useful per-failure tool. - var allChunks []topohealth.ChunkInfo - if contentSize <= chunkWalkMaxBytes { - splitRoot, chunks, splitErr := topohealth.SplitChunkAddresses(ctx, txData, rLevel) - if splitErr != nil { - c.logger.Errorf("local chunk split failed for size %d: %v", contentSize, splitErr) - } else { - allChunks = chunks - c.logger.Infof("local split produced %d chunks (root=%s)", len(allChunks), splitRoot) - } - } else { - c.logger.Infof("file size %d > %d (chunkWalkMaxBytes); skipping local split and on-failure chunk walk", contentSize, chunkWalkMaxBytes) - } + return nil +} - if c.probe(ctx, topohealth.PhasePreUpload, uploader, thresholds) == topohealth.StatusUnhealthy { - c.metrics.UnhealthyAbortsPreUp.Inc() - c.logger.Errorf("aborting iteration: uploader %s is UNHEALTHY pre-upload", uploader.Name()) - continue - } +// runIteration runs a single upload→sync→download cycle for one (contentSize, +// rLevel) pair. On download failure it triggers the full on-failure dump, +// which includes the chunk walk if a local split was available. +func (c *Check) runIteration(ctx context.Context, iter iterCtx, contentSize int64, rLevel *redundancy.Level) { + if rLevel != nil { + c.logger.Infof("testing file size: %d bytes (%.2f KB), redundancy level: %d", contentSize, float64(contentSize)/1024, *rLevel) + } else { + c.logger.Infof("testing file size: %d bytes (%.2f KB), redundancy level: (not set)", contentSize, float64(contentSize)/1024) + } - var ( - txCtx context.Context - txCancel context.CancelFunc = func() {} - ) - - for attempt := range 3 { - txCancel() - - uploaded = false - - if attempt > 0 { - select { - case <-ctx.Done(): - return nil - case <-time.After(o.TxOnErrWait): - } - } - - txCtx, txCancel = context.WithTimeout(ctx, o.UploadTimeout) - - c.metrics.UploadAttempts.WithLabelValues(sizeLabel, uploader.Name(), rLevelLabel).Inc() - address, txDuration, err = test.Upload(txCtx, uploader, txData, batchID, rLevel) - if err != nil { - c.metrics.UploadErrors.WithLabelValues(sizeLabel, uploader.Name(), rLevelLabel).Inc() - c.logger.Errorf("upload failed for size %d: %v", contentSize, err) - c.logger.Infof("retrying in: %v", o.TxOnErrWait) - } else { - uploaded = true - break - } - } - txCancel() - if !uploaded { - c.logger.Infof("skipping download for size %d due to upload failure", contentSize) - continue - } + sizeLabel := fmt.Sprintf("%d", contentSize) + rLevelLabel := redundancyLevelLabel(rLevel) - c.metrics.UploadDuration.WithLabelValues(sizeLabel, uploader.Name(), rLevelLabel).Observe(txDuration.Seconds()) - c.metrics.UploadSuccess.WithLabelValues(sizeLabel, uploader.Name(), rLevelLabel).Inc() - c.metrics.UploadedBytes.WithLabelValues(uploader.Name(), rLevelLabel).Add(float64(contentSize)) + txData := make([]byte, contentSize) + if _, err := rand.Read(txData); err != nil { + c.logger.Errorf("unable to create random content for size %d: %v", contentSize, err) + return + } - if txDuration.Seconds() > 0 { - uploadThroughput := float64(contentSize) / txDuration.Seconds() - c.metrics.UploadThroughput.WithLabelValues(sizeLabel, uploader.Name(), rLevelLabel).Set(uploadThroughput) - } + // Pre-compute the chunk address tree locally so we can pin-point a missing + // chunk if download later fails. Deterministic for the same (data, rLevel) + // input — matches what bee would produce. Skipped for files above + // chunkWalkMaxBytes since the walk grows linearly with chunk count and + // stops being a useful per-failure tool. + var allChunks []topohealth.ChunkInfo + if contentSize <= chunkWalkMaxBytes { + splitRoot, chunks, splitErr := topohealth.SplitChunkAddresses(ctx, txData, rLevel) + if splitErr != nil { + c.logger.Errorf("local chunk split failed for size %d: %v", contentSize, splitErr) + } else { + allChunks = chunks + c.logger.Infof("local split produced %d chunks (root=%s)", len(allChunks), splitRoot) + } + } else { + c.logger.Infof("file size %d > %d (chunkWalkMaxBytes); skipping local split and on-failure chunk walk", contentSize, chunkWalkMaxBytes) + } - time.Sleep(o.NodesSyncWait) + if c.probe(ctx, topohealth.PhasePreUpload, iter.uploader, iter.thresholds) == topohealth.StatusUnhealthy { + c.metrics.UnhealthyAbortsPreUp.Inc() + c.logger.Errorf("aborting iteration: uploader %s is UNHEALTHY pre-upload", iter.uploader.Name()) + return + } - if c.probe(ctx, topohealth.PhasePreDownload, downloader, thresholds) == topohealth.StatusUnhealthy { - c.metrics.UnhealthyAbortsPreDown.Inc() - c.logger.Warningf("downloader %s is UNHEALTHY pre-download; attempting anyway", downloader.Name()) - } + address, txDuration, ok := c.uploadWithRetry(ctx, iter, sizeLabel, rLevelLabel, txData, rLevel) + if !ok { + c.logger.Infof("skipping download for size %d due to upload failure", contentSize) + return + } - var ( - rxCtx context.Context - rxCancel context.CancelFunc = func() {} - ) - - for attempt := range 3 { - rxCancel() - - if attempt > 0 { - select { - case <-ctx.Done(): - return nil - case <-time.After(o.RxOnErrWait): - } - } - - c.metrics.DownloadAttempts.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() - - rxCtx, rxCancel = context.WithTimeout(ctx, o.DownloadTimeout) - rxData, rxDuration, err = test.Download(rxCtx, downloader, address, rLevel) - if err != nil { - c.metrics.DownloadErrors.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() - if errors.Is(err, io.ErrUnexpectedEOF) { - c.metrics.DownloadEOFErrors.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() - } - c.logger.Errorf("download failed for size %d: %v", contentSize, err) - c.logger.Infof("retrying in: %v", o.RxOnErrWait) - continue - } - - if bytes.Equal(rxData, txData) { - c.metrics.DownloadDuration.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Observe(rxDuration.Seconds()) - c.metrics.DownloadSuccess.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() - c.metrics.DownloadedBytes.WithLabelValues(downloader.Name(), rLevelLabel).Add(float64(contentSize)) - - if rxDuration.Seconds() > 0 { - downloadThroughput := float64(contentSize) / rxDuration.Seconds() - c.metrics.DownloadThroughput.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Set(downloadThroughput) - } - downloaded = true - break - } - - c.logger.Infof("data mismatch for size %d: uploaded and downloaded data differ", contentSize) - c.metrics.DownloadMismatch.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() - - rxLen, txLen := len(rxData), len(txData) - if rxLen != txLen { - c.logger.Errorf("length mismatch for size %d: downloaded %d bytes, uploaded %d bytes", contentSize, rxLen, txLen) - continue - } - - var diff int - for i := range txData { - if txData[i] != rxData[i] { - diff++ - } - } - c.logger.Infof("data mismatch for size %d: found %d different bytes, ~%.2f%%", contentSize, diff, float64(diff)/float64(txLen)*100) - } - rxCancel() + c.metrics.UploadDuration.WithLabelValues(sizeLabel, iter.uploader.Name(), rLevelLabel).Observe(txDuration.Seconds()) + c.metrics.UploadSuccess.WithLabelValues(sizeLabel, iter.uploader.Name(), rLevelLabel).Inc() + c.metrics.UploadedBytes.WithLabelValues(iter.uploader.Name(), rLevelLabel).Add(float64(contentSize)) + if txDuration.Seconds() > 0 { + c.metrics.UploadThroughput.WithLabelValues(sizeLabel, iter.uploader.Name(), rLevelLabel).Set(float64(contentSize) / txDuration.Seconds()) + } - if !downloaded { - c.logger.Errorf("all download attempts failed for size %d, dumping topology health and walking chunk tree", contentSize) - c.onFailureDump(ctx, cluster, uploader, downloader, address, allChunks, thresholds) - } + time.Sleep(iter.opts.NodesSyncWait) + + if c.probe(ctx, topohealth.PhasePreDownload, iter.downloader, iter.thresholds) == topohealth.StatusUnhealthy { + c.metrics.UnhealthyAbortsPreDown.Inc() + c.logger.Warningf("downloader %s is UNHEALTHY pre-download; attempting anyway", iter.downloader.Name()) + } - c.logger.Infof("completed testing file size: %d bytes", contentSize) + downloaded, eofSeen := c.downloadWithRetry(ctx, iter, sizeLabel, rLevelLabel, txData, address, rLevel) + + if !downloaded { + c.logger.Errorf("all download attempts failed for size %d, dumping topology health and walking chunk tree", contentSize) + c.onFailureDump(ctx, iter.cluster, iter.uploader, iter.downloader, address, allChunks, eofSeen, iter.thresholds) + } + + c.logger.Infof("completed testing file size: %d bytes", contentSize) +} + +// uploadWithRetry attempts the upload up to 3 times, sleeping TxOnErrWait +// between attempts (not before the first). Returns the address, duration, and +// whether any attempt succeeded. +func (c *Check) uploadWithRetry(ctx context.Context, iter iterCtx, sizeLabel, rLevelLabel string, txData []byte, rLevel *redundancy.Level) (swarm.Address, time.Duration, bool) { + var ( + address swarm.Address + txDuration time.Duration + ) + for attempt := range 3 { + if attempt > 0 { + select { + case <-ctx.Done(): + return swarm.ZeroAddress, 0, false + case <-time.After(iter.opts.TxOnErrWait): } } - - time.Sleep(o.IterationWait) + txCtx, cancel := context.WithTimeout(ctx, iter.opts.UploadTimeout) + c.metrics.UploadAttempts.WithLabelValues(sizeLabel, iter.uploader.Name(), rLevelLabel).Inc() + addr, dur, err := iter.runner.Upload(txCtx, iter.uploader, txData, iter.batchID, rLevel) + cancel() + if err != nil { + c.metrics.UploadErrors.WithLabelValues(sizeLabel, iter.uploader.Name(), rLevelLabel).Inc() + c.logger.Errorf("upload failed for size %d: %v", len(txData), err) + c.logger.Infof("retrying in: %v", iter.opts.TxOnErrWait) + continue + } + address, txDuration = addr, dur + return address, txDuration, true } + return swarm.ZeroAddress, 0, false +} - return nil +// downloadWithRetry attempts the download up to 3 times. Records per-attempt +// metrics, classifies EOF errors, and tracks whether any attempt hit +// io.ErrUnexpectedEOF (used to attribute on-failure walks). On data mismatch +// it logs the divergence and counts it but does not retry past the loop. +func (c *Check) downloadWithRetry(ctx context.Context, iter iterCtx, sizeLabel, rLevelLabel string, txData []byte, address swarm.Address, rLevel *redundancy.Level) (downloaded, eofSeen bool) { + for attempt := range 3 { + if attempt > 0 { + select { + case <-ctx.Done(): + return downloaded, eofSeen + case <-time.After(iter.opts.RxOnErrWait): + } + } + + c.metrics.DownloadAttempts.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Inc() + + rxCtx, cancel := context.WithTimeout(ctx, iter.opts.DownloadTimeout) + rxData, rxDuration, err := iter.runner.Download(rxCtx, iter.downloader, address, rLevel) + cancel() + if err != nil { + c.metrics.DownloadErrors.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Inc() + if errors.Is(err, io.ErrUnexpectedEOF) { + c.metrics.DownloadEOFErrors.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Inc() + eofSeen = true + } + c.logger.Errorf("download failed for size %d: %v", len(txData), err) + c.logger.Infof("retrying in: %v", iter.opts.RxOnErrWait) + continue + } + + if bytes.Equal(rxData, txData) { + c.metrics.DownloadDuration.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Observe(rxDuration.Seconds()) + c.metrics.DownloadSuccess.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Inc() + c.metrics.DownloadedBytes.WithLabelValues(iter.downloader.Name(), rLevelLabel).Add(float64(len(txData))) + if rxDuration.Seconds() > 0 { + c.metrics.DownloadThroughput.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Set(float64(len(txData)) / rxDuration.Seconds()) + } + return true, eofSeen + } + + c.logger.Infof("data mismatch for size %d: uploaded and downloaded data differ", len(txData)) + c.metrics.DownloadMismatch.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Inc() + if len(rxData) != len(txData) { + c.logger.Errorf("length mismatch for size %d: downloaded %d bytes, uploaded %d bytes", len(txData), len(rxData), len(txData)) + continue + } + var diff int + for i := range txData { + if txData[i] != rxData[i] { + diff++ + } + } + c.logger.Infof("data mismatch for size %d: found %d different bytes, ~%.2f%%", len(txData), diff, float64(diff)/float64(len(txData))*100) + } + return downloaded, eofSeen } func (c *Check) Report() []prometheus.Collector { @@ -388,7 +409,7 @@ func (c *Check) probe(ctx context.Context, phase topohealth.Phase, client *bee.C // root), and the full chunk walk that classifies every missing chunk as // out-of-AOR (bee#5400 bug 1), in-AOR-not-stored (bug 2/3), or // cluster-coverage-gap. -func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster, uploader, downloader *bee.Client, root swarm.Address, allChunks []topohealth.ChunkInfo, thresholds topohealth.Thresholds) { +func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster, uploader, downloader *bee.Client, root swarm.Address, allChunks []topohealth.ChunkInfo, eofSeen bool, thresholds topohealth.Thresholds) { var ( wg sync.WaitGroup storers []topohealth.StorerResult @@ -414,7 +435,7 @@ func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster // chunkWalkMaxBytes or because the local split errored return } - c.walkChunksOnFailure(ctx, cluster, root, allChunks) + c.walkChunksOnFailure(ctx, cluster, root, allChunks, eofSeen) }() wg.Wait() @@ -429,8 +450,10 @@ func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster // walkChunksOnFailure does a HEAD /chunks/{addr} per chunk against its // closest full node, records the per-bug counters, and logs every missing or -// out-of-AOR-present chunk. -func (c *Check) walkChunksOnFailure(ctx context.Context, cluster orchestration.Cluster, root swarm.Address, chunks []topohealth.ChunkInfo) { +// out-of-AOR-present chunk. If the failure included an EOF and the walk +// finds nothing wrong, eof_with_clean_walk_total is incremented — that's the +// "EOF was NOT bee#5400" signal. +func (c *Check) walkChunksOnFailure(ctx context.Context, cluster orchestration.Cluster, root swarm.Address, chunks []topohealth.ChunkInfo, eofSeen bool) { storers, err := topohealth.GatherStorers(ctx, cluster) if err != nil { c.logger.Errorf("on_failure gather storers failed: %v", err) @@ -464,12 +487,16 @@ func (c *Check) walkChunksOnFailure(ctx context.Context, cluster orchestration.C } totalMissing := sumPerPosition(res.MissingTotal) + totalPresentOOA := sumPerPosition(res.PresentOutOfAOR) if totalMissing > 0 { c.metrics.FilesWithLoss.Inc() } + if eofSeen && totalMissing == 0 && totalPresentOOA == 0 { + c.metrics.EOFWithCleanWalk.Inc() + } c.logger.Infof("on_failure walk: root=%s checked=%d probe_errors=%d missing=%d (out_of_aor=%d, in_aor=%d) present_out_of_aor=%d", root, res.Checked, res.ProbeErrors, totalMissing, - sumPerPosition(res.MissingOutOfAOR), sumPerPosition(res.MissingInAOR), sumPerPosition(res.PresentOutOfAOR)) + sumPerPosition(res.MissingOutOfAOR), sumPerPosition(res.MissingInAOR), totalPresentOOA) } func sumPerPosition(m topohealth.PerPositionCounts) int {