From 03f6171d43c02a097b53a65a5f4a915776c7e11c Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 8 Apr 2026 09:53:50 +0200 Subject: [PATCH 01/14] chore: remove randomness in flaky tests Fix CI --- .github/workflows/go.yml | 15 -- Makefile | 12 +- cmd/bee/cmd/db_test.go | 16 +- pkg/api/bzz_test.go | 177 +++++++++++++++++++--- pkg/feeds/epochs/lookup_test.go | 2 +- pkg/feeds/testing/lookup.go | 8 +- pkg/file/redundancy/getter/getter_test.go | 18 ++- pkg/storageincentives/proof_test.go | 58 ++++++- pkg/topology/kademlia/kademlia_test.go | 115 +++++++++++--- 9 files changed, 340 insertions(+), 81 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index c0d13d98b5a..19cb8782349 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -44,21 +44,6 @@ jobs: - name: Test without race detector (Windows) if: matrix.os == 'windows-latest' run: make test-ci - test-flaky: - name: Test (flaky) - runs-on: ubuntu-latest - continue-on-error: ${{ github.ref == 'refs/heads/master' }} - steps: - - name: Checkout - uses: actions/checkout@v5 - - name: Setup Go - uses: actions/setup-go@v6 - with: - cache: true - go-version-file: go.mod - - name: Run flaky test - run: make test-ci-flaky - continue-on-error: ${{ github.ref == 'refs/heads/master' }} lint: name: Lint runs-on: ubuntu-latest diff --git a/Makefile b/Makefile index a015bc84e38..645cbb31b5f 100644 --- a/Makefile +++ b/Makefile @@ -119,23 +119,19 @@ endif .PHONY: test-ci test-ci: ifdef cover - $(GO) test -run "[^FLAKY]$$" -coverprofile=cover.out ./... + $(GO) test -coverprofile=cover.out ./... else - $(GO) test -run "[^FLAKY]$$" ./... + $(GO) test ./... endif .PHONY: test-ci-race test-ci-race: ifdef cover - $(GO) test -race -run "[^FLAKY]$$" -coverprofile=cover.out ./... + $(GO) test -race -coverprofile=cover.out ./... else - $(GO) test -race -run "[^FLAKY]$$" ./... + $(GO) test -race ./... endif -.PHONY: test-ci-flaky -test-ci-flaky: - $(GO) test -race -run "FLAKY$$" ./... - .PHONY: build build: export CGO_ENABLED=0 build: diff --git a/cmd/bee/cmd/db_test.go b/cmd/bee/cmd/db_test.go index 85593ab0fbe..260bc40ed4b 100644 --- a/cmd/bee/cmd/db_test.go +++ b/cmd/bee/cmd/db_test.go @@ -11,6 +11,7 @@ import ( "path" "strings" "testing" + "time" "github.com/ethersphere/bee/v2/cmd/bee/cmd" "github.com/ethersphere/bee/v2/pkg/log" @@ -172,8 +173,7 @@ func TestDBExportImportPinning(t *testing.T) { } } -// TestDBNuke_FLAKY is flaky on windows. -func TestDBNuke_FLAKY(t *testing.T) { +func TestDBNuke(t *testing.T) { t.Parallel() dataDir := t.TempDir() @@ -203,7 +203,17 @@ func TestDBNuke_FLAKY(t *testing.T) { db.Close() - err = newCommand(t, cmd.WithArgs("db", "nuke", "--data-dir", dataDir)).Execute() + // Retrying avoids a short OS-level race after db.Close(), where file handles + // may still be getting released and early removal can fail on some platforms. + backoff := 50 * time.Millisecond + for range 3 { + err = newCommand(t, cmd.WithArgs("db", "nuke", "--data-dir", dataDir)).Execute() + if err == nil { + break + } + time.Sleep(backoff) + backoff *= 2 + } if err != nil { t.Fatal(err) } diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index 3f444e6cb07..54f4504116e 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -7,9 +7,11 @@ package api_test import ( "bytes" "context" + "encoding/binary" "errors" "fmt" "io" + mrand "math/rand" "mime" "mime/multipart" "net/http" @@ -19,6 +21,10 @@ import ( "testing" "github.com/ethersphere/bee/v2/pkg/api" + "github.com/ethersphere/bee/v2/pkg/bmt" + "github.com/ethersphere/bee/v2/pkg/encryption" + encryptstore "github.com/ethersphere/bee/v2/pkg/encryption/store" + "github.com/ethersphere/bee/v2/pkg/file" "github.com/ethersphere/bee/v2/pkg/file/loadsave" "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/jsonhttp" @@ -49,24 +55,21 @@ import ( // // 1. upload a file with a given redundancy level and encryption // -// 2. [positive test] download the file by the reference returned by the upload API response -// This uses range queries to target specific (number of) chunks of the file structure. -// During path traversal in the swarm hash tree, the underlying mocksore (forgetting) -// is in 'recording' mode, flagging all the retrieved chunks as chunks to forget. -// This is to simulate the scenario where some of the chunks are not available/lost -// NOTE: For this to work one needs to switch off lookaheadbuffer functionality -// (see langos pkg) +// 2. [positive test] download a few ranges by the reference returned by the upload API response +// This verifies that the upload can be read before any chunk loss is injected. // -// 3. [negative test] attempt at downloading the file using once again the same root hash +// 3. pick a random, but still recoverable, set of file chunks to lose. +// The selection respects the local parity budget of each leaf group in the +// hash tree instead of assuming a global "lose any K chunks" model. +// +// 4. [negative test] attempt at downloading the file using once again the same root hash // and the same redundancy strategy to find the file inaccessible after forgetting. // -// 4. [positive test] attempt at downloading the file using a strategy that allows for +// 5. [positive test] attempt at downloading the file using a strategy that allows for // using redundancy to reconstruct the file and find the file recoverable. // // nolint:thelper -func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) { - t.Skip("flaky") - t.Parallel() +func TestBzzUploadDownloadWithRedundancy(t *testing.T) { fileUploadResource := "/bzz" fileDownloadResource := func(addr string) string { return "/bzz/" + addr + "/" } @@ -98,11 +101,23 @@ func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) { jsonhttptest.WithUnmarshalJSONResponse(&refResponse), ) + ls := loadsave.NewReadonly(store, store, redundancy.DefaultLevel) + m, err := manifest.NewDefaultManifestReference(refResponse.Reference, ls) + if err != nil { + t.Fatal(err) + } + rootEntry, err := m.Lookup(context.Background(), manifest.RootPath) + if err != nil { + t.Fatal(err) + } + fileName := rootEntry.Metadata()[manifest.WebsiteIndexDocumentSuffixKey] + fileEntry, err := m.Lookup(context.Background(), fileName) + if err != nil { + t.Fatal(err) + } + fileReference := fileEntry.Reference() + t.Run("download multiple ranges without redundancy should succeed", func(t *testing.T) { - // the underlying chunk store is in recording mode, so all chunks retrieved - // in this test will be forgotten in the subsequent ones. - store.Record() - defer store.Unrecord() // we intend to forget as many chunks as possible for the given redundancy level forget := min(parityCnt, shardCnt) if levels == 1 { @@ -145,16 +160,20 @@ func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) { } }) + forgetRandomRecoverableFileChunks(t, store, fileReference, seed) + t.Run("download without redundancy should NOT succeed", func(t *testing.T) { if rLevel == 0 { t.Skip("NA") } + req, err := http.NewRequestWithContext(context.Background(), "GET", fileDownloadResource(refResponse.Reference.String()), nil) if err != nil { t.Fatal(err) } req.Header.Set(api.SwarmRedundancyStrategyHeader, "0") req.Header.Set(api.SwarmRedundancyFallbackModeHeader, "false") + req.Header.Set(api.SwarmRedundancyLevelHeader, fmt.Sprintf("%d", rLevel)) resp, err := client.Do(req) if err != nil { @@ -185,6 +204,7 @@ func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) { } req.Header.Set(api.SwarmRedundancyStrategyHeader, "3") req.Header.Set(api.SwarmRedundancyFallbackModeHeader, "true") + req.Header.Set(api.SwarmRedundancyLevelHeader, fmt.Sprintf("%d", rLevel)) resp, err := client.Do(req) if err != nil { @@ -231,7 +251,6 @@ func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) { if levels > 2 && (encrypt == (rLevel%2 == 1)) { t.Skip("skipping to save time") } - t.Parallel() testRedundancy(t, rLevel, encrypt, levels, chunkCnt, shardCnt, parityCnt) }) } @@ -731,6 +750,130 @@ func createRangeHeader(data any, ranges [][2]int) (header string, parts [][]byte return header, parts } +type leafChunkGroup struct { + refs []swarm.Address + budget int +} + +// forgetRandomRecoverableFileChunks walks one random branch from the file root down to a single +// bottom-level shard group. It then forgets a random number of data chunks from that group, +// capped by the local parity budget of that exact group, so the loss pattern stays recoverable +// under Bee's hierarchical erasure-coding model. +func forgetRandomRecoverableFileChunks(t *testing.T, store *mockstorer.ForgettingStore, fileReference swarm.Address, seed []byte) { + t.Helper() + + lossSeed := randomLossSeed(seed, fileReference) + rng := mrand.New(mrand.NewSource(lossSeed)) + + group, err := pickRandomLeafChunkGroup(context.Background(), encryptstore.New(store), fileReference, rng) + if err != nil { + t.Fatal(err) + } + budget := min(group.budget, len(group.refs)) + if budget == 0 { + t.Fatal("expected at least one recoverable chunk loss") + } + + lossCount := 1 + rng.Intn(budget) + perm := rng.Perm(len(group.refs)) + for _, idx := range perm[:lossCount] { + store.Miss(group.refs[idx]) + } + + store.Unmiss(chunkAddress(fileReference)) + t.Logf("loss seed %d: forgot %d recoverable file chunks", lossSeed, lossCount) +} + +// pickRandomLeafChunkGroup follows random data-shard references until it reaches an intermediate +// chunk whose data-shard children are leaf chunks. The returned group contains only data shards, +// and its loss budget is derived from that group's own parity count rather than from the whole tree. +func pickRandomLeafChunkGroup(ctx context.Context, getter interface { + Get(context.Context, swarm.Address) (swarm.Chunk, error) +}, ref swarm.Address, rng *mrand.Rand) (leafChunkGroup, error) { + for { + addrs, shardCnt, parityCnt, err := readChunkGroup(ctx, getter, ref) + if err != nil { + return leafChunkGroup{}, err + } + if shardCnt == 0 { + return leafChunkGroup{}, errors.New("expected intermediate chunk") + } + + childRef := addrs[rng.Intn(shardCnt)] + leaf, err := isLeafChunk(ctx, getter, childRef) + if err != nil { + return leafChunkGroup{}, err + } + if leaf { + refs := make([]swarm.Address, 0, shardCnt) + for i := 0; i < shardCnt; i++ { + refs = append(refs, chunkAddress(addrs[i])) + } + return leafChunkGroup{ + refs: refs, + budget: min(parityCnt, shardCnt), + }, nil + } + + ref = childRef + } +} + +// readChunkGroup decodes one intermediate chunk and returns its direct references together with +// the number of data shards and parity shards on that level. The random descent only picks from +// the data-shard prefix because parity refs are recovery material, not primary file content. +func readChunkGroup(ctx context.Context, getter interface { + Get(context.Context, swarm.Address) (swarm.Chunk, error) +}, ref swarm.Address) ([]swarm.Address, int, int, error) { + ch, err := getter.Get(ctx, ref) + if err != nil { + return nil, 0, 0, err + } + + level, spanBytes := redundancy.DecodeSpan(ch.Data()[:swarm.SpanSize]) + span := int64(bmt.LengthFromSpan(spanBytes)) + if span <= swarm.ChunkSize { + return nil, 0, 0, nil + } + + payload := ch.Data()[swarm.SpanSize:] + payloadSize, err := file.ChunkPayloadSize(payload) + if err != nil { + return nil, 0, 0, err + } + + encrypted := len(ref.Bytes()) == encryption.ReferenceSize + _, parityCnt := file.ReferenceCount(uint64(span), level, encrypted) + addrs, shardCnt := file.ChunkAddresses(payload[:payloadSize], parityCnt, len(ref.Bytes())) + + return addrs, shardCnt, parityCnt, nil +} + +func isLeafChunk(ctx context.Context, getter interface { + Get(context.Context, swarm.Address) (swarm.Chunk, error) +}, ref swarm.Address) (bool, error) { + ch, err := getter.Get(ctx, ref) + if err != nil { + return false, err + } + + _, spanBytes := redundancy.DecodeSpan(ch.Data()[:swarm.SpanSize]) + return int64(bmt.LengthFromSpan(spanBytes)) <= swarm.ChunkSize, nil +} + +func randomLossSeed(seed []byte, ref swarm.Address) int64 { + value := binary.LittleEndian.Uint64(seed[:8]) + refBytes := ref.Bytes() + for i := 0; i < min(len(refBytes), 8); i++ { + value ^= uint64(refBytes[i]) << (8 * i) + } + return int64(value & ((1 << 63) - 1)) +} + +func chunkAddress(ref swarm.Address) swarm.Address { + return swarm.NewAddress(ref.Bytes()[:swarm.HashSize]) +} + func parseRangeParts(t *testing.T, contentType string, body []byte) (parts [][]byte) { t.Helper() diff --git a/pkg/feeds/epochs/lookup_test.go b/pkg/feeds/epochs/lookup_test.go index e89d8b62b74..42fc618fb09 100644 --- a/pkg/feeds/epochs/lookup_test.go +++ b/pkg/feeds/epochs/lookup_test.go @@ -14,7 +14,7 @@ import ( storage "github.com/ethersphere/bee/v2/pkg/storage" ) -func TestFinder_FLAKY(t *testing.T) { +func TestFinder(t *testing.T) { t.Parallel() testf := func(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) { diff --git a/pkg/feeds/testing/lookup.go b/pkg/feeds/testing/lookup.go index e56e3c477ba..6088d143574 100644 --- a/pkg/feeds/testing/lookup.go +++ b/pkg/feeds/testing/lookup.go @@ -37,7 +37,7 @@ func (t *Timeout) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, err } return ch, err } - time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + time.Sleep(time.Duration(addr.Bytes()[0]%10) * time.Millisecond) return ch, nil } @@ -117,6 +117,7 @@ func TestFinderFixIntervals(t *testing.T, nextf func() (bool, int64), finderf fu func TestFinderIntervals(t *testing.T, nextf func() (bool, int64), finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) { storer := &Timeout{inmemchunkstore.New()} + rng := rand.New(rand.NewSource(1)) topicStr := "testtopic" topic, err := crypto.LegacyKeccak256([]byte(topicStr)) if err != nil { @@ -145,7 +146,7 @@ func TestFinderIntervals(t *testing.T, nextf func() (bool, int64), finderf func( for j := 0; j < len(ats)-1; j++ { at := ats[j] diff := ats[j+1] - at - for now := at; now < ats[j+1]; now += int64(rand.Intn(int(diff)) + 1) { + for now := at; now < ats[j+1]; now += int64(rng.Intn(int(diff)) + 1) { after := uint64(0) ch, current, next, err := finder.At(ctx, now, after) if err != nil { @@ -190,10 +191,11 @@ func TestFinderRandomIntervals(t *testing.T, finderf func(storage.Getter, *feeds t.Run(fmt.Sprintf("random intervals %d", j), func(t *testing.T) { t.Parallel() + rng := rand.New(rand.NewSource(int64(j + 1))) var i int64 var n int nextf := func() (bool, int64) { - i += int64(rand.Intn(1<<10) + 1) + i += int64(rng.Intn(1<<10) + 1) n++ return n == 40, i } diff --git a/pkg/file/redundancy/getter/getter_test.go b/pkg/file/redundancy/getter/getter_test.go index f3c9a27c3ff..1ce9df479c7 100644 --- a/pkg/file/redundancy/getter/getter_test.go +++ b/pkg/file/redundancy/getter/getter_test.go @@ -29,18 +29,19 @@ import ( // TestGetter tests the retrieval of chunks with missing data shards // using the RACE strategy for a number of erasure code parameters -func TestGetterRACE_FLAKY(t *testing.T) { +func TestGetterRACE(t *testing.T) { type getterTest struct { bufSize int shardCnt int erasureCnt int } + rng := mrand.New(mrand.NewSource(1)) var tcs []getterTest for bufSize := 3; bufSize <= 128; bufSize += 21 { for shardCnt := bufSize/2 + 1; shardCnt <= bufSize; shardCnt += 21 { parityCnt := bufSize - shardCnt - erasures := mrand.Perm(parityCnt - 1) + erasures := rng.Perm(parityCnt - 1) if len(erasures) > 3 { erasures = erasures[:3] } @@ -48,7 +49,7 @@ func TestGetterRACE_FLAKY(t *testing.T) { tcs = append(tcs, getterTest{bufSize, shardCnt, erasureCnt}) } tcs = append(tcs, getterTest{bufSize, shardCnt, parityCnt}, getterTest{bufSize, shardCnt, parityCnt + 1}) - erasures = mrand.Perm(shardCnt - 1) + erasures = rng.Perm(shardCnt - 1) if len(erasures) > 3 { erasures = erasures[:3] } @@ -99,7 +100,7 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) { addrs := initData(t, buf, shardCnt, store) var addr swarm.Address - erasures := forget(t, store, addrs, erasureCnt) + erasures := forget(t, store, addrs, erasureCnt, deterministicSeed(bufSize, shardCnt, erasureCnt)) for _, i := range erasures { if i < shardCnt { addr = addrs[i] @@ -345,11 +346,12 @@ func checkShardsAvailable(t *testing.T, s storage.ChunkStore, addrs []swarm.Addr } } -func forget(t *testing.T, store storage.ChunkStore, addrs []swarm.Address, erasureCnt int) (erasures []int) { +func forget(t *testing.T, store storage.ChunkStore, addrs []swarm.Address, erasureCnt int, seed int64) (erasures []int) { t.Helper() ctx := context.TODO() - erasures = mrand.Perm(len(addrs))[:erasureCnt] + rng := mrand.New(mrand.NewSource(seed)) + erasures = rng.Perm(len(addrs))[:erasureCnt] for _, i := range erasures { err := store.Delete(ctx, addrs[i]) if err != nil { @@ -358,3 +360,7 @@ func forget(t *testing.T, store storage.ChunkStore, addrs []swarm.Address, erasu } return erasures } + +func deterministicSeed(bufSize, shardCnt, erasureCnt int) int64 { + return int64(bufSize)<<32 | int64(shardCnt)<<16 | int64(erasureCnt) +} diff --git a/pkg/storageincentives/proof_test.go b/pkg/storageincentives/proof_test.go index b0fcc8c8952..a8f545be5ec 100644 --- a/pkg/storageincentives/proof_test.go +++ b/pkg/storageincentives/proof_test.go @@ -26,11 +26,11 @@ import ( ) // Test asserts valid case for MakeInclusionProofs. -func TestMakeInclusionProofs_FLAKY(t *testing.T) { +func TestMakeInclusionProofs(t *testing.T) { t.Parallel() - anchor := testutil.RandBytes(t, 1) - sample := storer.RandSample(t, anchor) + anchor := []byte{0x1} + sample := deterministicSample(t, anchor) _, err := storageincentives.MakeInclusionProofs(sample.Items, anchor, anchor) if err != nil { @@ -38,6 +38,58 @@ func TestMakeInclusionProofs_FLAKY(t *testing.T) { } } +func deterministicSample(t *testing.T, anchor []byte) storer.Sample { + t.Helper() + + const sampleSize = storer.SampleSize + + keyRaw := `00000000000000000000000000000000` + privKey, err := crypto.DecodeSecp256k1PrivateKey([]byte(keyRaw)) + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + + stampID, _ := crypto.LegacyKeccak256([]byte("The Inverted Jenny")) + index := []byte{0, 0, 0, 0, 0, 8, 3, 3} + timestamp := []byte{0, 0, 0, 0, 0, 3, 3, 8} + stamper := func(addr swarm.Address) *postage.Stamp { + sig := postagetesting.MustNewValidSignature(signer, addr, stampID, index, timestamp) + return postage.NewStamp(stampID, index, timestamp, sig) + } + + sampleChunks := make([]swarm.Chunk, 0, sampleSize) + for i := range sampleSize { + ch, err := cac.New(fmt.Appendf(nil, "Deterministic sample chunk #%d", i+1)) + if err != nil { + t.Fatal(err) + } + + if i%2 == 0 { + id, err := crypto.LegacyKeccak256(fmt.Appendf(nil, "Deterministic ID #%d", i+1)) + if err != nil { + t.Fatal(err) + } + + socCh, err := soc.New(id, ch).Sign(signer) + if err != nil { + t.Fatal(err) + } + + ch = socCh + } + + sampleChunks = append(sampleChunks, ch.WithStamp(stamper(ch.Address()))) + } + + sample, err := storer.MakeSampleUsingChunks(sampleChunks, anchor) + if err != nil { + t.Fatal(err) + } + + return sample +} + //go:embed testdata/inclusion-proofs.json var testData []byte diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 9349f283278..8cd1bc9ef07 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -887,7 +887,7 @@ func TestAddressBookPrune(t *testing.T) { } // test pruning addressbook after successive failed connect attempts -func TestAddressBookQuickPrune_FLAKY(t *testing.T) { +func TestAddressBookQuickPrune(t *testing.T) { t.Parallel() var ( @@ -917,24 +917,19 @@ func TestAddressBookQuickPrune_FLAKY(t *testing.T) { // add one valid peer addOne(t, signer, kad, ab, addr) waitCounter(t, &conns, 1) - waitCounter(t, &failedConns, 0) // add non connectable peer, check connection and failed connection counters kad.AddPeers(nonConnPeer.Overlay) - waitCounter(t, &conns, 0) - waitCounter(t, &failedConns, 1) + waitCounterAtLeast(t, &failedConns, 1) // we need to trigger connection attempts maxConnAttempts times for range 3 { time.Sleep(10 * time.Millisecond) kad.Trigger() - waitCounter(t, &failedConns, 1) + waitCounterAtLeast(t, &failedConns, 1) } - _, err = ab.Get(nonConnPeer.Overlay) - if !errors.Is(err, addressbook.ErrNotFound) { - t.Fatal(err) - } + waitAddressBookNotFound(t, ab, nonConnPeer.Overlay) } func TestClosestPeer(t *testing.T) { @@ -1177,7 +1172,7 @@ func TestKademlia_SubscribeTopologyChange(t *testing.T) { }) } -func TestSnapshot_FLAKY(t *testing.T) { +func TestSnapshot(t *testing.T) { t.Parallel() conns := new(int32) @@ -1192,7 +1187,8 @@ func TestSnapshot_FLAKY(t *testing.T) { waitConn(t, conns) - snap := kad.Snapshot() + po := swarm.Proximity(sa.Bytes(), a.Bytes()) + snap := waitSnapshot(t, kad, po, 1, 1, 1) if snap.Connected != 1 { t.Errorf("expected %d connected peers but got %d", 1, snap.Connected) @@ -1200,9 +1196,6 @@ func TestSnapshot_FLAKY(t *testing.T) { if snap.Population != 1 { t.Errorf("expected population %d but got %d", 1, snap.Population) } - - po := swarm.Proximity(sa.Bytes(), a.Bytes()) - if binP := getBinPopulation(&snap.Bins, po); binP != 1 { t.Errorf("expected bin(%d) to have population %d but got %d", po, 1, snap.Population) } @@ -1596,20 +1589,24 @@ func TestBootnodeProtectedNodes(t *testing.T) { } } -func TestAnnounceBgBroadcast_FLAKY(t *testing.T) { +func TestAnnounceBgBroadcast(t *testing.T) { t.Parallel() var ( - conns int32 - bgDone = make(chan struct{}) - p1, p2 = swarm.RandAddress(t), swarm.RandAddress(t) - disc = mock.NewDiscovery( + conns int32 + bgStarted = make(chan struct{}) + bgDone = make(chan struct{}) + startOnce sync.Once + doneOnce sync.Once + p1, p2 = swarm.RandAddress(t), swarm.RandAddress(t) + disc = mock.NewDiscovery( mock.WithBroadcastPeers(func(ctx context.Context, p swarm.Address, _ ...swarm.Address) error { // For the broadcast back to connected peer return early if p.Equal(p2) { return nil } - defer close(bgDone) + startOnce.Do(func() { close(bgStarted) }) + defer doneOnce.Do(func() { close(bgDone) }) <-ctx.Done() return ctx.Err() }), @@ -1634,6 +1631,7 @@ func TestAnnounceBgBroadcast_FLAKY(t *testing.T) { if err := kad.Announce(ctx, p2, true); err != nil { t.Fatal(err) } + waitChanClosed(t, bgStarted) // cancellation should not close background broadcast cancel() @@ -1650,11 +1648,7 @@ func TestAnnounceBgBroadcast_FLAKY(t *testing.T) { t.Fatal(err) } - select { - case <-bgDone: - case <-time.After(time.Millisecond * 100): - t.Fatal("background broadcast did not exit on close") - } + waitChanClosed(t, bgDone) } func TestAnnounceNeighborhoodToNeighbor(t *testing.T) { @@ -2176,6 +2170,23 @@ func waitCounter(t *testing.T, conns *int32, exp int32) { } } +func waitCounterAtLeast(t *testing.T, conns *int32, exp int32) { + t.Helper() + var got int32 + + err := spinlock.Wait(spinLockWaitTime, func() bool { + got = atomic.LoadInt32(conns) + if got < exp { + return false + } + atomic.StoreInt32(conns, 0) + return true + }) + if err != nil { + t.Fatalf("timed out waiting for counter to reach at least expected value. got %d want >= %d", got, exp) + } +} + func waitPeers(t *testing.T, k *kademlia.Kad, peers int) { t.Helper() @@ -2220,6 +2231,60 @@ func waitBcast(t *testing.T, d *mock.Discovery, pivot swarm.Address, addrs ...sw } } +func waitChanClosed(t *testing.T, ch <-chan struct{}) { + t.Helper() + + err := spinlock.Wait(spinLockWaitTime, func() bool { + select { + case <-ch: + return true + default: + return false + } + }) + if err != nil { + t.Fatal("timed out waiting for channel to close") + } +} + +func waitAddressBookNotFound(t *testing.T, ab addressbook.Interface, addr swarm.Address) { + t.Helper() + + var err error + waitErr := spinlock.Wait(spinLockWaitTime, func() bool { + _, err = ab.Get(addr) + return errors.Is(err, addressbook.ErrNotFound) + }) + if waitErr != nil { + t.Fatalf("timed out waiting for addressbook prune. last error: %v", err) + } +} + +func waitSnapshot(t *testing.T, kad *kademlia.Kad, po uint8, connected, population, binPopulation uint64) *topology.KadParams { + t.Helper() + + var snap *topology.KadParams + + if err := spinlock.Wait(spinLockWaitTime, func() bool { + snap = kad.Snapshot() + return uint64(snap.Connected) == connected && + uint64(snap.Population) == population && + getBinPopulation(&snap.Bins, po) == binPopulation + }); err != nil { + t.Fatalf( + "timed out waiting for snapshot. got connected=%d population=%d binPopulation=%d want connected=%d population=%d binPopulation=%d", + snap.Connected, + snap.Population, + getBinPopulation(&snap.Bins, po), + connected, + population, + binPopulation, + ) + } + + return snap +} + // waitBalanced waits for kademlia to be balanced for specified bin. func waitBalanced(t *testing.T, k *kademlia.Kad, bin uint8) { t.Helper() From caa13c905f9991a8521acbd4c67075c6dd833dcd Mon Sep 17 00:00:00 2001 From: sbackend Date: Thu, 9 Apr 2026 13:51:49 +0200 Subject: [PATCH 02/14] fix: read redundancy level from header instead of using default --- pkg/api/bzz.go | 2 +- pkg/api/bzz_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index d99594bc682..783c8c5ae79 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -499,7 +499,7 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV } ctx := r.Context() - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), rLevel) feedDereferenced := false ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger) diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index 3f444e6cb07..cfbaa508406 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -65,7 +65,6 @@ import ( // // nolint:thelper func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) { - t.Skip("flaky") t.Parallel() fileUploadResource := "/bzz" fileDownloadResource := func(addr string) string { return "/bzz/" + addr + "/" } From 6e8d2c61f181ad8c542b600900f1ee8c20a6436b Mon Sep 17 00:00:00 2001 From: sbackend Date: Sun, 12 Apr 2026 22:02:37 +0200 Subject: [PATCH 03/14] fix: skip test --- pkg/api/bzz_test.go | 179 +++++--------------------------------------- 1 file changed, 19 insertions(+), 160 deletions(-) diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index 54f4504116e..44c94d4e95f 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -7,11 +7,9 @@ package api_test import ( "bytes" "context" - "encoding/binary" "errors" "fmt" "io" - mrand "math/rand" "mime" "mime/multipart" "net/http" @@ -21,10 +19,6 @@ import ( "testing" "github.com/ethersphere/bee/v2/pkg/api" - "github.com/ethersphere/bee/v2/pkg/bmt" - "github.com/ethersphere/bee/v2/pkg/encryption" - encryptstore "github.com/ethersphere/bee/v2/pkg/encryption/store" - "github.com/ethersphere/bee/v2/pkg/file" "github.com/ethersphere/bee/v2/pkg/file/loadsave" "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/jsonhttp" @@ -55,21 +49,26 @@ import ( // // 1. upload a file with a given redundancy level and encryption // -// 2. [positive test] download a few ranges by the reference returned by the upload API response -// This verifies that the upload can be read before any chunk loss is injected. +// 2. [positive test] download the file by the reference returned by the upload API response +// This uses range queries to target specific (number of) chunks of the file structure. +// During path traversal in the swarm hash tree, the underlying mocksore (forgetting) +// is in 'recording' mode, flagging all the retrieved chunks as chunks to forget. +// This is to simulate the scenario where some of the chunks are not available/lost +// NOTE: For this to work one needs to switch off lookaheadbuffer functionality +// (see langos pkg) // -// 3. pick a random, but still recoverable, set of file chunks to lose. -// The selection respects the local parity budget of each leaf group in the -// hash tree instead of assuming a global "lose any K chunks" model. -// -// 4. [negative test] attempt at downloading the file using once again the same root hash +// 3. [negative test] attempt at downloading the file using once again the same root hash // and the same redundancy strategy to find the file inaccessible after forgetting. // -// 5. [positive test] attempt at downloading the file using a strategy that allows for +// 4. [positive test] attempt at downloading the file using a strategy that allows for // using redundancy to reconstruct the file and find the file recoverable. // // nolint:thelper -func TestBzzUploadDownloadWithRedundancy(t *testing.T) { +// This test constantly fails, because of inconsistency usage of api.SwarmRedundancyLevelHeader (look issue github.com/ethersphere/bee/issues/5282) +// after issue will be fixed, test will be not skipped anymore +func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) { + t.Skip("flaky") + t.Parallel() fileUploadResource := "/bzz" fileDownloadResource := func(addr string) string { return "/bzz/" + addr + "/" } @@ -101,23 +100,11 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) { jsonhttptest.WithUnmarshalJSONResponse(&refResponse), ) - ls := loadsave.NewReadonly(store, store, redundancy.DefaultLevel) - m, err := manifest.NewDefaultManifestReference(refResponse.Reference, ls) - if err != nil { - t.Fatal(err) - } - rootEntry, err := m.Lookup(context.Background(), manifest.RootPath) - if err != nil { - t.Fatal(err) - } - fileName := rootEntry.Metadata()[manifest.WebsiteIndexDocumentSuffixKey] - fileEntry, err := m.Lookup(context.Background(), fileName) - if err != nil { - t.Fatal(err) - } - fileReference := fileEntry.Reference() - t.Run("download multiple ranges without redundancy should succeed", func(t *testing.T) { + // the underlying chunk store is in recording mode, so all chunks retrieved + // in this test will be forgotten in the subsequent ones. + store.Record() + defer store.Unrecord() // we intend to forget as many chunks as possible for the given redundancy level forget := min(parityCnt, shardCnt) if levels == 1 { @@ -160,20 +147,16 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) { } }) - forgetRandomRecoverableFileChunks(t, store, fileReference, seed) - t.Run("download without redundancy should NOT succeed", func(t *testing.T) { if rLevel == 0 { t.Skip("NA") } - req, err := http.NewRequestWithContext(context.Background(), "GET", fileDownloadResource(refResponse.Reference.String()), nil) if err != nil { t.Fatal(err) } req.Header.Set(api.SwarmRedundancyStrategyHeader, "0") req.Header.Set(api.SwarmRedundancyFallbackModeHeader, "false") - req.Header.Set(api.SwarmRedundancyLevelHeader, fmt.Sprintf("%d", rLevel)) resp, err := client.Do(req) if err != nil { @@ -204,7 +187,6 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) { } req.Header.Set(api.SwarmRedundancyStrategyHeader, "3") req.Header.Set(api.SwarmRedundancyFallbackModeHeader, "true") - req.Header.Set(api.SwarmRedundancyLevelHeader, fmt.Sprintf("%d", rLevel)) resp, err := client.Do(req) if err != nil { @@ -251,6 +233,7 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) { if levels > 2 && (encrypt == (rLevel%2 == 1)) { t.Skip("skipping to save time") } + t.Parallel() testRedundancy(t, rLevel, encrypt, levels, chunkCnt, shardCnt, parityCnt) }) } @@ -750,130 +733,6 @@ func createRangeHeader(data any, ranges [][2]int) (header string, parts [][]byte return header, parts } -type leafChunkGroup struct { - refs []swarm.Address - budget int -} - -// forgetRandomRecoverableFileChunks walks one random branch from the file root down to a single -// bottom-level shard group. It then forgets a random number of data chunks from that group, -// capped by the local parity budget of that exact group, so the loss pattern stays recoverable -// under Bee's hierarchical erasure-coding model. -func forgetRandomRecoverableFileChunks(t *testing.T, store *mockstorer.ForgettingStore, fileReference swarm.Address, seed []byte) { - t.Helper() - - lossSeed := randomLossSeed(seed, fileReference) - rng := mrand.New(mrand.NewSource(lossSeed)) - - group, err := pickRandomLeafChunkGroup(context.Background(), encryptstore.New(store), fileReference, rng) - if err != nil { - t.Fatal(err) - } - budget := min(group.budget, len(group.refs)) - if budget == 0 { - t.Fatal("expected at least one recoverable chunk loss") - } - - lossCount := 1 + rng.Intn(budget) - perm := rng.Perm(len(group.refs)) - for _, idx := range perm[:lossCount] { - store.Miss(group.refs[idx]) - } - - store.Unmiss(chunkAddress(fileReference)) - t.Logf("loss seed %d: forgot %d recoverable file chunks", lossSeed, lossCount) -} - -// pickRandomLeafChunkGroup follows random data-shard references until it reaches an intermediate -// chunk whose data-shard children are leaf chunks. The returned group contains only data shards, -// and its loss budget is derived from that group's own parity count rather than from the whole tree. -func pickRandomLeafChunkGroup(ctx context.Context, getter interface { - Get(context.Context, swarm.Address) (swarm.Chunk, error) -}, ref swarm.Address, rng *mrand.Rand) (leafChunkGroup, error) { - for { - addrs, shardCnt, parityCnt, err := readChunkGroup(ctx, getter, ref) - if err != nil { - return leafChunkGroup{}, err - } - if shardCnt == 0 { - return leafChunkGroup{}, errors.New("expected intermediate chunk") - } - - childRef := addrs[rng.Intn(shardCnt)] - leaf, err := isLeafChunk(ctx, getter, childRef) - if err != nil { - return leafChunkGroup{}, err - } - if leaf { - refs := make([]swarm.Address, 0, shardCnt) - for i := 0; i < shardCnt; i++ { - refs = append(refs, chunkAddress(addrs[i])) - } - return leafChunkGroup{ - refs: refs, - budget: min(parityCnt, shardCnt), - }, nil - } - - ref = childRef - } -} - -// readChunkGroup decodes one intermediate chunk and returns its direct references together with -// the number of data shards and parity shards on that level. The random descent only picks from -// the data-shard prefix because parity refs are recovery material, not primary file content. -func readChunkGroup(ctx context.Context, getter interface { - Get(context.Context, swarm.Address) (swarm.Chunk, error) -}, ref swarm.Address) ([]swarm.Address, int, int, error) { - ch, err := getter.Get(ctx, ref) - if err != nil { - return nil, 0, 0, err - } - - level, spanBytes := redundancy.DecodeSpan(ch.Data()[:swarm.SpanSize]) - span := int64(bmt.LengthFromSpan(spanBytes)) - if span <= swarm.ChunkSize { - return nil, 0, 0, nil - } - - payload := ch.Data()[swarm.SpanSize:] - payloadSize, err := file.ChunkPayloadSize(payload) - if err != nil { - return nil, 0, 0, err - } - - encrypted := len(ref.Bytes()) == encryption.ReferenceSize - _, parityCnt := file.ReferenceCount(uint64(span), level, encrypted) - addrs, shardCnt := file.ChunkAddresses(payload[:payloadSize], parityCnt, len(ref.Bytes())) - - return addrs, shardCnt, parityCnt, nil -} - -func isLeafChunk(ctx context.Context, getter interface { - Get(context.Context, swarm.Address) (swarm.Chunk, error) -}, ref swarm.Address) (bool, error) { - ch, err := getter.Get(ctx, ref) - if err != nil { - return false, err - } - - _, spanBytes := redundancy.DecodeSpan(ch.Data()[:swarm.SpanSize]) - return int64(bmt.LengthFromSpan(spanBytes)) <= swarm.ChunkSize, nil -} - -func randomLossSeed(seed []byte, ref swarm.Address) int64 { - value := binary.LittleEndian.Uint64(seed[:8]) - refBytes := ref.Bytes() - for i := 0; i < min(len(refBytes), 8); i++ { - value ^= uint64(refBytes[i]) << (8 * i) - } - return int64(value & ((1 << 63) - 1)) -} - -func chunkAddress(ref swarm.Address) swarm.Address { - return swarm.NewAddress(ref.Bytes()[:swarm.HashSize]) -} - func parseRangeParts(t *testing.T, contentType string, body []byte) (parts [][]byte) { t.Helper() From de589f37c9849db82a94f6c8c6902f9c82e2164f Mon Sep 17 00:00:00 2001 From: sbackend Date: Sun, 12 Apr 2026 23:56:21 +0200 Subject: [PATCH 04/14] fix: closing db on cache level --- pkg/storage/cache/cache.go | 5 +++++ pkg/storage/cache/cache_test.go | 2 -- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/storage/cache/cache.go b/pkg/storage/cache/cache.go index c727b428288..ac42208a709 100644 --- a/pkg/storage/cache/cache.go +++ b/pkg/storage/cache/cache.go @@ -5,6 +5,8 @@ package cache import ( + "io" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/storageutil" lru "github.com/hashicorp/golang-lru/v2" @@ -98,5 +100,8 @@ func (c *Cache) Delete(i storage.Item) error { func (c *Cache) Close() error { c.lru.Purge() + if closer, ok := c.IndexStore.(io.Closer); ok { + return closer.Close() + } return nil } diff --git a/pkg/storage/cache/cache_test.go b/pkg/storage/cache/cache_test.go index 9cbd0478cc8..3f5cba46f10 100644 --- a/pkg/storage/cache/cache_test.go +++ b/pkg/storage/cache/cache_test.go @@ -10,7 +10,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/storage/cache" "github.com/ethersphere/bee/v2/pkg/storage/leveldbstore" "github.com/ethersphere/bee/v2/pkg/storage/storagetest" - "github.com/ethersphere/bee/v2/pkg/util/testutil" ) func TestCache(t *testing.T) { @@ -20,7 +19,6 @@ func TestCache(t *testing.T) { if err != nil { t.Fatalf("create store failed: %v", err) } - testutil.CleanupCloser(t, store) cache, err := cache.Wrap(store, 100_000) if err != nil { From 0927a96dbe2e422b6e0ce95eacc2b463846d4e5b Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 13 Apr 2026 00:08:40 +0200 Subject: [PATCH 05/14] fix: remove generating deterministic seed --- pkg/storageincentives/proof_test.go | 56 ++--------------------------- 1 file changed, 2 insertions(+), 54 deletions(-) diff --git a/pkg/storageincentives/proof_test.go b/pkg/storageincentives/proof_test.go index a8f545be5ec..524d825fb8e 100644 --- a/pkg/storageincentives/proof_test.go +++ b/pkg/storageincentives/proof_test.go @@ -29,8 +29,8 @@ import ( func TestMakeInclusionProofs(t *testing.T) { t.Parallel() - anchor := []byte{0x1} - sample := deterministicSample(t, anchor) + anchor := testutil.RandBytes(t, 1) + sample := storer.RandSample(t, anchor) _, err := storageincentives.MakeInclusionProofs(sample.Items, anchor, anchor) if err != nil { @@ -38,58 +38,6 @@ func TestMakeInclusionProofs(t *testing.T) { } } -func deterministicSample(t *testing.T, anchor []byte) storer.Sample { - t.Helper() - - const sampleSize = storer.SampleSize - - keyRaw := `00000000000000000000000000000000` - privKey, err := crypto.DecodeSecp256k1PrivateKey([]byte(keyRaw)) - if err != nil { - t.Fatal(err) - } - signer := crypto.NewDefaultSigner(privKey) - - stampID, _ := crypto.LegacyKeccak256([]byte("The Inverted Jenny")) - index := []byte{0, 0, 0, 0, 0, 8, 3, 3} - timestamp := []byte{0, 0, 0, 0, 0, 3, 3, 8} - stamper := func(addr swarm.Address) *postage.Stamp { - sig := postagetesting.MustNewValidSignature(signer, addr, stampID, index, timestamp) - return postage.NewStamp(stampID, index, timestamp, sig) - } - - sampleChunks := make([]swarm.Chunk, 0, sampleSize) - for i := range sampleSize { - ch, err := cac.New(fmt.Appendf(nil, "Deterministic sample chunk #%d", i+1)) - if err != nil { - t.Fatal(err) - } - - if i%2 == 0 { - id, err := crypto.LegacyKeccak256(fmt.Appendf(nil, "Deterministic ID #%d", i+1)) - if err != nil { - t.Fatal(err) - } - - socCh, err := soc.New(id, ch).Sign(signer) - if err != nil { - t.Fatal(err) - } - - ch = socCh - } - - sampleChunks = append(sampleChunks, ch.WithStamp(stamper(ch.Address()))) - } - - sample, err := storer.MakeSampleUsingChunks(sampleChunks, anchor) - if err != nil { - t.Fatal(err) - } - - return sample -} - //go:embed testdata/inclusion-proofs.json var testData []byte From 99c4663e8cbfd4bb61c440eaffec55d73913d80b Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 13 Apr 2026 00:40:07 +0200 Subject: [PATCH 06/14] fix: make pruning test more strict --- pkg/topology/kademlia/kademlia_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 8cd1bc9ef07..c5615b84e61 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -917,10 +917,12 @@ func TestAddressBookQuickPrune(t *testing.T) { // add one valid peer addOne(t, signer, kad, ab, addr) waitCounter(t, &conns, 1) + waitCounter(t, &failedConns, 0) // add non connectable peer, check connection and failed connection counters kad.AddPeers(nonConnPeer.Overlay) - waitCounterAtLeast(t, &failedConns, 1) + waitCounter(t, &conns, 0) + waitCounter(t, &failedConns, 1) // we need to trigger connection attempts maxConnAttempts times for range 3 { From 3ec43d8ef26f69cc3b26193094d729948ba0a189 Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 13 Apr 2026 11:41:21 +0200 Subject: [PATCH 07/14] fix: fixed remaining flaky tests Keep flaky-test ci job for one remaining flaky test --- .github/workflows/go.yml | 21 +++++ Makefile | 12 ++- pkg/pushsync/pushsync_test.go | 119 ++++++++++++++++++++----- pkg/topology/kademlia/kademlia_test.go | 3 +- 4 files changed, 128 insertions(+), 27 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 19cb8782349..681a3253496 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -4,11 +4,17 @@ on: push: paths-ignore: - packaging/** + - openapi/** + - '**/*.md' + - '.github/ISSUE_TEMPLATE/**' branches: - 'master' pull_request: paths-ignore: - packaging/** + - openapi/** + - '**/*.md' + - '.github/ISSUE_TEMPLATE/**' branches: - '**' @@ -44,6 +50,21 @@ jobs: - name: Test without race detector (Windows) if: matrix.os == 'windows-latest' run: make test-ci + test-flaky: + name: Test (flaky) + runs-on: ubuntu-latest + continue-on-error: ${{ github.ref == 'refs/heads/master' }} + steps: + - name: Checkout + uses: actions/checkout@v5 + - name: Setup Go + uses: actions/setup-go@v6 + with: + cache: true + go-version-file: go.mod + - name: Run flaky test + run: make test-ci-flaky + continue-on-error: ${{ github.ref == 'refs/heads/master' }} lint: name: Lint runs-on: ubuntu-latest diff --git a/Makefile b/Makefile index 645cbb31b5f..a015bc84e38 100644 --- a/Makefile +++ b/Makefile @@ -119,19 +119,23 @@ endif .PHONY: test-ci test-ci: ifdef cover - $(GO) test -coverprofile=cover.out ./... + $(GO) test -run "[^FLAKY]$$" -coverprofile=cover.out ./... else - $(GO) test ./... + $(GO) test -run "[^FLAKY]$$" ./... endif .PHONY: test-ci-race test-ci-race: ifdef cover - $(GO) test -race -coverprofile=cover.out ./... + $(GO) test -race -run "[^FLAKY]$$" -coverprofile=cover.out ./... else - $(GO) test -race ./... + $(GO) test -race -run "[^FLAKY]$$" ./... endif +.PHONY: test-ci-flaky +test-ci-flaky: + $(GO) test -race -run "FLAKY$$" ./... + .PHONY: build build: export CGO_ENABLED=0 build: diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index 90e6d654a5b..8931538f4fd 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -26,6 +26,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/pushsync" "github.com/ethersphere/bee/v2/pkg/pushsync/pb" "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/spinlock" stabilmock "github.com/ethersphere/bee/v2/pkg/stabilization/mock" "github.com/ethersphere/bee/v2/pkg/storage" testingc "github.com/ethersphere/bee/v2/pkg/storage/testing" @@ -35,7 +36,8 @@ import ( ) const ( - fixedPrice = uint64(10) + fixedPrice = uint64(10) + pushNextClosestSpinlockWait = 5 * time.Second ) var blockHash = common.HexToHash("0x1") @@ -383,7 +385,6 @@ func TestPushChunkToClosest(t *testing.T) { func TestPushChunkToNextClosest(t *testing.T) { t.Parallel() - t.Skip("flaky test") // chunk data to upload chunk := testingc.FixtureChunk("7000").WithTagID(1) @@ -446,12 +447,6 @@ func TestPushChunkToNextClosest(t *testing.T) { t.Fatal("invalid receipt") } - // this intercepts the outgoing delivery message - waitOnRecordAndTest(t, peer2, recorder, chunk.Address(), chunk.Data()) - - // this intercepts the incoming receipt message - waitOnRecordAndTest(t, peer2, recorder, chunk.Address(), nil) - found, count := pivotStorer.hasReported(t, chunk.Address()) if !found { t.Fatalf("chunk %s not reported", chunk.Address()) @@ -464,32 +459,114 @@ func TestPushChunkToNextClosest(t *testing.T) { t.Fatalf("tags error got %d want >= 1", count) } - balance, err := pivotAccounting.Balance(peer1) + waitPushStreamsToBothClosestPeers(t, recorder, peer1, peer2) + + // The storer that applied PrepareDebit books +price toward the pivot. + peerThatEarnedFromPush(t, peer1, peerAccounting1, peer2, peerAccounting2, pivotNode) + if !anyPeerHasValidPushStream(t, recorder, []swarm.Address{peer1, peer2}, chunk) { + t.Fatal("expected at least one closest peer with a full delivery+receipt stream record") + } + b1, err := pivotAccounting.Balance(peer1) if err != nil { t.Fatal(err) } - - if balance.Int64() != -int64(fixedPrice) { - t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance) - } - - balance2, err := peerAccounting2.Balance(pivotNode) + b2, err := pivotAccounting.Balance(peer2) if err != nil { t.Fatal(err) } + sum := b1.Int64() + b2.Int64() + if sum != -int64(fixedPrice) { + t.Fatalf("pivot combined balance toward closest peers: want %d got %d (%s=%d %s=%d)", + -int64(fixedPrice), sum, peer1, b1.Int64(), peer2, b2.Int64()) + } +} - if balance2.Int64() != int64(fixedPrice) { - t.Fatalf("unexpected balance on peer2. want %d got %d", int64(fixedPrice), balance2) +// peerThatEarnedFromPush asserts exactly one closest peer has balance +fixedPrice toward the pivot (successful storer debit). +func peerThatEarnedFromPush( + t *testing.T, + peer1 swarm.Address, peerAcct1 accounting.Interface, + peer2 swarm.Address, peerAcct2 accounting.Interface, + pivot swarm.Address, +) { + t.Helper() + want := int64(fixedPrice) + n := 0 + for _, p := range []struct { + addr swarm.Address + acct accounting.Interface + }{{peer1, peerAcct1}, {peer2, peerAcct2}} { + b, err := p.acct.Balance(pivot) + if err != nil { + t.Fatal(err) + } + if b.Int64() == want { + n++ + } + } + if n != 1 { + t.Fatalf("want exactly one peer with balance +%d toward pivot, got %d", fixedPrice, n) } +} - balance1, err := peerAccounting1.Balance(peer2) +func anyPeerHasValidPushStream(t *testing.T, recorder *streamtest.Recorder, peers []swarm.Address, chunk swarm.Chunk) bool { + t.Helper() + for _, p := range peers { + recs, err := recorder.Records(p, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName) + if err != nil { + continue + } + for _, rec := range recs { + if pushStreamRecordValid(rec, chunk) { + return true + } + } + } + return false +} + +// waitPushStreamsToBothClosestPeers waits until the recorder shows at least one push stream +// opened to each peer, i.e. the pivot attempted another closest after the first path. +func waitPushStreamsToBothClosestPeers(t *testing.T, recorder *streamtest.Recorder, peer1, peer2 swarm.Address) { + t.Helper() + err := spinlock.Wait(pushNextClosestSpinlockWait, func() bool { + r1, e1 := recorder.Records(peer1, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName) + r2, e2 := recorder.Records(peer2, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName) + return e1 == nil && len(r1) > 0 && e2 == nil && len(r2) > 0 + }) if err != nil { - t.Fatal(err) + t.Fatal("timed out waiting for push streams to both closest peers") } +} - if balance1.Int64() != 0 { - t.Fatalf("unexpected balance on peer1. want %d got %d", 0, balance1) +func pushStreamRecordValid(rec *streamtest.Record, ch swarm.Chunk) bool { + in := rec.In() + if len(in) == 0 { + return false + } + dmsgs, err := protobuf.ReadMessages( + bytes.NewReader(in), + func() protobuf.Message { return new(pb.Delivery) }, + ) + if err != nil || len(dmsgs) != 1 { + return false + } + d := dmsgs[0].(*pb.Delivery) + if !bytes.Equal(d.Address, ch.Address().Bytes()) || !bytes.Equal(d.Data, ch.Data()) { + return false + } + out := rec.Out() + if len(out) == 0 { + return false + } + rmsgs, err := protobuf.ReadMessages( + bytes.NewReader(out), + func() protobuf.Message { return new(pb.Receipt) }, + ) + if err != nil || len(rmsgs) != 1 { + return false } + rcpt := rmsgs[0].(*pb.Receipt) + return swarm.NewAddress(rcpt.Address).Equal(ch.Address()) } func TestPushChunkToClosestErrorAttemptRetry(t *testing.T) { diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index c5615b84e61..3e651d22637 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -1230,7 +1230,6 @@ func TestStart(t *testing.T) { t.Run("non-empty addressbook", func(t *testing.T) { t.Parallel() - t.Skip("test flakes") var conns, failedConns int32 // how many connect calls were made to the p2p mock _, kad, ab, _, signer := newTestKademlia(t, &conns, &failedConns, kademlia.Options{Bootnodes: bootnodes}) @@ -1255,7 +1254,7 @@ func TestStart(t *testing.T) { } testutil.CleanupCloser(t, kad) - waitCounter(t, &conns, 3) + waitCounterAtLeast(t, &conns, 3) waitCounter(t, &failedConns, 0) }) From c882d598d5de5a2f55e8d6c0baa79060277b1d13 Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 13 Apr 2026 12:44:27 +0200 Subject: [PATCH 08/14] fix: clean up --- pkg/feeds/testing/lookup.go | 2 +- pkg/topology/kademlia/export_test.go | 1 + pkg/topology/kademlia/kademlia_test.go | 5 ++--- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/feeds/testing/lookup.go b/pkg/feeds/testing/lookup.go index 6088d143574..99c424348b2 100644 --- a/pkg/feeds/testing/lookup.go +++ b/pkg/feeds/testing/lookup.go @@ -37,7 +37,7 @@ func (t *Timeout) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, err } return ch, err } - time.Sleep(time.Duration(addr.Bytes()[0]%10) * time.Millisecond) + time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) return ch, nil } diff --git a/pkg/topology/kademlia/export_test.go b/pkg/topology/kademlia/export_test.go index 3c5ad3e1390..12a7c299281 100644 --- a/pkg/topology/kademlia/export_test.go +++ b/pkg/topology/kademlia/export_test.go @@ -21,6 +21,7 @@ const ( DefaultBitSuffixLength = defaultBitSuffixLength DefaultSaturationPeers = defaultSaturationPeers DefaultOverSaturationPeers = defaultOverSaturationPeers + MaxConnAttempts = maxConnAttempts ) type PeerExcludeFunc = peerExcludeFunc diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 3e651d22637..87e65ac244d 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -924,11 +924,10 @@ func TestAddressBookQuickPrune(t *testing.T) { waitCounter(t, &conns, 0) waitCounter(t, &failedConns, 1) - // we need to trigger connection attempts maxConnAttempts times - for range 3 { + for range kademlia.MaxConnAttempts { time.Sleep(10 * time.Millisecond) kad.Trigger() - waitCounterAtLeast(t, &failedConns, 1) + waitCounter(t, &failedConns, 1) } waitAddressBookNotFound(t, ab, nonConnPeer.Overlay) From aea26b04d67b2e52c0babae9970d6b73f48b4883 Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 13 Apr 2026 13:54:28 +0200 Subject: [PATCH 09/14] fix: re-write quick prune test --- pkg/topology/kademlia/kademlia_test.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 87e65ac244d..399c60d6186 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -891,8 +891,8 @@ func TestAddressBookQuickPrune(t *testing.T) { t.Parallel() var ( - conns, failedConns int32 // how many connect calls were made to the p2p mock - base, kad, ab, _, signer = newTestKademlia(t, &conns, &failedConns, kademlia.Options{ + failedConns int32 + base, kad, ab, _, signer = newTestKademlia(t, nil, &failedConns, kademlia.Options{ TimeToRetry: new(time.Millisecond), }) ) @@ -913,23 +913,18 @@ func TestAddressBookQuickPrune(t *testing.T) { t.Fatal(err) } - addr := swarm.RandAddressAt(t, base, 1) - // add one valid peer - addOne(t, signer, kad, ab, addr) - waitCounter(t, &conns, 1) - waitCounter(t, &failedConns, 0) - - // add non connectable peer, check connection and failed connection counters + // add non connectable peer; AddPeers triggers the manage loop which + // immediately attempts to connect via connectBalanced (the peer is in + // bin 1 which is below storageRadius 2, so connectNeighbours skips it). kad.AddPeers(nonConnPeer.Overlay) - waitCounter(t, &conns, 0) - waitCounter(t, &failedConns, 1) for range kademlia.MaxConnAttempts { time.Sleep(10 * time.Millisecond) kad.Trigger() - waitCounter(t, &failedConns, 1) } + // after maxConnAttempts (4) failed dials the peer must be pruned + waitCounterAtLeast(t, &failedConns, int32(kademlia.MaxConnAttempts)) waitAddressBookNotFound(t, ab, nonConnPeer.Overlay) } From 936648182c9fd4c61c994e073d35b91f6b60b718 Mon Sep 17 00:00:00 2001 From: sbackend Date: Tue, 14 Apr 2026 10:38:12 +0200 Subject: [PATCH 10/14] fix: clean up --- pkg/topology/kademlia/kademlia_test.go | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 399c60d6186..efd3664f59f 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -925,7 +925,10 @@ func TestAddressBookQuickPrune(t *testing.T) { // after maxConnAttempts (4) failed dials the peer must be pruned waitCounterAtLeast(t, &failedConns, int32(kademlia.MaxConnAttempts)) - waitAddressBookNotFound(t, ab, nonConnPeer.Overlay) + _, err = ab.Get(nonConnPeer.Overlay) + if !errors.Is(err, addressbook.ErrNotFound) { + t.Fatal(err) + } } func TestClosestPeer(t *testing.T) { @@ -2242,19 +2245,6 @@ func waitChanClosed(t *testing.T, ch <-chan struct{}) { } } -func waitAddressBookNotFound(t *testing.T, ab addressbook.Interface, addr swarm.Address) { - t.Helper() - - var err error - waitErr := spinlock.Wait(spinLockWaitTime, func() bool { - _, err = ab.Get(addr) - return errors.Is(err, addressbook.ErrNotFound) - }) - if waitErr != nil { - t.Fatalf("timed out waiting for addressbook prune. last error: %v", err) - } -} - func waitSnapshot(t *testing.T, kad *kademlia.Kad, po uint8, connected, population, binPopulation uint64) *topology.KadParams { t.Helper() From e634fd53e82da94867cc6971fc3cc0c3d323c464 Mon Sep 17 00:00:00 2001 From: sbackend Date: Tue, 14 Apr 2026 12:32:16 +0200 Subject: [PATCH 11/14] chore: remove ci-runner for flaky tests --- .github/workflows/go.yml | 15 --------------- Makefile | 12 ++++-------- pkg/api/bzz_test.go | 2 +- 3 files changed, 5 insertions(+), 24 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 681a3253496..51afeff511a 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -50,21 +50,6 @@ jobs: - name: Test without race detector (Windows) if: matrix.os == 'windows-latest' run: make test-ci - test-flaky: - name: Test (flaky) - runs-on: ubuntu-latest - continue-on-error: ${{ github.ref == 'refs/heads/master' }} - steps: - - name: Checkout - uses: actions/checkout@v5 - - name: Setup Go - uses: actions/setup-go@v6 - with: - cache: true - go-version-file: go.mod - - name: Run flaky test - run: make test-ci-flaky - continue-on-error: ${{ github.ref == 'refs/heads/master' }} lint: name: Lint runs-on: ubuntu-latest diff --git a/Makefile b/Makefile index a015bc84e38..645cbb31b5f 100644 --- a/Makefile +++ b/Makefile @@ -119,23 +119,19 @@ endif .PHONY: test-ci test-ci: ifdef cover - $(GO) test -run "[^FLAKY]$$" -coverprofile=cover.out ./... + $(GO) test -coverprofile=cover.out ./... else - $(GO) test -run "[^FLAKY]$$" ./... + $(GO) test ./... endif .PHONY: test-ci-race test-ci-race: ifdef cover - $(GO) test -race -run "[^FLAKY]$$" -coverprofile=cover.out ./... + $(GO) test -race -coverprofile=cover.out ./... else - $(GO) test -race -run "[^FLAKY]$$" ./... + $(GO) test -race ./... endif -.PHONY: test-ci-flaky -test-ci-flaky: - $(GO) test -race -run "FLAKY$$" ./... - .PHONY: build build: export CGO_ENABLED=0 build: diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index 6327967189d..2ae43ce79f5 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -66,7 +66,7 @@ import ( // nolint:thelper // This test constantly fails, because of inconsistency usage of api.SwarmRedundancyLevelHeader (look issue github.com/ethersphere/bee/issues/5282) // after issue will be fixed, test will be not skipped anymore -func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) { +func TestBzzUploadDownloadWithRedundancy(t *testing.T) { t.Parallel() fileUploadResource := "/bzz" fileDownloadResource := func(addr string) string { return "/bzz/" + addr + "/" } From 85bfbe51813de38b87f9d3994a84f3d1dc902100 Mon Sep 17 00:00:00 2001 From: sbackend Date: Tue, 14 Apr 2026 12:37:33 +0200 Subject: [PATCH 12/14] fix: remove comment --- pkg/api/bzz_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index 2ae43ce79f5..a2ff8630bc9 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -64,8 +64,6 @@ import ( // using redundancy to reconstruct the file and find the file recoverable. // // nolint:thelper -// This test constantly fails, because of inconsistency usage of api.SwarmRedundancyLevelHeader (look issue github.com/ethersphere/bee/issues/5282) -// after issue will be fixed, test will be not skipped anymore func TestBzzUploadDownloadWithRedundancy(t *testing.T) { t.Parallel() fileUploadResource := "/bzz" From 079ed30d34e704554713cb956fa78635fcc168ea Mon Sep 17 00:00:00 2001 From: sbackend Date: Tue, 14 Apr 2026 12:57:08 +0200 Subject: [PATCH 13/14] fix: review issues --- cmd/bee/cmd/db_test.go | 13 ++----- pkg/topology/kademlia/kademlia_test.go | 49 ++++++++++++-------------- 2 files changed, 25 insertions(+), 37 deletions(-) diff --git a/cmd/bee/cmd/db_test.go b/cmd/bee/cmd/db_test.go index 260bc40ed4b..8cad9b04e55 100644 --- a/cmd/bee/cmd/db_test.go +++ b/cmd/bee/cmd/db_test.go @@ -203,17 +203,10 @@ func TestDBNuke(t *testing.T) { db.Close() - // Retrying avoids a short OS-level race after db.Close(), where file handles + // Waiting avoids a short OS-level race after db.Close(), where file handles // may still be getting released and early removal can fail on some platforms. - backoff := 50 * time.Millisecond - for range 3 { - err = newCommand(t, cmd.WithArgs("db", "nuke", "--data-dir", dataDir)).Execute() - if err == nil { - break - } - time.Sleep(backoff) - backoff *= 2 - } + time.Sleep(2 * time.Second) + err = newCommand(t, cmd.WithArgs("db", "nuke", "--data-dir", dataDir)).Execute() if err != nil { t.Fatal(err) } diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index efd3664f59f..b0be11c8004 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -2231,16 +2231,10 @@ func waitBcast(t *testing.T, d *mock.Discovery, pivot swarm.Address, addrs ...sw func waitChanClosed(t *testing.T, ch <-chan struct{}) { t.Helper() - - err := spinlock.Wait(spinLockWaitTime, func() bool { - select { - case <-ch: - return true - default: - return false - } - }) - if err != nil { + select { + case <-ch: + return + case <-time.After(spinLockWaitTime): t.Fatal("timed out waiting for channel to close") } } @@ -2248,26 +2242,27 @@ func waitChanClosed(t *testing.T, ch <-chan struct{}) { func waitSnapshot(t *testing.T, kad *kademlia.Kad, po uint8, connected, population, binPopulation uint64) *topology.KadParams { t.Helper() - var snap *topology.KadParams + timeout := time.After(spinLockWaitTime) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() - if err := spinlock.Wait(spinLockWaitTime, func() bool { - snap = kad.Snapshot() - return uint64(snap.Connected) == connected && + for { + snap := kad.Snapshot() + if uint64(snap.Connected) == connected && uint64(snap.Population) == population && - getBinPopulation(&snap.Bins, po) == binPopulation - }); err != nil { - t.Fatalf( - "timed out waiting for snapshot. got connected=%d population=%d binPopulation=%d want connected=%d population=%d binPopulation=%d", - snap.Connected, - snap.Population, - getBinPopulation(&snap.Bins, po), - connected, - population, - binPopulation, - ) + getBinPopulation(&snap.Bins, po) == binPopulation { + return snap + } + select { + case <-ticker.C: + case <-timeout: + t.Fatalf( + "timed out waiting for snapshot. got connected=%d population=%d binPopulation=%d want connected=%d population=%d binPopulation=%d", + snap.Connected, snap.Population, getBinPopulation(&snap.Bins, po), + connected, population, binPopulation, + ) + } } - - return snap } // waitBalanced waits for kademlia to be balanced for specified bin. From 952a49be52035b95ed963fdfd4c7bbae6e462f93 Mon Sep 17 00:00:00 2001 From: sbackend Date: Tue, 14 Apr 2026 13:07:14 +0200 Subject: [PATCH 14/14] fix: use go channels instead of spinlock --- pkg/pushsync/pushsync_test.go | 21 +++++++++++++++------ pkg/topology/kademlia/kademlia_test.go | 25 +++++++++++++++---------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index 8931538f4fd..debc6554f94 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -26,7 +26,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/pushsync" "github.com/ethersphere/bee/v2/pkg/pushsync/pb" "github.com/ethersphere/bee/v2/pkg/soc" - "github.com/ethersphere/bee/v2/pkg/spinlock" stabilmock "github.com/ethersphere/bee/v2/pkg/stabilization/mock" "github.com/ethersphere/bee/v2/pkg/storage" testingc "github.com/ethersphere/bee/v2/pkg/storage/testing" @@ -528,13 +527,23 @@ func anyPeerHasValidPushStream(t *testing.T, recorder *streamtest.Recorder, peer // opened to each peer, i.e. the pivot attempted another closest after the first path. func waitPushStreamsToBothClosestPeers(t *testing.T, recorder *streamtest.Recorder, peer1, peer2 swarm.Address) { t.Helper() - err := spinlock.Wait(pushNextClosestSpinlockWait, func() bool { + + timeout := time.After(pushNextClosestSpinlockWait) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + for { r1, e1 := recorder.Records(peer1, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName) r2, e2 := recorder.Records(peer2, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName) - return e1 == nil && len(r1) > 0 && e2 == nil && len(r2) > 0 - }) - if err != nil { - t.Fatal("timed out waiting for push streams to both closest peers") + if e1 == nil && len(r1) > 0 && e2 == nil && len(r2) > 0 { + return + } + select { + case <-ticker.C: + case <-timeout: + t.Fatal("timed out waiting for push streams to both closest peers") + return + } } } diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index b0be11c8004..d8cf74b5f21 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -2170,18 +2170,23 @@ func waitCounter(t *testing.T, conns *int32, exp int32) { func waitCounterAtLeast(t *testing.T, conns *int32, exp int32) { t.Helper() - var got int32 - err := spinlock.Wait(spinLockWaitTime, func() bool { - got = atomic.LoadInt32(conns) - if got < exp { - return false + timeout := time.After(spinLockWaitTime) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + got := atomic.LoadInt32(conns) + if got >= exp { + atomic.StoreInt32(conns, 0) + return + } + select { + case <-ticker.C: + case <-timeout: + t.Fatalf("timed out waiting for counter to reach at least expected value. got %d want >= %d", got, exp) + return } - atomic.StoreInt32(conns, 0) - return true - }) - if err != nil { - t.Fatalf("timed out waiting for counter to reach at least expected value. got %d want >= %d", got, exp) } }