Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,6 @@ jobs:
- name: Test without race detector (Windows)
if: matrix.os == 'windows-latest'
run: make test-ci
test-flaky:
name: Test (flaky)
runs-on: ubuntu-latest
continue-on-error: ${{ github.ref == 'refs/heads/master' }}
steps:
- name: Checkout
uses: actions/checkout@v5
- name: Setup Go
uses: actions/setup-go@v6
with:
cache: true
go-version-file: go.mod
- name: Run flaky test
run: make test-ci-flaky
continue-on-error: ${{ github.ref == 'refs/heads/master' }}
lint:
name: Lint
runs-on: ubuntu-latest
Expand Down
12 changes: 4 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,19 @@ endif
.PHONY: test-ci
test-ci:
ifdef cover
$(GO) test -run "[^FLAKY]$$" -coverprofile=cover.out ./...
$(GO) test -coverprofile=cover.out ./...
else
$(GO) test -run "[^FLAKY]$$" ./...
$(GO) test ./...
endif

.PHONY: test-ci-race
test-ci-race:
ifdef cover
$(GO) test -race -run "[^FLAKY]$$" -coverprofile=cover.out ./...
$(GO) test -race -coverprofile=cover.out ./...
else
$(GO) test -race -run "[^FLAKY]$$" ./...
$(GO) test -race ./...
endif

.PHONY: test-ci-flaky
test-ci-flaky:
$(GO) test -race -run "FLAKY$$" ./...

.PHONY: build
build: export CGO_ENABLED=0
build:
Expand Down
7 changes: 5 additions & 2 deletions cmd/bee/cmd/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path"
"strings"
"testing"
"time"

"github.com/ethersphere/bee/v2/cmd/bee/cmd"
"github.com/ethersphere/bee/v2/pkg/log"
Expand Down Expand Up @@ -172,8 +173,7 @@ func TestDBExportImportPinning(t *testing.T) {
}
}

// TestDBNuke_FLAKY is flaky on windows.
func TestDBNuke_FLAKY(t *testing.T) {
func TestDBNuke(t *testing.T) {
t.Parallel()

dataDir := t.TempDir()
Expand Down Expand Up @@ -203,6 +203,9 @@ func TestDBNuke_FLAKY(t *testing.T) {

db.Close()

// Waiting avoids a short OS-level race after db.Close(), where file handles
// may still be getting released and early removal can fail on some platforms.
time.Sleep(2 * time.Second)
Copy link
Copy Markdown
Contributor

@akrem-chabchoub akrem-chabchoub Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to replace this with a condition-based wait/retry, rather than a hardcoded sleep, to ensure more deterministic behavior across slower CI environments?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akrem-chabchoub There was a retry-based approach here previously — see my earlier comment. If you have any suggestions, they are welcome (a code example would also be helpful).

err = newCommand(t, cmd.WithArgs("db", "nuke", "--data-dir", dataDir)).Execute()
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
}

ctx := r.Context()
ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel)
ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), rLevel)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

