diff --git a/pkg/bee/api/node.go b/pkg/bee/api/node.go index 789eeccc..e808a83f 100644 --- a/pkg/bee/api/node.go +++ b/pkg/bee/api/node.go @@ -78,7 +78,11 @@ func (n *NodeService) Balances(ctx context.Context) (resp Balances, err error) { return resp, err } -// HasChunk returns true/false if node has a chunk +// HasChunk returns true/false if node has a chunk. +// +// NOTE: This uses GET /chunks/{addr}, which on bee falls back to network +// retrieval if the chunk is not present locally. It is therefore NOT a clean +// local-only check. Use LocalHasChunk for that. func (n *NodeService) HasChunk(ctx context.Context, a swarm.Address) (bool, error) { resp := struct { Message string `json:"message,omitempty"` @@ -95,6 +99,20 @@ func (n *NodeService) HasChunk(ctx context.Context, a swarm.Address) (bool, erro return true, nil } +// LocalHasChunk reports whether the chunk is stored locally on the node +// without triggering a network retrieval. Uses HEAD /chunks/{addr} which +// maps to storer.ChunkStore().Has on the bee side. +func (n *NodeService) LocalHasChunk(ctx context.Context, a swarm.Address) (bool, error) { + err := n.client.request(ctx, http.MethodHead, "/chunks/"+a.String(), nil, nil) + if IsHTTPStatusErrorCode(err, http.StatusNotFound) { + return false, nil + } + if err != nil { + return false, err + } + return true, nil +} + // Health represents node's health type Health struct { Status string `json:"status"` diff --git a/pkg/bee/api/status.go b/pkg/bee/api/status.go index 32e98a16..44906c3d 100644 --- a/pkg/bee/api/status.go +++ b/pkg/bee/api/status.go @@ -7,6 +7,15 @@ import ( type StatusService service +// Canonical values for the BeeMode field of StatusResponse, mirroring the +// strings emitted by bee/v2/pkg/api.BeeNodeMode.String(). +const ( + BeeModeFull = "full" + BeeModeLight = "light" + BeeModeUltraLight = "ultra-light" + BeeModeUnknown = "unknown" +) + type StatusResponse struct { Overlay string `json:"overlay"` Proximity uint `json:"proximity"` @@ -22,6 +31,7 @@ type StatusResponse struct { IsReachable bool `json:"isReachable"` LastSyncedBlock uint64 `json:"lastSyncedBlock"` CommittedDepth uint8 `json:"committedDepth"` + IsWarmingUp bool `json:"isWarmingUp"` } // Ping pings given node diff --git a/pkg/bee/client.go b/pkg/bee/client.go index 711ffeb6..a1c0d93f 100644 --- a/pkg/bee/client.go +++ b/pkg/bee/client.go @@ -265,11 +265,19 @@ func (c *Client) DownloadActFile(ctx context.Context, a swarm.Address, opts *api return size, h.Sum(nil), nil } -// HasChunk returns true/false if node has a chunk +// HasChunk returns true/false if node has a chunk. +// +// NOTE: Backed by GET /chunks/{addr}, which falls back to network retrieval on +// a local miss. Use LocalHasChunk for a strict local-only check. func (c *Client) HasChunk(ctx context.Context, a swarm.Address) (bool, error) { return c.api.Node.HasChunk(ctx, a) } +// LocalHasChunk is a strict local-only check; see api.NodeService.LocalHasChunk. +func (c *Client) LocalHasChunk(ctx context.Context, a swarm.Address) (bool, error) { + return c.api.Node.LocalHasChunk(ctx, a) +} + func (c *Client) HasChunks(ctx context.Context, a []swarm.Address) (has []bool, count int, err error) { has = make([]bool, len(a)) for i, addr := range a { diff --git a/pkg/check/smoke/metrics.go b/pkg/check/smoke/metrics.go index 41c72b2f..e95dd6dc 100644 --- a/pkg/check/smoke/metrics.go +++ b/pkg/check/smoke/metrics.go @@ -6,27 +6,43 @@ import ( ) type metrics struct { - BatchCreateErrors prometheus.Counter - BatchCreateAttempts prometheus.Counter - UploadErrors *prometheus.CounterVec - UploadAttempts *prometheus.CounterVec - UploadSuccess *prometheus.CounterVec - DownloadErrors *prometheus.CounterVec - DownloadMismatch *prometheus.CounterVec - DownloadAttempts *prometheus.CounterVec - DownloadSuccess *prometheus.CounterVec - UploadDuration *prometheus.HistogramVec - DownloadDuration *prometheus.HistogramVec - UploadThroughput *prometheus.GaugeVec - DownloadThroughput *prometheus.GaugeVec - UploadedBytes *prometheus.CounterVec - DownloadedBytes *prometheus.CounterVec + BatchCreateErrors prometheus.Counter + BatchCreateAttempts prometheus.Counter + UploadErrors *prometheus.CounterVec + UploadAttempts *prometheus.CounterVec + UploadSuccess *prometheus.CounterVec + DownloadErrors *prometheus.CounterVec + DownloadEOFErrors *prometheus.CounterVec + DownloadMismatch *prometheus.CounterVec + DownloadAttempts *prometheus.CounterVec + DownloadSuccess *prometheus.CounterVec + UploadDuration *prometheus.HistogramVec + DownloadDuration *prometheus.HistogramVec + UploadThroughput *prometheus.GaugeVec + DownloadThroughput *prometheus.GaugeVec + UploadedBytes *prometheus.CounterVec + DownloadedBytes *prometheus.CounterVec + NodeHealthVerdict *prometheus.GaugeVec + ClusterFullNodeCount prometheus.Gauge + ClusterLightNodeCount prometheus.Gauge + UnhealthyAbortsPreUp prometheus.Counter + UnhealthyAbortsPreDown prometheus.Counter + // Chunk walk: per-chunk presence check across the full upload tree. + ChunksChecked prometheus.Counter + ChunksMissingTotal *prometheus.CounterVec // {position} + ChunksMissingOutOfAOR *prometheus.CounterVec // {position} — bug 1 fingerprint (out-of-depth storing) + ChunksMissingInAOR *prometheus.CounterVec // {position} — bug 2/3 fingerprint (in-depth but not stored) + ChunksPresentOutOfAOR *prometheus.CounterVec // {position} — bug 1 confirmed (chunk exists outside its AOR) + FilesWithLoss prometheus.Counter + EOFWithCleanWalk prometheus.Counter } const ( labelSizeBytes = "size_bytes" labelNodeName = "node_name" labelRedundancyLevel = "redundancy_level" + labelPhase = "phase" + labelPosition = "position" ) func newMetrics(subsystem string) metrics { @@ -166,6 +182,116 @@ func newMetrics(subsystem string) metrics { }, []string{labelNodeName, labelRedundancyLevel}, ), + DownloadEOFErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "download_eof_errors_count", + Help: "Download errors classified as unexpected EOF, which indicate the chunk is likely missing from the cluster.", + }, + []string{labelSizeBytes, labelNodeName, labelRedundancyLevel}, + ), + NodeHealthVerdict: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "node_health_verdict", + Help: "Topology health verdict for a node: 0=unknown, 1=unhealthy, 2=degraded, 3=healthy. Sampled per phase (pre_upload, pre_download, on_failure).", + }, + []string{labelNodeName, labelPhase}, + ), + ClusterFullNodeCount: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "cluster_full_node_count", + Help: "Number of full (non-bootnode) nodes in the cluster.", + }, + ), + ClusterLightNodeCount: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "cluster_light_node_count", + Help: "Number of light nodes in the cluster.", + }, + ), + UnhealthyAbortsPreUp: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "unhealthy_aborts_pre_upload", + Help: "Iterations aborted because the uploader was UNHEALTHY before upload.", + }, + ), + UnhealthyAbortsPreDown: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "unhealthy_aborts_pre_download", + Help: "Iterations skipped because the downloader was UNHEALTHY before download.", + }, + ), + ChunksChecked: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "chunks_checked_total", + Help: "Total chunks inspected by the on-failure chunk walk (denominator for chunks_missing_* rates).", + }, + ), + ChunksMissingTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "chunks_missing_total", + Help: "Chunks not found on their closest full node (HEAD /chunks/{addr} returned 404). Labelled by tree position.", + }, + []string{labelPosition}, + ), + ChunksMissingOutOfAOR: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "chunks_missing_out_of_aor_total", + Help: "Missing chunks whose closest storer in the cluster still has PO(chunk, storer) < storageRadius. Indicates a cluster-coverage gap — the address falls outside every node's AOR. Common in small testnets.", + }, + []string{labelPosition}, + ), + ChunksMissingInAOR: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "chunks_missing_in_aor_total", + Help: "Missing chunks whose closest storer covers the address (PO >= storageRadius) but does not have the chunk. Bee#5400 bug-2/3 fingerprint: shallow receipt short-circuit or false ChunkSynced.", + }, + []string{labelPosition}, + ), + ChunksPresentOutOfAOR: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "chunks_present_out_of_aor_total", + Help: "Chunks held by a node whose AOR does not cover them (PO < storageRadius). Direct bee#5400 bug-1 confirmation: out-of-depth storing.", + }, + []string{labelPosition}, + ), + FilesWithLoss: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "files_with_chunk_loss_total", + Help: "Files where the on-failure chunk walk found at least one missing chunk.", + }, + ), + EOFWithCleanWalk: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "eof_with_clean_walk_total", + Help: "Downloads that failed with unexpected EOF where the chunk walk found nothing missing and no out-of-AOR-present chunks. Strong signal that the EOF was NOT bee#5400 chunk loss but something else (transient retrieval, downloader networking, etc.).", + }, + ), } } diff --git a/pkg/check/smoke/smoke.go b/pkg/check/smoke/smoke.go index 179e7b33..3dda6e4f 100644 --- a/pkg/check/smoke/smoke.go +++ b/pkg/check/smoke/smoke.go @@ -6,21 +6,61 @@ import ( "crypto/rand" "errors" "fmt" + "io" "strconv" + "sync" "time" "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/bee" "github.com/ethersphere/beekeeper/pkg/beekeeper" "github.com/ethersphere/beekeeper/pkg/logging" "github.com/ethersphere/beekeeper/pkg/orchestration" "github.com/ethersphere/beekeeper/pkg/random" "github.com/ethersphere/beekeeper/pkg/scheduler" "github.com/ethersphere/beekeeper/pkg/test" + "github.com/ethersphere/beekeeper/pkg/topohealth" "github.com/prometheus/client_golang/prometheus" ) +// testRunner is the subset of pkg/test used by the smoke iteration. Declared +// locally so we don't have to name the unexported return type of test.NewTest. +type testRunner interface { + Upload(ctx context.Context, c *bee.Client, data []byte, batchID string, rLevel *redundancy.Level) (swarm.Address, time.Duration, error) + Download(ctx context.Context, c *bee.Client, a swarm.Address, rLevel *redundancy.Level) ([]byte, time.Duration, error) +} + +// iterCtx groups the inputs that stay constant across all (contentSize, rLevel) +// pairs within a single outer iteration. Reduces runIteration's parameter count. +type iterCtx struct { + cluster orchestration.Cluster + uploader *bee.Client + downloader *bee.Client + batchID string + runner testRunner + opts Options + thresholds topohealth.Thresholds +} + +const ( + // onFailureStorerProbeCount is the number of intended storers to probe on a + // download failure (closest by XOR distance to root chunk address). + onFailureStorerProbeCount = 3 + // chunkWalkParallelism caps concurrent HEAD /chunks/{addr} requests during + // the on-failure chunk walk. + chunkWalkParallelism = 32 + // chunkWalkMaxReported truncates per-category result lists so a large file + // with many missing chunks does not flood the log. + chunkWalkMaxReported = 50 + // chunkWalkMaxBytes caps the file size for which the local split + chunk + // walk runs. Larger files skip the walk: at ~200 MB the walk takes tens + // of seconds on a healthy cluster and minutes on a slow one; beyond that + // it stops being a useful per-failure diagnostic. + chunkWalkMaxBytes int64 = 200 * 1024 * 1024 +) + // Options represents smoke test options type Options struct { ContentSize int64 @@ -108,6 +148,13 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option return fmt.Errorf("smoke check requires at least 2 full nodes, got %d", len(fullNodeClients)) } + shape := topohealth.ClusterShape(cluster) + c.metrics.ClusterFullNodeCount.Set(float64(shape.FullNodes)) + c.metrics.ClusterLightNodeCount.Set(float64(shape.LightNodes)) + c.logger.Infof("cluster shape: full=%d light=%d bootnodes=%d", shape.FullNodes, shape.LightNodes, shape.Bootnodes) + + thresholds := topohealth.DefaultThresholds() + test := test.NewTest(c.logger) for i := 0; true; i++ { @@ -142,166 +189,192 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option if len(rLevels) == 0 { rLevels = []*redundancy.Level{nil} } - rLevelIdx := 0 - for { - rLevel := rLevels[rLevelIdx] + iter := iterCtx{ + cluster: cluster, + uploader: uploader, + downloader: downloader, + batchID: batchID, + runner: test, + opts: o, + thresholds: thresholds, + } + for _, rLevel := range rLevels { for _, contentSize := range fileSizes { select { case <-ctx.Done(): return nil default: - if rLevel != nil { - c.logger.Infof("testing file size: %d bytes (%.2f KB), redundancy level: %d", contentSize, float64(contentSize)/1024, *rLevel) - } else { - c.logger.Infof("testing file size: %d bytes (%.2f KB), redundancy level: (not set)", contentSize, float64(contentSize)/1024) - } } + c.runIteration(ctx, iter, contentSize, rLevel) + } + } - sizeLabel := fmt.Sprintf("%d", contentSize) - rLevelLabel := redundancyLevelLabel(rLevel) - - var ( - txDuration time.Duration - rxDuration time.Duration - txData []byte - rxData []byte - address swarm.Address - uploaded bool - downloaded bool - ) - - txData = make([]byte, contentSize) - if _, err := rand.Read(txData); err != nil { - c.logger.Errorf("unable to create random content for size %d: %v", contentSize, err) - continue - } + time.Sleep(o.IterationWait) + } - var ( - txCtx context.Context - txCancel context.CancelFunc = func() {} - ) - - for range 3 { - txCancel() - - uploaded = false - - select { - case <-ctx.Done(): - return nil - case <-time.After(o.TxOnErrWait): - } - - txCtx, txCancel = context.WithTimeout(ctx, o.UploadTimeout) - - c.metrics.UploadAttempts.WithLabelValues(sizeLabel, uploader.Name(), rLevelLabel).Inc() - address, txDuration, err = test.Upload(txCtx, uploader, txData, batchID, rLevel) - if err != nil { - c.metrics.UploadErrors.WithLabelValues(sizeLabel, uploader.Name(), rLevelLabel).Inc() - c.logger.Errorf("upload failed for size %d: %v", contentSize, err) - c.logger.Infof("retrying in: %v", o.TxOnErrWait) - } else { - uploaded = true - break - } - } - txCancel() - if !uploaded { - c.logger.Infof("skipping download for size %d due to upload failure", contentSize) - continue - } + return nil +} - c.metrics.UploadDuration.WithLabelValues(sizeLabel, uploader.Name(), rLevelLabel).Observe(txDuration.Seconds()) - c.metrics.UploadSuccess.WithLabelValues(sizeLabel, uploader.Name(), rLevelLabel).Inc() - c.metrics.UploadedBytes.WithLabelValues(uploader.Name(), rLevelLabel).Add(float64(contentSize)) +// runIteration runs a single upload→sync→download cycle for one (contentSize, +// rLevel) pair. On download failure it triggers the full on-failure dump, +// which includes the chunk walk if a local split was available. +func (c *Check) runIteration(ctx context.Context, iter iterCtx, contentSize int64, rLevel *redundancy.Level) { + if rLevel != nil { + c.logger.Infof("testing file size: %d bytes (%.2f KB), redundancy level: %d", contentSize, float64(contentSize)/1024, *rLevel) + } else { + c.logger.Infof("testing file size: %d bytes (%.2f KB), redundancy level: (not set)", contentSize, float64(contentSize)/1024) + } - if txDuration.Seconds() > 0 { - uploadThroughput := float64(contentSize) / txDuration.Seconds() - c.metrics.UploadThroughput.WithLabelValues(sizeLabel, uploader.Name(), rLevelLabel).Set(uploadThroughput) - } + sizeLabel := fmt.Sprintf("%d", contentSize) + rLevelLabel := redundancyLevelLabel(rLevel) - time.Sleep(o.NodesSyncWait) - - var ( - rxCtx context.Context - rxCancel context.CancelFunc = func() {} - ) - - for range 3 { - rxCancel() - - select { - case <-ctx.Done(): - return nil - case <-time.After(o.RxOnErrWait): - } - - c.metrics.DownloadAttempts.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() - - rxCtx, rxCancel = context.WithTimeout(ctx, o.DownloadTimeout) - rxData, rxDuration, err = test.Download(rxCtx, downloader, address, rLevel) - if err != nil { - c.metrics.DownloadErrors.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() - c.logger.Errorf("download failed for size %d: %v", contentSize, err) - c.logger.Infof("retrying in: %v", o.RxOnErrWait) - continue - } - - if bytes.Equal(rxData, txData) { - c.metrics.DownloadDuration.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Observe(rxDuration.Seconds()) - c.metrics.DownloadSuccess.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() - c.metrics.DownloadedBytes.WithLabelValues(downloader.Name(), rLevelLabel).Add(float64(contentSize)) - - if rxDuration.Seconds() > 0 { - downloadThroughput := float64(contentSize) / rxDuration.Seconds() - c.metrics.DownloadThroughput.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Set(downloadThroughput) - } - downloaded = true - break - } - - c.logger.Infof("data mismatch for size %d: uploaded and downloaded data differ", contentSize) - c.metrics.DownloadMismatch.WithLabelValues(sizeLabel, downloader.Name(), rLevelLabel).Inc() - - rxLen, txLen := len(rxData), len(txData) - if rxLen != txLen { - c.logger.Errorf("length mismatch for size %d: downloaded %d bytes, uploaded %d bytes", contentSize, rxLen, txLen) - continue - } - - var diff int - for i := range txData { - if txData[i] != rxData[i] { - diff++ - } - } - c.logger.Infof("data mismatch for size %d: found %d different bytes, ~%.2f%%", contentSize, diff, float64(diff)/float64(txLen)*100) - } - rxCancel() - - if !downloaded { - c.logger.Errorf("all download attempts failed for size %d, fetching downloader topology", contentSize) - top, topErr := downloader.Topology(ctx) - if topErr != nil { - c.logger.Errorf("failed to get downloader topology: %v", topErr) - } else { - c.logger.Infof("downloader %s topology: depth=%d, connected=%d, population=%d, reachability=%s, bins=%s", - downloader.Name(), top.Depth, top.Connected, top.Population, top.Reachability, top.Bins.String()) - } - } + txData := make([]byte, contentSize) + if _, err := rand.Read(txData); err != nil { + c.logger.Errorf("unable to create random content for size %d: %v", contentSize, err) + return + } + + // Pre-compute the chunk address tree locally so we can pin-point a missing + // chunk if download later fails. Deterministic for the same (data, rLevel) + // input — matches what bee would produce. Skipped for files above + // chunkWalkMaxBytes since the walk grows linearly with chunk count and + // stops being a useful per-failure tool. + var allChunks []topohealth.ChunkInfo + if contentSize <= chunkWalkMaxBytes { + splitRoot, chunks, splitErr := topohealth.SplitChunkAddresses(ctx, txData, rLevel) + if splitErr != nil { + c.logger.Errorf("local chunk split failed for size %d: %v", contentSize, splitErr) + } else { + allChunks = chunks + c.logger.Infof("local split produced %d chunks (root=%s)", len(allChunks), splitRoot) + } + } else { + c.logger.Infof("file size %d > %d (chunkWalkMaxBytes); skipping local split and on-failure chunk walk", contentSize, chunkWalkMaxBytes) + } + + if c.probe(ctx, topohealth.PhasePreUpload, iter.uploader, iter.thresholds) == topohealth.StatusUnhealthy { + c.metrics.UnhealthyAbortsPreUp.Inc() + c.logger.Errorf("aborting iteration: uploader %s is UNHEALTHY pre-upload", iter.uploader.Name()) + return + } + + address, txDuration, ok := c.uploadWithRetry(ctx, iter, sizeLabel, rLevelLabel, txData, rLevel) + if !ok { + c.logger.Infof("skipping download for size %d due to upload failure", contentSize) + return + } + + c.metrics.UploadDuration.WithLabelValues(sizeLabel, iter.uploader.Name(), rLevelLabel).Observe(txDuration.Seconds()) + c.metrics.UploadSuccess.WithLabelValues(sizeLabel, iter.uploader.Name(), rLevelLabel).Inc() + c.metrics.UploadedBytes.WithLabelValues(iter.uploader.Name(), rLevelLabel).Add(float64(contentSize)) + if txDuration.Seconds() > 0 { + c.metrics.UploadThroughput.WithLabelValues(sizeLabel, iter.uploader.Name(), rLevelLabel).Set(float64(contentSize) / txDuration.Seconds()) + } + + time.Sleep(iter.opts.NodesSyncWait) + + if c.probe(ctx, topohealth.PhasePreDownload, iter.downloader, iter.thresholds) == topohealth.StatusUnhealthy { + c.metrics.UnhealthyAbortsPreDown.Inc() + c.logger.Warningf("downloader %s is UNHEALTHY pre-download; attempting anyway", iter.downloader.Name()) + } + + downloaded, eofSeen := c.downloadWithRetry(ctx, iter, sizeLabel, rLevelLabel, txData, address, rLevel) + + if !downloaded { + c.logger.Errorf("all download attempts failed for size %d, dumping topology health and walking chunk tree", contentSize) + c.onFailureDump(ctx, iter.cluster, iter.uploader, iter.downloader, address, allChunks, eofSeen, iter.thresholds) + } + + c.logger.Infof("completed testing file size: %d bytes", contentSize) +} + +// uploadWithRetry attempts the upload up to 3 times, sleeping TxOnErrWait +// between attempts (not before the first). Returns the address, duration, and +// whether any attempt succeeded. +func (c *Check) uploadWithRetry(ctx context.Context, iter iterCtx, sizeLabel, rLevelLabel string, txData []byte, rLevel *redundancy.Level) (swarm.Address, time.Duration, bool) { + var ( + address swarm.Address + txDuration time.Duration + ) + for attempt := range 3 { + if attempt > 0 { + select { + case <-ctx.Done(): + return swarm.ZeroAddress, 0, false + case <-time.After(iter.opts.TxOnErrWait): + } + } + txCtx, cancel := context.WithTimeout(ctx, iter.opts.UploadTimeout) + c.metrics.UploadAttempts.WithLabelValues(sizeLabel, iter.uploader.Name(), rLevelLabel).Inc() + addr, dur, err := iter.runner.Upload(txCtx, iter.uploader, txData, iter.batchID, rLevel) + cancel() + if err != nil { + c.metrics.UploadErrors.WithLabelValues(sizeLabel, iter.uploader.Name(), rLevelLabel).Inc() + c.logger.Errorf("upload failed for size %d: %v", len(txData), err) + c.logger.Infof("retrying in: %v", iter.opts.TxOnErrWait) + continue + } + address, txDuration = addr, dur + return address, txDuration, true + } + return swarm.ZeroAddress, 0, false +} - c.logger.Infof("completed testing file size: %d bytes", contentSize) +// downloadWithRetry attempts the download up to 3 times. Records per-attempt +// metrics, classifies EOF errors, and tracks whether any attempt hit +// io.ErrUnexpectedEOF (used to attribute on-failure walks). On data mismatch +// it logs the divergence and counts it but does not retry past the loop. +func (c *Check) downloadWithRetry(ctx context.Context, iter iterCtx, sizeLabel, rLevelLabel string, txData []byte, address swarm.Address, rLevel *redundancy.Level) (downloaded, eofSeen bool) { + for attempt := range 3 { + if attempt > 0 { + select { + case <-ctx.Done(): + return downloaded, eofSeen + case <-time.After(iter.opts.RxOnErrWait): } - rLevelIdx++ - if rLevelIdx >= len(rLevels) { - break + } + + c.metrics.DownloadAttempts.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Inc() + + rxCtx, cancel := context.WithTimeout(ctx, iter.opts.DownloadTimeout) + rxData, rxDuration, err := iter.runner.Download(rxCtx, iter.downloader, address, rLevel) + cancel() + if err != nil { + c.metrics.DownloadErrors.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Inc() + if errors.Is(err, io.ErrUnexpectedEOF) { + c.metrics.DownloadEOFErrors.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Inc() + eofSeen = true } + c.logger.Errorf("download failed for size %d: %v", len(txData), err) + c.logger.Infof("retrying in: %v", iter.opts.RxOnErrWait) + continue } - time.Sleep(o.IterationWait) - } + if bytes.Equal(rxData, txData) { + c.metrics.DownloadDuration.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Observe(rxDuration.Seconds()) + c.metrics.DownloadSuccess.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Inc() + c.metrics.DownloadedBytes.WithLabelValues(iter.downloader.Name(), rLevelLabel).Add(float64(len(txData))) + if rxDuration.Seconds() > 0 { + c.metrics.DownloadThroughput.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Set(float64(len(txData)) / rxDuration.Seconds()) + } + return true, eofSeen + } - return nil + c.logger.Infof("data mismatch for size %d: uploaded and downloaded data differ", len(txData)) + c.metrics.DownloadMismatch.WithLabelValues(sizeLabel, iter.downloader.Name(), rLevelLabel).Inc() + if len(rxData) != len(txData) { + c.logger.Errorf("length mismatch for size %d: downloaded %d bytes, uploaded %d bytes", len(txData), len(rxData), len(txData)) + continue + } + var diff int + for i := range txData { + if txData[i] != rxData[i] { + diff++ + } + } + c.logger.Infof("data mismatch for size %d: found %d different bytes, ~%.2f%%", len(txData), diff, float64(diff)/float64(len(txData))*100) + } + return downloaded, eofSeen } func (c *Check) Report() []prometheus.Collector { @@ -314,3 +387,122 @@ func redundancyLevelLabel(rLevel *redundancy.Level) string { } return strconv.Itoa(int(*rLevel)) } + +// probe runs the per-node topology health probe, emits structured log + metric, +// and returns the resulting Status. A probe error yields StatusUnknown so +// callers can distinguish a transient API failure from a genuinely unhealthy +// node when deciding whether to abort. +func (c *Check) probe(ctx context.Context, phase topohealth.Phase, client *bee.Client, thresholds topohealth.Thresholds) topohealth.Status { + v, err := topohealth.Probe(ctx, client, thresholds) + if err != nil { + c.logger.Errorf("probe %s on %s failed: %v", phase, client.Name(), err) + c.metrics.NodeHealthVerdict.WithLabelValues(client.Name(), string(phase)).Set(float64(topohealth.StatusUnknown)) + return topohealth.StatusUnknown + } + c.metrics.NodeHealthVerdict.WithLabelValues(client.Name(), string(phase)).Set(float64(v.Status)) + topohealth.LogVerdict(c.logger, phase, v) + return v.Status +} + +// onFailureDump fans out four independent diagnostics concurrently: uploader +// probe, downloader probe, the 3 intended-storer probes (with HEAD on the +// root), and the full chunk walk that classifies every missing chunk as +// out-of-AOR (bee#5400 bug 1), in-AOR-not-stored (bug 2/3), or +// cluster-coverage-gap. +func (c *Check) onFailureDump(ctx context.Context, cluster orchestration.Cluster, uploader, downloader *bee.Client, root swarm.Address, allChunks []topohealth.ChunkInfo, eofSeen bool, thresholds topohealth.Thresholds) { + var ( + wg sync.WaitGroup + storers []topohealth.StorerResult + storErr error + ) + wg.Add(4) + go func() { + defer wg.Done() + c.probe(ctx, topohealth.PhaseOnFailure, uploader, thresholds) + }() + go func() { + defer wg.Done() + c.probe(ctx, topohealth.PhaseOnFailure, downloader, thresholds) + }() + go func() { + defer wg.Done() + storers, storErr = topohealth.IntendedStorers(ctx, cluster, root, onFailureStorerProbeCount, thresholds) + }() + go func() { + defer wg.Done() + if len(allChunks) == 0 { + // allChunks is empty either because the file size exceeded + // chunkWalkMaxBytes or because the local split errored + return + } + c.walkChunksOnFailure(ctx, cluster, root, allChunks, eofSeen) + }() + wg.Wait() + + if storErr != nil { + c.logger.Errorf("on_failure intended storers probe failed: %v", storErr) + return + } + for i, r := range storers { + topohealth.LogStorerResult(c.logger, root.String(), string(topohealth.PhaseOnFailure), i, r) + } +} + +// walkChunksOnFailure does a HEAD /chunks/{addr} per chunk against its +// closest full node, records the per-bug counters, and logs every missing or +// out-of-AOR-present chunk. If the failure included an EOF and the walk +// finds nothing wrong, eof_with_clean_walk_total is incremented — that's the +// "EOF was NOT bee#5400" signal. +func (c *Check) walkChunksOnFailure(ctx context.Context, cluster orchestration.Cluster, root swarm.Address, chunks []topohealth.ChunkInfo, eofSeen bool) { + storers, err := topohealth.GatherStorers(ctx, cluster) + if err != nil { + c.logger.Errorf("on_failure gather storers failed: %v", err) + return + } + res, err := topohealth.WalkChunks(ctx, storers, chunks, chunkWalkParallelism, chunkWalkMaxReported) + if err != nil { + // ctx-cancellation is reported here but we still emit partial counters. + c.logger.Warningf("on_failure chunk walk did not complete: %v", err) + } + + c.metrics.ChunksChecked.Add(float64(res.Checked)) + for pos, n := range res.MissingTotal { + c.metrics.ChunksMissingTotal.WithLabelValues(string(pos)).Add(float64(n)) + } + for pos, n := range res.MissingOutOfAOR { + c.metrics.ChunksMissingOutOfAOR.WithLabelValues(string(pos)).Add(float64(n)) + } + for pos, n := range res.MissingInAOR { + c.metrics.ChunksMissingInAOR.WithLabelValues(string(pos)).Add(float64(n)) + } + for pos, n := range res.PresentOutOfAOR { + c.metrics.ChunksPresentOutOfAOR.WithLabelValues(string(pos)).Add(float64(n)) + } + + for _, m := range res.Missing { + topohealth.LogChunkCheck(c.logger, "missing", root.String(), m) + } + for _, p := range res.OutOfAORHits { + topohealth.LogChunkCheck(c.logger, "present_out_of_aor", root.String(), p) + } + + totalMissing := sumPerPosition(res.MissingTotal) + totalPresentOOA := sumPerPosition(res.PresentOutOfAOR) + if totalMissing > 0 { + c.metrics.FilesWithLoss.Inc() + } + if eofSeen && totalMissing == 0 && totalPresentOOA == 0 { + c.metrics.EOFWithCleanWalk.Inc() + } + c.logger.Infof("on_failure walk: root=%s checked=%d probe_errors=%d missing=%d (out_of_aor=%d, in_aor=%d) present_out_of_aor=%d", + root, res.Checked, res.ProbeErrors, totalMissing, + sumPerPosition(res.MissingOutOfAOR), sumPerPosition(res.MissingInAOR), totalPresentOOA) +} + +func sumPerPosition(m topohealth.PerPositionCounts) int { + n := 0 + for _, v := range m { + n += v + } + return n +} diff --git a/pkg/orchestration/cluster.go b/pkg/orchestration/cluster.go index 446c6486..98e10a09 100644 --- a/pkg/orchestration/cluster.go +++ b/pkg/orchestration/cluster.go @@ -38,6 +38,7 @@ type Cluster interface { Size() (size int) Topologies(ctx context.Context) (topologies ClusterTopologies, err error) ClosestFullNodeClient(ctx context.Context, s *bee.Client) (*bee.Client, error) + FullNodeClientsByDistance(ctx context.Context, chunkAddr swarm.Address) (ClientList, error) } // ClusterOptions represents Bee cluster options diff --git a/pkg/orchestration/k8s/cluster.go b/pkg/orchestration/k8s/cluster.go index 1fbffc9c..22cccf6c 100644 --- a/pkg/orchestration/k8s/cluster.go +++ b/pkg/orchestration/k8s/cluster.go @@ -1,6 +1,7 @@ package k8s import ( + "bytes" "context" "crypto/tls" "fmt" @@ -17,6 +18,7 @@ import ( "github.com/ethersphere/beekeeper/pkg/orchestration" "github.com/ethersphere/beekeeper/pkg/orchestration/notset" "github.com/ethersphere/beekeeper/pkg/swap" + "golang.org/x/sync/errgroup" ) // compile check whether client implements interface @@ -440,6 +442,57 @@ func (c *Cluster) FlattenTopologies(ctx context.Context) (topologies map[string] return topologies, err } +// FullNodeClientsByDistance returns full node clients sorted by ascending XOR +// distance from the given chunk address. Bootnodes and light nodes are excluded. +// This is the basis for identifying the intended storers of a chunk. +func (c *Cluster) FullNodeClientsByDistance(ctx context.Context, chunkAddr swarm.Address) (orchestration.ClientList, error) { + type entry struct { + client *bee.Client + // dist is the precomputed XOR distance to chunkAddr in big-endian + // bytes. Storing it once lets the sort comparator be pure (no API + // calls, no error side-channel) and avoids O(n log n) length checks. + dist []byte + } + + var clients []*bee.Client + for _, n := range c.Nodes() { + cfg := n.Config() + if !cfg.FullNode || cfg.BootnodeMode { + continue + } + clients = append(clients, n.Client()) + } + + entries := make([]entry, len(clients)) + g, gctx := errgroup.WithContext(ctx) + for i, cl := range clients { + g.Go(func() error { + addrs, err := cl.Addresses(gctx) + if err != nil { + return fmt.Errorf("get overlay for %s: %w", cl.Name(), err) + } + dist, err := swarm.DistanceRaw(chunkAddr, addrs.Overlay) + if err != nil { + return fmt.Errorf("distance for %s: %w", cl.Name(), err) + } + entries[i] = entry{client: cl, dist: dist} + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + + slices.SortFunc(entries, func(a, b entry) int { + return bytes.Compare(a.dist, b.dist) + }) + out := make(orchestration.ClientList, 0, len(entries)) + for _, e := range entries { + out = append(out, e.client) + } + return out, nil +} + // ClosestFullNodeClient returns the closest full node client to the supplied client. func (c *Cluster) ClosestFullNodeClient(ctx context.Context, s *bee.Client) (*bee.Client, error) { addrToNode := make(map[string]orchestration.Node) diff --git a/pkg/topohealth/chunkwalk.go b/pkg/topohealth/chunkwalk.go new file mode 100644 index 00000000..15beaf6d --- /dev/null +++ b/pkg/topohealth/chunkwalk.go @@ -0,0 +1,272 @@ +package topohealth + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + + "github.com/ethersphere/bee/v2/pkg/file/pipeline/builder" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/orchestration" + "golang.org/x/sync/errgroup" +) + +// ChunkPosition is where a chunk sits in the file's Merkle-like tree. +type ChunkPosition string + +const ( + ChunkPositionRoot ChunkPosition = "root" + ChunkPositionIntermediate ChunkPosition = "intermediate" + ChunkPositionLeaf ChunkPosition = "leaf" +) + +// ChunkInfo is one chunk's identity plus its position in the upload tree. +type ChunkInfo struct { + Address swarm.Address `json:"address"` + Position ChunkPosition `json:"position"` +} + +// SplitChunkAddresses runs the chunk-splitting pipeline locally over the same +// bytes that were (or will be) uploaded, returning the root chunk address and +// the full list of chunks bee would produce. It is deterministic for the same +// (data, redundancy level) pair, so the addresses match what bee stores. +func SplitChunkAddresses(ctx context.Context, data []byte, rLevel *redundancy.Level) (swarm.Address, []ChunkInfo, error) { + level := redundancy.NONE + if rLevel != nil { + level = *rLevel + } + // addressOnlyCollector classifies each chunk as it is produced and discards + // the chunk data immediately, so the splitter does not retain a copy of the + // payload in memory for the lifetime of the walk. + col := &addressOnlyCollector{} + pipe := builder.NewPipelineBuilder(ctx, col, false, level) + root, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data)) + if err != nil { + return swarm.ZeroAddress, nil, fmt.Errorf("split pipeline: %w", err) + } + // Promote whichever chunk matches the final root. + for i := range col.chunks { + if col.chunks[i].Address.Equal(root) { + col.chunks[i].Position = ChunkPositionRoot + } + } + return root, col.chunks, nil +} + +// addressOnlyCollector classifies a chunk by its span at Put time, stores just +// the address + position, and does not keep the chunk data. The pipeline is +// single-writer so no synchronisation is needed. +type addressOnlyCollector struct { + chunks []ChunkInfo +} + +func (c *addressOnlyCollector) Put(_ context.Context, ch swarm.Chunk) error { + c.chunks = append(c.chunks, ChunkInfo{ + Address: ch.Address(), + Position: classifyBySpan(ch.Data()), + }) + return nil +} + +// classifyBySpan derives a position from the span prefix only. The final root +// promotion happens after FeedPipeline returns (we cannot know which chunk +// will be the root until the pipeline closes). +func classifyBySpan(data []byte) ChunkPosition { + if len(data) < swarm.SpanSize { + return ChunkPositionLeaf + } + span := binary.LittleEndian.Uint64(data[:swarm.SpanSize]) + if span > swarm.ChunkSize { + return ChunkPositionIntermediate + } + return ChunkPositionLeaf +} + +// StorerInfo is one full node's identity plus its current storage radius, +// gathered once at the start of a walk so per-chunk classification is local. +type StorerInfo struct { + Client *bee.Client + Name string + Overlay swarm.Address + overlayBytes []byte // cached for inner-loop comparison + StorageRadius uint8 +} + +// GatherStorers collects overlay + storage radius for every full node in the +// cluster in parallel. Bootnodes and light nodes are excluded. +func GatherStorers(ctx context.Context, cluster orchestration.Cluster) ([]StorerInfo, error) { + var clients []*bee.Client + for _, n := range cluster.Nodes() { + cfg := n.Config() + if !cfg.FullNode || cfg.BootnodeMode { + continue + } + clients = append(clients, n.Client()) + } + out := make([]StorerInfo, len(clients)) + g, gctx := errgroup.WithContext(ctx) + for i, cl := range clients { + g.Go(func() error { + addrs, err := cl.Addresses(gctx) + if err != nil { + return fmt.Errorf("addresses for %s: %w", cl.Name(), err) + } + st, err := cl.Status(gctx) + if err != nil { + return fmt.Errorf("status for %s: %w", cl.Name(), err) + } + out[i] = StorerInfo{ + Client: cl, + Name: cl.Name(), + Overlay: addrs.Overlay, + overlayBytes: addrs.Overlay.Bytes(), + StorageRadius: st.StorageRadius, + } + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + return out, nil +} + +func closestStorer(chunkAddr swarm.Address, storers []StorerInfo) (StorerInfo, error) { + best := storers[0] + for _, s := range storers[1:] { + cmp, err := swarm.DistanceCmp(chunkAddr, s.Overlay, best.Overlay) + if err != nil { + return StorerInfo{}, fmt.Errorf("distance cmp: %w", err) + } + if cmp > 0 { + best = s + } + } + return best, nil +} + +// ChunkCheck is one chunk's presence result on its intended (closest) storer. +type ChunkCheck struct { + Address swarm.Address `json:"address"` + Position ChunkPosition `json:"position"` + StorerName string `json:"storer"` + StorerOverlay swarm.Address `json:"storerOverlay"` + Proximity uint8 `json:"proximity"` + StorageRadius uint8 `json:"storageRadius"` + // OutOfAOR: PO(chunk, storer) < storer.StorageRadius. When the storer is + // the closest in the cluster and the chunk is still out of its AOR, no + // node covers this address (cluster-coverage gap). When Present is also + // true, that is a direct bee#5400 bug-1 fingerprint: a node is holding a + // chunk outside its own AOR. + OutOfAOR bool `json:"outOfAOR"` + Present bool `json:"present"` + Error string `json:"error,omitempty"` +} + +// PerPositionCounts is per-tree-position aggregation of a walk. +type PerPositionCounts map[ChunkPosition]int + +// WalkResult summarises a chunk walk. Counters are exact; slices are +// truncated for logging. +type WalkResult struct { + Checked int `json:"checked"` + Missing []ChunkCheck `json:"missing,omitempty"` + OutOfAORHits []ChunkCheck `json:"outOfAORHits,omitempty"` + MissingTotal PerPositionCounts `json:"missingTotal"` + MissingOutOfAOR PerPositionCounts `json:"missingOutOfAOR"` + MissingInAOR PerPositionCounts `json:"missingInAOR"` + PresentOutOfAOR PerPositionCounts `json:"presentOutOfAOR"` + ProbeErrors int `json:"probeErrors"` +} + +// WalkChunks HEADs each chunk address on its closest full node and produces +// per-chunk presence + AOR results. Concurrency is bounded by parallelism; +// the returned WalkResult holds exact per-position counters plus +// log-friendly truncated slices (maxReported per category). +func WalkChunks(ctx context.Context, storers []StorerInfo, chunks []ChunkInfo, parallelism, maxReported int) (WalkResult, error) { + if len(storers) == 0 { + return WalkResult{}, errors.New("no storers") + } + if parallelism <= 0 { + parallelism = 32 + } + + checks := make([]ChunkCheck, len(chunks)) + sem := make(chan struct{}, parallelism) + var wg sync.WaitGroup +dispatch: + for i, c := range chunks { + select { + case <-ctx.Done(): + break dispatch + case sem <- struct{}{}: + } + wg.Add(1) + go func(idx int, ci ChunkInfo) { + defer wg.Done() + defer func() { <-sem }() + storer, err := closestStorer(ci.Address, storers) + if err != nil { + checks[idx] = ChunkCheck{Address: ci.Address, Position: ci.Position, Error: err.Error()} + return + } + po := swarm.Proximity(ci.Address.Bytes(), storer.overlayBytes) + check := ChunkCheck{ + Address: ci.Address, + Position: ci.Position, + StorerName: storer.Name, + StorerOverlay: storer.Overlay, + Proximity: po, + StorageRadius: storer.StorageRadius, + OutOfAOR: po < storer.StorageRadius, + } + has, herr := storer.Client.LocalHasChunk(ctx, ci.Address) + if herr != nil { + check.Error = herr.Error() + } else { + check.Present = has + } + checks[idx] = check + }(i, c) + } + wg.Wait() + + res := WalkResult{ + MissingTotal: PerPositionCounts{}, + MissingOutOfAOR: PerPositionCounts{}, + MissingInAOR: PerPositionCounts{}, + PresentOutOfAOR: PerPositionCounts{}, + } + for _, c := range checks { + // Skip chunks we never managed to probe — counting them as missing + // would inflate the loss metrics with transient API errors. + if c.Error != "" { + res.ProbeErrors++ + continue + } + res.Checked++ + if !c.Present { + res.MissingTotal[c.Position]++ + if c.OutOfAOR { + res.MissingOutOfAOR[c.Position]++ + } else { + res.MissingInAOR[c.Position]++ + } + if len(res.Missing) < maxReported { + res.Missing = append(res.Missing, c) + } + } + if c.Present && c.OutOfAOR { + res.PresentOutOfAOR[c.Position]++ + if len(res.OutOfAORHits) < maxReported { + res.OutOfAORHits = append(res.OutOfAORHits, c) + } + } + } + return res, ctx.Err() +} diff --git a/pkg/topohealth/chunkwalk_test.go b/pkg/topohealth/chunkwalk_test.go new file mode 100644 index 00000000..71dcd5a4 --- /dev/null +++ b/pkg/topohealth/chunkwalk_test.go @@ -0,0 +1,83 @@ +package topohealth + +import ( + "context" + "crypto/rand" + "testing" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +func TestSplitChunkAddresses_SmallFileSingleChunk(t *testing.T) { + data := make([]byte, 1024) // < swarm.ChunkSize → fits in a single leaf chunk + if _, err := rand.Read(data); err != nil { + t.Fatal(err) + } + root, chunks, err := SplitChunkAddresses(context.Background(), data, nil) + if err != nil { + t.Fatalf("split: %v", err) + } + if root.Equal(swarm.ZeroAddress) { + t.Fatal("root address is zero") + } + if len(chunks) != 1 { + t.Fatalf("expected 1 chunk, got %d", len(chunks)) + } + if chunks[0].Position != ChunkPositionRoot { + t.Errorf("expected root position, got %s", chunks[0].Position) + } + if !chunks[0].Address.Equal(root) { + t.Errorf("single chunk address %s != root %s", chunks[0].Address, root) + } +} + +func TestSplitChunkAddresses_MultiChunkTreeHasIntermediate(t *testing.T) { + // Enough data to force at least one intermediate level: roughly + // (BranchingFactor + 1) leaves = 129 chunks * 4096 = ~528KB. + const size = 600 * 1024 + data := make([]byte, size) + if _, err := rand.Read(data); err != nil { + t.Fatal(err) + } + root, chunks, err := SplitChunkAddresses(context.Background(), data, nil) + if err != nil { + t.Fatalf("split: %v", err) + } + if len(chunks) < 130 { + t.Fatalf("expected >130 chunks for %d bytes, got %d", size, len(chunks)) + } + var roots, intermediates, leaves int + for _, c := range chunks { + switch c.Position { + case ChunkPositionRoot: + roots++ + case ChunkPositionIntermediate: + intermediates++ + case ChunkPositionLeaf: + leaves++ + } + } + if roots != 1 { + t.Errorf("expected 1 root, got %d", roots) + } + if intermediates < 1 { + t.Errorf("expected at least 1 intermediate chunk for %d bytes of data, got %d", size, intermediates) + } + if leaves < 130 { + t.Errorf("expected at least 130 leaves, got %d", leaves) + } + // All chunks must be addressable. + seen := make(map[string]bool, len(chunks)) + for _, c := range chunks { + if c.Address.Equal(swarm.ZeroAddress) { + t.Errorf("chunk has zero address") + } + if seen[c.Address.ByteString()] { + t.Errorf("duplicate chunk address %s", c.Address) + } + seen[c.Address.ByteString()] = true + } + if !seen[root.ByteString()] { + t.Errorf("root %s not in collected chunks", root) + } +} diff --git a/pkg/topohealth/cluster.go b/pkg/topohealth/cluster.go new file mode 100644 index 00000000..eb5cf85e --- /dev/null +++ b/pkg/topohealth/cluster.go @@ -0,0 +1,83 @@ +package topohealth + +import ( + "context" + "fmt" + "sync" + + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/orchestration" +) + +type ClusterCounts struct { + FullNodes int `json:"fullNodes"` + LightNodes int `json:"lightNodes"` + Bootnodes int `json:"bootnodes"` +} + +// ClusterShape inspects node configs only; no API calls. +func ClusterShape(c orchestration.Cluster) ClusterCounts { + var cc ClusterCounts + for _, n := range c.Nodes() { + cfg := n.Config() + switch { + case cfg.BootnodeMode: + cc.Bootnodes++ + case cfg.FullNode: + cc.FullNodes++ + default: + cc.LightNodes++ + } + } + return cc +} + +type StorerResult struct { + Verdict Verdict `json:"verdict"` + HasChunk bool `json:"hasChunk"` + // HasError is set if HEAD /chunks/{addr} or the topology probe failed. + // A probe error leaves Verdict.Status == StatusUnknown (not Unhealthy); + // callers can distinguish "probe failed" from "node is genuinely unhealthy". + HasError string `json:"hasError,omitempty"` +} + +// IntendedStorers returns probe results for the top-n full nodes closest to +// chunkAddr, including a local HEAD /chunks/{addr} check on each. Probes run +// concurrently. +func IntendedStorers(ctx context.Context, c orchestration.Cluster, chunkAddr swarm.Address, n int, t Thresholds) ([]StorerResult, error) { + clients, err := c.FullNodeClientsByDistance(ctx, chunkAddr) + if err != nil { + return nil, fmt.Errorf("rank full nodes by distance: %w", err) + } + if n > len(clients) { + n = len(clients) + } + results := make([]StorerResult, n) + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func(idx int, cl *bee.Client) { + defer wg.Done() + results[idx] = probeStorer(ctx, cl, chunkAddr, t) + }(i, clients[i]) + } + wg.Wait() + return results, nil +} + +func probeStorer(ctx context.Context, cl *bee.Client, chunkAddr swarm.Address, t Thresholds) StorerResult { + v, err := Probe(ctx, cl, t) + if err != nil { + return StorerResult{ + Verdict: Verdict{Node: cl.Name(), Status: StatusUnknown}, + HasError: "probe: " + err.Error(), + } + } + has, herr := cl.LocalHasChunk(ctx, chunkAddr) + r := StorerResult{Verdict: v, HasChunk: has} + if herr != nil { + r.HasError = "has_chunk: " + herr.Error() + } + return r +} diff --git a/pkg/topohealth/health.go b/pkg/topohealth/health.go new file mode 100644 index 00000000..8aad94df --- /dev/null +++ b/pkg/topohealth/health.go @@ -0,0 +1,344 @@ +// Package topohealth implements a topology health probe over a Bee cluster. +// It calls /status and /topology on individual nodes, computes a set of +// signals, and produces a Verdict (healthy/degraded/unhealthy) used by the +// smoke tests to diagnose retrieval failures. +package topohealth + +import ( + "context" + "encoding/json" + "fmt" + "sort" + "strconv" + "strings" + "sync" + + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/bee/api" +) + +// Status is the rolled-up verdict for a node. The zero value is StatusUnknown +// so a default-initialized Verdict is never silently read as healthy/unhealthy. +type Status int + +const ( + StatusUnknown Status = iota + StatusUnhealthy + StatusDegraded + StatusHealthy +) + +func (s Status) String() string { + switch s { + case StatusHealthy: + return "HEALTHY" + case StatusDegraded: + return "DEGRADED" + case StatusUnhealthy: + return "UNHEALTHY" + default: + return "UNKNOWN" + } +} + +func (s Status) MarshalJSON() ([]byte, error) { + return fmt.Appendf(nil, `%q`, s.String()), nil +} + +func (s *Status) UnmarshalJSON(b []byte) error { + var v string + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch strings.ToUpper(v) { + case "HEALTHY": + *s = StatusHealthy + case "DEGRADED": + *s = StatusDegraded + case "UNHEALTHY": + *s = StatusUnhealthy + default: + *s = StatusUnknown + } + return nil +} + +type Signals struct { + DepthOK bool `json:"depthOK"` + BelowDepthFilled bool `json:"belowDepthFilled"` + BelowDepthSaturation float64 `json:"belowDepthSaturation"` + NeighborhoodSizeOK bool `json:"neighborhoodSizeOK"` + Reachable bool `json:"reachable"` + WarmedUp bool `json:"warmedUp"` + DialRatioOK bool `json:"dialRatioOK"` + // ReserveAligned/ReserveNonEmpty are only meaningful for full nodes. + ReserveAligned bool `json:"reserveAligned"` + ReserveNonEmpty bool `json:"reserveNonEmpty"` +} + +// BinView is a per-bin snapshot. Unlike bee.BinMap.String it includes empty +// bins, which is the signal we need for "is there a gap below depth?". +type BinView struct { + Index int `json:"index"` + Connected int `json:"connected"` + Disconnected int `json:"disconnected"` + Population int `json:"population"` + Empty bool `json:"empty"` +} + +type Raw struct { + Depth int `json:"depth"` + NnLowWatermark int `json:"nnLowWatermark"` + Connected int `json:"connected"` + Population int `json:"population"` + NeighborhoodSize int `json:"neighborhoodSize"` + EmptyBinsBelowDepth int `json:"emptyBinsBelowDepth"` + DialRatio float64 `json:"dialRatio"` + BeeMode string `json:"beeMode"` + IsReachable bool `json:"isReachable"` + IsWarmingUp bool `json:"isWarmingUp"` + Reachability string `json:"reachability"` + NetworkAvailability string `json:"networkAvailability"` + ReserveSize uint64 `json:"reserveSize"` + ReserveSizeWithinRadius uint64 `json:"reserveSizeWithinRadius"` + StorageRadius uint8 `json:"storageRadius"` + CommittedDepth uint8 `json:"committedDepth"` + PullsyncRate float64 `json:"pullsyncRate"` +} + +type Verdict struct { + Node string `json:"node"` + Overlay swarm.Address `json:"overlay"` + Status Status `json:"status"` + Signals Signals `json:"signals"` + Raw Raw `json:"raw"` + Bins []BinView `json:"bins"` + FailureReasons []string `json:"failureReasons,omitempty"` +} + +// Probe runs the single-node probe: /status and /topology in parallel. +func Probe(ctx context.Context, c *bee.Client, t Thresholds) (Verdict, error) { + var ( + wg sync.WaitGroup + st *api.StatusResponse + top bee.Topology + stErr error + topErr error + ) + wg.Add(2) + go func() { + defer wg.Done() + st, stErr = c.Status(ctx) + }() + go func() { + defer wg.Done() + top, topErr = c.Topology(ctx) + }() + wg.Wait() + if stErr != nil { + return Verdict{}, fmt.Errorf("status: %w", stErr) + } + if topErr != nil { + return Verdict{}, fmt.Errorf("topology: %w", topErr) + } + return Evaluate(c.Name(), st, top, t), nil +} + +// Evaluate is the pure rule-table portion of Probe, exported so the rule logic +// can be tested without a live bee node. +func Evaluate(nodeName string, st *api.StatusResponse, top bee.Topology, t Thresholds) Verdict { + bins, walk := walkBins(top, t) + signals := computeSignals(st, top, walk, t) + status, reasons := decideStatus(st, top, signals, walk) + + return Verdict{ + Node: nodeName, + Overlay: top.Overlay, + Status: status, + Signals: signals, + Bins: bins, + Raw: Raw{ + Depth: top.Depth, + NnLowWatermark: top.NnLowWatermark, + Connected: top.Connected, + Population: top.Population, + NeighborhoodSize: int(st.NeighborhoodSize), + EmptyBinsBelowDepth: walk.emptyBelow, + DialRatio: walk.dialRatio, + BeeMode: st.BeeMode, + IsReachable: st.IsReachable, + IsWarmingUp: st.IsWarmingUp, + Reachability: top.Reachability, + NetworkAvailability: top.NetworkAvailability, + ReserveSize: st.ReserveSize, + ReserveSizeWithinRadius: st.ReserveSizeWithinRadius, + StorageRadius: st.StorageRadius, + CommittedDepth: st.CommittedDepth, + PullsyncRate: st.PullsyncRate, + }, + FailureReasons: reasons, + } +} + +type binWalkResult struct { + emptyBelow int + minBelowSat float64 + belowFilled bool + neighborhoodPeers int + dialRatio float64 +} + +// walkBins iterates bins from 0..maxBin once and produces both the per-bin +// view and every aggregate the verdict rules need, including saturation +// (which depends on the threshold). +func walkBins(top bee.Topology, t Thresholds) ([]BinView, binWalkResult) { + maxBin := top.Depth + for k := range top.Bins { + idx, ok := parseBinIndex(k) + if !ok { + continue + } + if idx > maxBin { + maxBin = idx + } + } + + bins := make([]BinView, 0, maxBin+1) + r := binWalkResult{ + minBelowSat: -1, + belowFilled: true, + } + totalConnected, totalDisconnected := 0, 0 + + for i := 0; i <= maxBin; i++ { + b := top.Bins[binKey(i)] + disc := len(b.DisconnectedPeers) + bins = append(bins, BinView{ + Index: i, + Connected: b.Connected, + Disconnected: disc, + Population: b.Population, + Empty: b.Connected == 0 && b.Population == 0, + }) + totalConnected += b.Connected + totalDisconnected += disc + + if i < top.Depth { + if b.Connected == 0 { + r.belowFilled = false + r.emptyBelow++ + } + sat := float64(b.Connected) / float64(t.SaturationPeers) + if r.minBelowSat < 0 || sat < r.minBelowSat { + r.minBelowSat = sat + } + } else { + r.neighborhoodPeers += b.Connected + } + } + if r.minBelowSat < 0 { + r.minBelowSat = 1.0 + } + + r.dialRatio = 1.0 + if totalConnected+totalDisconnected > 0 { + r.dialRatio = float64(totalConnected) / float64(totalConnected+totalDisconnected) + } + return bins, r +} + +func binKey(i int) string { + return "bin_" + strconv.Itoa(i) +} + +func parseBinIndex(k string) (int, bool) { + rest, ok := strings.CutPrefix(k, "bin_") + if !ok { + return 0, false + } + idx, err := strconv.Atoi(rest) + if err != nil { + return 0, false + } + return idx, true +} + +func computeSignals(st *api.StatusResponse, top bee.Topology, w binWalkResult, t Thresholds) Signals { + reachable := st.IsReachable + if t.RequirePublicReachable { + reachable = reachable && strings.EqualFold(top.Reachability, "Public") + } + + sig := Signals{ + DepthOK: top.Depth > 0, + BelowDepthFilled: w.belowFilled, + BelowDepthSaturation: w.minBelowSat, + NeighborhoodSizeOK: w.neighborhoodPeers >= max(t.MinNeighborhoodSize, top.NnLowWatermark), + Reachable: reachable, + WarmedUp: !st.IsWarmingUp, + DialRatioOK: w.dialRatio >= t.MinDialRatio, + } + + if isFullNode(st.BeeMode) { + sig.ReserveAligned = sig.WarmedUp && st.StorageRadius == st.CommittedDepth + sig.ReserveNonEmpty = st.ReserveSize > 0 + } else { + // Light/ultra-light: marked satisfied so the verdict isn't tripped by + // fields that don't apply to that mode. + sig.ReserveAligned = true + sig.ReserveNonEmpty = true + } + return sig +} + +func isFullNode(beeMode string) bool { + return strings.EqualFold(beeMode, api.BeeModeFull) +} + +// decideStatus applies the rule table: +// +// UNHEALTHY: any of DepthOK / BelowDepthFilled / NeighborhoodSizeOK / +// Reachable / WarmedUp is false. +// DEGRADED: BelowDepthSaturation < 1.0 OR DialRatioOK false OR +// (full node && !ReserveAligned). +// HEALTHY: otherwise. +func decideStatus(st *api.StatusResponse, top bee.Topology, sig Signals, w binWalkResult) (Status, []string) { + var hard, soft []string + if !sig.DepthOK { + hard = append(hard, "depth_not_ok") + } + if !sig.BelowDepthFilled { + hard = append(hard, fmt.Sprintf("empty_bins_below_depth=%d", w.emptyBelow)) + } + if !sig.NeighborhoodSizeOK { + hard = append(hard, fmt.Sprintf("neighborhood_size=%d 0: + reasons := append(hard, soft...) + sort.Strings(reasons) + return StatusUnhealthy, reasons + case len(soft) > 0: + sort.Strings(soft) + return StatusDegraded, soft + default: + return StatusHealthy, nil + } +} diff --git a/pkg/topohealth/health_test.go b/pkg/topohealth/health_test.go new file mode 100644 index 00000000..cd124997 --- /dev/null +++ b/pkg/topohealth/health_test.go @@ -0,0 +1,119 @@ +package topohealth + +import ( + "testing" + + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/bee/api" +) + +func healthyStatus() *api.StatusResponse { + return &api.StatusResponse{ + BeeMode: api.BeeModeFull, + IsReachable: true, + IsWarmingUp: false, + ReserveSize: 1000, + StorageRadius: 3, + CommittedDepth: 3, + } +} + +func healthyTopology() bee.Topology { + bins := bee.BinMap{} + // bins below depth 3 each have 4 connected (saturated), 0 disconnected. + for i := range 3 { + bins[binKey(i)] = bee.Bin{Connected: 4, Population: 4} + } + // neighborhood (bin >= depth) + bins[binKey(3)] = bee.Bin{Connected: 3, Population: 3} + return bee.Topology{ + Depth: 3, + Connected: 15, + Population: 15, + Reachability: "Public", + Bins: bins, + } +} + +func TestEvaluate_Healthy(t *testing.T) { + v := Evaluate("n1", healthyStatus(), healthyTopology(), DefaultThresholds()) + if v.Status != StatusHealthy { + t.Fatalf("expected HEALTHY, got %s, reasons=%v", v.Status, v.FailureReasons) + } +} + +func TestEvaluate_EmptyBinBelowDepth_Unhealthy(t *testing.T) { + top := healthyTopology() + top.Bins[binKey(1)] = bee.Bin{Connected: 0, Population: 0} + v := Evaluate("n1", healthyStatus(), top, DefaultThresholds()) + if v.Status != StatusUnhealthy { + t.Fatalf("expected UNHEALTHY for missing bin below depth, got %s", v.Status) + } + if v.Raw.EmptyBinsBelowDepth != 1 { + t.Errorf("EmptyBinsBelowDepth = %d, want 1", v.Raw.EmptyBinsBelowDepth) + } +} + +func TestEvaluate_LowSaturation_Degraded(t *testing.T) { + top := healthyTopology() + top.Bins[binKey(0)] = bee.Bin{Connected: 2, Population: 2} + v := Evaluate("n1", healthyStatus(), top, DefaultThresholds()) + if v.Status != StatusDegraded { + t.Fatalf("expected DEGRADED for under-saturated bin, got %s, reasons=%v", v.Status, v.FailureReasons) + } +} + +func TestEvaluate_WarmingUp_Unhealthy(t *testing.T) { + st := healthyStatus() + st.IsWarmingUp = true + v := Evaluate("n1", st, healthyTopology(), DefaultThresholds()) + if v.Status != StatusUnhealthy { + t.Fatalf("expected UNHEALTHY while warming up, got %s", v.Status) + } +} + +func TestEvaluate_NotPublic_RequiresPublic(t *testing.T) { + top := healthyTopology() + top.Reachability = "Private" + v := Evaluate("n1", healthyStatus(), top, DefaultThresholds()) + if v.Status != StatusUnhealthy { + t.Fatalf("expected UNHEALTHY for non-Public reachability when RequirePublicReachable=true, got %s", v.Status) + } + // And HEALTHY when RequirePublicReachable=false. + thr := DefaultThresholds() + thr.RequirePublicReachable = false + v = Evaluate("n1", healthyStatus(), top, thr) + if v.Status != StatusHealthy { + t.Fatalf("expected HEALTHY with RequirePublicReachable=false, got %s, reasons=%v", v.Status, v.FailureReasons) + } +} + +func TestEvaluate_BinsViewIncludesEmpty(t *testing.T) { + top := healthyTopology() + delete(top.Bins, binKey(2)) // simulate bin_2 entirely missing from response + v := Evaluate("n1", healthyStatus(), top, DefaultThresholds()) + var found bool + for _, b := range v.Bins { + if b.Index == 2 { + found = true + if !b.Empty || b.Connected != 0 { + t.Errorf("bin_2 should be empty with 0 connected, got %+v", b) + } + } + } + if !found { + t.Fatalf("expected bin_2 in the view even though missing from response") + } +} + +func TestEvaluate_LightNodeNotPenalizedForReserve(t *testing.T) { + st := healthyStatus() + st.BeeMode = api.BeeModeLight + st.ReserveSize = 0 + st.StorageRadius = 0 + st.CommittedDepth = 0 + v := Evaluate("n1", st, healthyTopology(), DefaultThresholds()) + if v.Status != StatusHealthy { + t.Fatalf("light node with empty reserve should be HEALTHY, got %s, reasons=%v", v.Status, v.FailureReasons) + } +} diff --git a/pkg/topohealth/log.go b/pkg/topohealth/log.go new file mode 100644 index 00000000..5c4245a6 --- /dev/null +++ b/pkg/topohealth/log.go @@ -0,0 +1,74 @@ +package topohealth + +import ( + "github.com/ethersphere/beekeeper/pkg/logging" + "github.com/sirupsen/logrus" +) + +// Phase tags when in a check's lifecycle a probe was taken. +type Phase string + +const ( + PhasePreUpload Phase = "pre_upload" + PhasePreDownload Phase = "pre_download" + PhaseOnFailure Phase = "on_failure" +) + +// LogVerdict emits a structured log line for a per-node verdict. All probe +// data is attached as logrus fields; the message is a stable event name that +// CI log greps can pin on. +func LogVerdict(logger logging.Logger, phase Phase, v Verdict) { + logger.WithFields(logrus.Fields{ + "event": "topohealth.verdict", + "phase": string(phase), + "node": v.Node, + "status": v.Status.String(), + "overlay": v.Overlay.String(), + "depth": v.Raw.Depth, + "connected": v.Raw.Connected, + "population": v.Raw.Population, + "empty_below_depth": v.Raw.EmptyBinsBelowDepth, + "dial_ratio": v.Raw.DialRatio, + "reachability": v.Raw.Reachability, + "warming_up": v.Raw.IsWarmingUp, + "reserve_size": v.Raw.ReserveSize, + "storage_radius": v.Raw.StorageRadius, + "committed_depth": v.Raw.CommittedDepth, + "failure_reasons": v.FailureReasons, + }).Info("topohealth.verdict") +} + +// LogChunkCheck emits one structured line per missing or out-of-AOR chunk +// from a WalkChunks result. +func LogChunkCheck(logger logging.Logger, kind, chunkAddr string, c ChunkCheck) { + logger.WithFields(logrus.Fields{ + "event": "topohealth.chunk_check", + "kind": kind, + "upload_root": chunkAddr, + "chunk_address": c.Address.String(), + "position": string(c.Position), + "storer": c.StorerName, + "storer_overlay": c.StorerOverlay.String(), + "proximity": c.Proximity, + "storage_radius": c.StorageRadius, + "out_of_aor": c.OutOfAOR, + "present": c.Present, + "head_error": c.Error, + }).Info("topohealth.chunk_check") +} + +// LogStorerResult emits one structured line per intended-storer probe, +// including the HEAD /chunks/{addr} ground-truth. +func LogStorerResult(logger logging.Logger, chunkAddr, phase string, idx int, r StorerResult) { + logger.WithFields(logrus.Fields{ + "event": "topohealth.storer", + "phase": phase, + "storer_rank": idx, + "chunk_addr": chunkAddr, + "node": r.Verdict.Node, + "status": r.Verdict.Status.String(), + "has_chunk": r.HasChunk, + "has_error": r.HasError, + "failure_reasons": r.Verdict.FailureReasons, + }).Info("topohealth.storer") +} diff --git a/pkg/topohealth/thresholds.go b/pkg/topohealth/thresholds.go new file mode 100644 index 00000000..1c593df8 --- /dev/null +++ b/pkg/topohealth/thresholds.go @@ -0,0 +1,29 @@ +package topohealth + +// Thresholds tunes the rules used to derive a per-node Verdict. +// Defaults match a production Swarm cluster; small testnets may need lower +// SaturationPeers. +type Thresholds struct { + // SaturationPeers is the minimum number of connected peers per bin below + // the node's depth for that bin to be considered saturated. + SaturationPeers int + // MinNeighborhoodSize is the minimum number of peers within the node's + // neighborhood (bins >= depth) for the node to be considered usable. + MinNeighborhoodSize int + // MinDialRatio is the minimum connected / (connected + disconnected) ratio + // across all bins under which the node is flagged as having dial issues. + MinDialRatio float64 + // RequirePublicReachable, when true, requires reachability == "Public" + // (instead of just isReachable == true). + RequirePublicReachable bool +} + +// DefaultThresholds returns the production defaults. +func DefaultThresholds() Thresholds { + return Thresholds{ + SaturationPeers: 4, + MinNeighborhoodSize: 2, + MinDialRatio: 0.8, + RequirePublicReachable: true, + } +}