feedDereferenced := false

ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
Expand Down
3 changes: 1 addition & 2 deletions pkg/api/bzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ import (
// using redundancy to reconstruct the file and find the file recoverable.
//
// nolint:thelper
func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) {
t.Skip("flaky")
func TestBzzUploadDownloadWithRedundancy(t *testing.T) {
t.Parallel()
fileUploadResource := "/bzz"
fileDownloadResource := func(addr string) string { return "/bzz/" + addr + "/" }
Expand Down
2 changes: 1 addition & 1 deletion pkg/feeds/epochs/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
storage "github.com/ethersphere/bee/v2/pkg/storage"
)

func TestFinder_FLAKY(t *testing.T) {
func TestFinder(t *testing.T) {
t.Parallel()

testf := func(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/feeds/testing/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func TestFinderFixIntervals(t *testing.T, nextf func() (bool, int64), finderf fu

func TestFinderIntervals(t *testing.T, nextf func() (bool, int64), finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
storer := &Timeout{inmemchunkstore.New()}
rng := rand.New(rand.NewSource(1))
topicStr := "testtopic"
topic, err := crypto.LegacyKeccak256([]byte(topicStr))
if err != nil {
Expand Down Expand Up @@ -145,7 +146,7 @@ func TestFinderIntervals(t *testing.T, nextf func() (bool, int64), finderf func(
for j := 0; j < len(ats)-1; j++ {
at := ats[j]
diff := ats[j+1] - at
for now := at; now < ats[j+1]; now += int64(rand.Intn(int(diff)) + 1) {
for now := at; now < ats[j+1]; now += int64(rng.Intn(int(diff)) + 1) {
after := uint64(0)
ch, current, next, err := finder.At(ctx, now, after)
if err != nil {
Expand Down Expand Up @@ -190,10 +191,11 @@ func TestFinderRandomIntervals(t *testing.T, finderf func(storage.Getter, *feeds
t.Run(fmt.Sprintf("random intervals %d", j), func(t *testing.T) {
t.Parallel()

rng := rand.New(rand.NewSource(int64(j + 1)))
var i int64
var n int
nextf := func() (bool, int64) {
i += int64(rand.Intn(1<<10) + 1)
i += int64(rng.Intn(1<<10) + 1)
n++
return n == 40, i
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/file/redundancy/getter/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,27 @@ import (

// TestGetter tests the retrieval of chunks with missing data shards
// using the RACE strategy for a number of erasure code parameters
func TestGetterRACE_FLAKY(t *testing.T) {
func TestGetterRACE(t *testing.T) {
type getterTest struct {
bufSize int
shardCnt int
erasureCnt int
}

rng := mrand.New(mrand.NewSource(1))
var tcs []getterTest
for bufSize := 3; bufSize <= 128; bufSize += 21 {
for shardCnt := bufSize/2 + 1; shardCnt <= bufSize; shardCnt += 21 {
parityCnt := bufSize - shardCnt
erasures := mrand.Perm(parityCnt - 1)
erasures := rng.Perm(parityCnt - 1)
if len(erasures) > 3 {
erasures = erasures[:3]
}
for _, erasureCnt := range erasures {
tcs = append(tcs, getterTest{bufSize, shardCnt, erasureCnt})
}
tcs = append(tcs, getterTest{bufSize, shardCnt, parityCnt}, getterTest{bufSize, shardCnt, parityCnt + 1})
erasures = mrand.Perm(shardCnt - 1)
erasures = rng.Perm(shardCnt - 1)
if len(erasures) > 3 {
erasures = erasures[:3]
}
Expand Down Expand Up @@ -99,7 +100,7 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
addrs := initData(t, buf, shardCnt, store)

var addr swarm.Address
erasures := forget(t, store, addrs, erasureCnt)
erasures := forget(t, store, addrs, erasureCnt, deterministicSeed(bufSize, shardCnt, erasureCnt))
for _, i := range erasures {
if i < shardCnt {
addr = addrs[i]
Expand Down Expand Up @@ -345,11 +346,12 @@ func checkShardsAvailable(t *testing.T, s storage.ChunkStore, addrs []swarm.Addr
}
}

func forget(t *testing.T, store storage.ChunkStore, addrs []swarm.Address, erasureCnt int) (erasures []int) {
func forget(t *testing.T, store storage.ChunkStore, addrs []swarm.Address, erasureCnt int, seed int64) (erasures []int) {
t.Helper()

ctx := context.TODO()
erasures = mrand.Perm(len(addrs))[:erasureCnt]
rng := mrand.New(mrand.NewSource(seed))
erasures = rng.Perm(len(addrs))[:erasureCnt]
for _, i := range erasures {
err := store.Delete(ctx, addrs[i])
if err != nil {
Expand All @@ -358,3 +360,7 @@ func forget(t *testing.T, store storage.ChunkStore, addrs []swarm.Address, erasu
}
return erasures
}

// deterministicSeed derives a stable RNG seed from the three test-case
// parameters by packing each into its own bit field of a single int64,
// so every (bufSize, shardCnt, erasureCnt) combination maps to a
// distinct, reproducible seed.
func deterministicSeed(bufSize, shardCnt, erasureCnt int) int64 {
	seed := int64(bufSize) << 32
	seed |= int64(shardCnt) << 16
	seed |= int64(erasureCnt)
	return seed
}
130 changes: 108 additions & 22 deletions pkg/pushsync/pushsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import (
)

const (
fixedPrice = uint64(10)
fixedPrice = uint64(10)
pushNextClosestSpinlockWait = 5 * time.Second
)

var blockHash = common.HexToHash("0x1")
Expand Down Expand Up @@ -383,7 +384,6 @@ func TestPushChunkToClosest(t *testing.T) {

func TestPushChunkToNextClosest(t *testing.T) {
t.Parallel()
t.Skip("flaky test")

// chunk data to upload
chunk := testingc.FixtureChunk("7000").WithTagID(1)
Expand Down Expand Up @@ -446,12 +446,6 @@ func TestPushChunkToNextClosest(t *testing.T) {
t.Fatal("invalid receipt")
}

// this intercepts the outgoing delivery message
waitOnRecordAndTest(t, peer2, recorder, chunk.Address(), chunk.Data())

// this intercepts the incoming receipt message
waitOnRecordAndTest(t, peer2, recorder, chunk.Address(), nil)

found, count := pivotStorer.hasReported(t, chunk.Address())
if !found {
t.Fatalf("chunk %s not reported", chunk.Address())
Expand All @@ -464,32 +458,124 @@ func TestPushChunkToNextClosest(t *testing.T) {
t.Fatalf("tags error got %d want >= 1", count)
}

balance, err := pivotAccounting.Balance(peer1)
waitPushStreamsToBothClosestPeers(t, recorder, peer1, peer2)

// The storer that applied PrepareDebit books +price toward the pivot.
peerThatEarnedFromPush(t, peer1, peerAccounting1, peer2, peerAccounting2, pivotNode)
if !anyPeerHasValidPushStream(t, recorder, []swarm.Address{peer1, peer2}, chunk) {
t.Fatal("expected at least one closest peer with a full delivery+receipt stream record")
}
b1, err := pivotAccounting.Balance(peer1)
if err != nil {
t.Fatal(err)
}

if balance.Int64() != -int64(fixedPrice) {
t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
}

balance2, err := peerAccounting2.Balance(pivotNode)
b2, err := pivotAccounting.Balance(peer2)
if err != nil {
t.Fatal(err)
}
sum := b1.Int64() + b2.Int64()
if sum != -int64(fixedPrice) {
t.Fatalf("pivot combined balance toward closest peers: want %d got %d (%s=%d %s=%d)",
-int64(fixedPrice), sum, peer1, b1.Int64(), peer2, b2.Int64())
}
}

// peerThatEarnedFromPush asserts that exactly one of the two closest peers
// holds a balance of +fixedPrice toward the pivot, i.e. exactly one peer
// performed the successful storer debit for the push.
//
// The peer addresses are part of the signature for call-site readability;
// only the accounting interfaces are needed to read the balances.
func peerThatEarnedFromPush(
	t *testing.T,
	peer1 swarm.Address, peerAcct1 accounting.Interface,
	peer2 swarm.Address, peerAcct2 accounting.Interface,
	pivot swarm.Address,
) {
	t.Helper()
	want := int64(fixedPrice)
	n := 0
	// The original carried the peer addresses alongside the accounting
	// handles in a struct, but never read them; iterate the handles only.
	for _, acct := range []accounting.Interface{peerAcct1, peerAcct2} {
		b, err := acct.Balance(pivot)
		if err != nil {
			t.Fatal(err)
		}
		if b.Int64() == want {
			n++
		}
	}
	if n != 1 {
		t.Fatalf("want exactly one peer with balance +%d toward pivot, got %d", fixedPrice, n)
	}
}

if balance2.Int64() != int64(fixedPrice) {
t.Fatalf("unexpected balance on peer2. want %d got %d", int64(fixedPrice), balance2)
func anyPeerHasValidPushStream(t *testing.T, recorder *streamtest.Recorder, peers []swarm.Address, chunk swarm.Chunk) bool {
t.Helper()
for _, p := range peers {
recs, err := recorder.Records(p, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName)
if err != nil {
continue
}
for _, rec := range recs {
if pushStreamRecordValid(rec, chunk) {
return true
}
}
}
return false
}

balance1, err := peerAccounting1.Balance(peer2)
if err != nil {
t.Fatal(err)
// waitPushStreamsToBothClosestPeers polls the recorder until it shows at least
// one push stream opened to each of the two peers, i.e. the pivot attempted
// another closest peer after the first path. It fails the test if this does
// not happen within pushNextClosestSpinlockWait.
func waitPushStreamsToBothClosestPeers(t *testing.T, recorder *streamtest.Recorder, peer1, peer2 swarm.Address) {
	t.Helper()

	timeout := time.After(pushNextClosestSpinlockWait)
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		r1, e1 := recorder.Records(peer1, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName)
		r2, e2 := recorder.Records(peer2, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName)
		if e1 == nil && len(r1) > 0 && e2 == nil && len(r2) > 0 {
			return
		}
		select {
		case <-ticker.C:
			// poll again on the next tick
		case <-timeout:
			// t.Fatal stops the test goroutine via FailNow; a trailing
			// return here would be unreachable, so none is written.
			t.Fatal("timed out waiting for push streams to both closest peers")
		}
	}
}

if balance1.Int64() != 0 {
t.Fatalf("unexpected balance on peer1. want %d got %d", 0, balance1)
// pushStreamRecordValid reports whether the stream record holds exactly one
// inbound Delivery whose address and payload match the chunk, and exactly one
// outbound Receipt for the same chunk address.
func pushStreamRecordValid(rec *streamtest.Record, ch swarm.Chunk) bool {
	inbound := rec.In()
	if len(inbound) == 0 {
		return false
	}
	deliveries, err := protobuf.ReadMessages(
		bytes.NewReader(inbound),
		func() protobuf.Message { return new(pb.Delivery) },
	)
	if err != nil || len(deliveries) != 1 {
		return false
	}
	delivery := deliveries[0].(*pb.Delivery)
	if !bytes.Equal(delivery.Address, ch.Address().Bytes()) {
		return false
	}
	if !bytes.Equal(delivery.Data, ch.Data()) {
		return false
	}

	outbound := rec.Out()
	if len(outbound) == 0 {
		return false
	}
	receipts, err := protobuf.ReadMessages(
		bytes.NewReader(outbound),
		func() protobuf.Message { return new(pb.Receipt) },
	)
	if err != nil || len(receipts) != 1 {
		return false
	}
	receipt := receipts[0].(*pb.Receipt)
	return swarm.NewAddress(receipt.Address).Equal(ch.Address())
}

func TestPushChunkToClosestErrorAttemptRetry(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package cache

import (
"io"

"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/storageutil"
lru "github.com/hashicorp/golang-lru/v2"
Expand Down Expand Up @@ -98,5 +100,8 @@ func (c *Cache) Delete(i storage.Item) error {

// Close purges every cached entry and, when the wrapped IndexStore also
// implements io.Closer, closes it and propagates its error.
func (c *Cache) Close() error {
	c.lru.Purge()
	closer, ok := c.IndexStore.(io.Closer)
	if !ok {
		return nil
	}
	return closer.Close()
}
Loading
Loading