perf(getter): cancel inflight requests if enough chunks are fetched for recovery (#4608)
istae committed Mar 6, 2024
1 parent f7b3586 commit d3d6685
Showing 7 changed files with 71 additions and 112 deletions.
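The gist of the change: with erasure coding, any shardCnt of a group's chunks are enough to reconstruct the rest, so once that many retrievals have succeeded the remaining in-flight requests are wasted work and can be canceled. A minimal, self-contained Go sketch of that pattern (illustrative names and structure, not the actual bee code):

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// fetchShard stands in for a network retrieval that honors cancellation.
func fetchShard(ctx context.Context, i int) error {
	select {
	case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	const total, shardCnt = 8, 5 // e.g. 5 data shards + 3 parity shards

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var fetched atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < total; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if fetchShard(ctx, i) != nil {
				return // canceled or failed
			}
			// enough shards arrived for Reed-Solomon recovery: stop the rest
			if fetched.Add(1) == shardCnt {
				cancel()
			}
		}(i)
	}
	wg.Wait()
	fmt.Printf("fetched %d shards; remaining requests were canceled\n", fetched.Load())
}

The diff below threads this kind of global cancellation through the decoder: a shared context is canceled when prefetch finishes or the decoder is closed.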
8 changes: 2 additions & 6 deletions pkg/api/bzz.go
@@ -343,10 +343,8 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
ls := loadsave.NewReadonly(s.storer.Download(cache))
feedDereferenced := false

-strategyTimeout := getter.DefaultStrategyTimeout.String()
-
ctx := r.Context()
-ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
+ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
if err != nil {
	logger.Error(err, err.Error())
	jsonhttp.BadRequest(w, "could not parse headers")
@@ -537,10 +535,8 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
	cache = *headers.Cache
}

-strategyTimeout := getter.DefaultStrategyTimeout.String()
-
ctx := r.Context()
-ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
+ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
if err != nil {
	logger.Error(err, err.Error())
	jsonhttp.BadRequest(w, "could not parse headers")
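Both handlers now derive the getter configuration from three optional per-request headers; the separate strategy-timeout knob is gone, and a parse failure still maps to a 400 as shown above. For orientation, a hedged sketch of the config-in-context pattern these calls rely on (a simplified stand-in, not the actual getter implementation; the default timeout here is an assumption):

package getterctx

import (
	"context"
	"time"
)

type Strategy int

// Config collects the per-request redundancy options.
type Config struct {
	Strategy     Strategy
	Strict       bool          // Strict disables fallback to other strategies
	FetchTimeout time.Duration // budget for a single chunk retrieval
}

type configKey struct{}

// SetConfigInContext folds optional header values into a Config and stores
// it on the request context, where the joiner can later pick it up.
func SetConfigInContext(ctx context.Context, s *Strategy, fallback *bool, fetchTimeout *string) (context.Context, error) {
	cfg := Config{FetchTimeout: 30 * time.Second} // assumed default
	if s != nil {
		cfg.Strategy = *s
	}
	if fallback != nil {
		cfg.Strict = !*fallback
	}
	if fetchTimeout != nil {
		d, err := time.ParseDuration(*fetchTimeout)
		if err != nil {
			return nil, err
		}
		cfg.FetchTimeout = d
	}
	return context.WithValue(ctx, configKey{}, cfg), nil
}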
2 changes: 1 addition & 1 deletion pkg/api/bzz_test.go
@@ -62,7 +62,7 @@ import (
// using redundancy to reconstruct the file and find the file recoverable.
//
// nolint:thelper
-func TestBzzUploadDownloadWithRedundancy(t *testing.T) {
+func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) {
	t.Parallel()
	fileUploadResource := "/bzz"
	fileDownloadResource := func(addr string) string { return "/bzz/" + addr + "/" }
10 changes: 8 additions & 2 deletions pkg/file/joiner/joiner.go
@@ -80,10 +80,16 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
		}
		return d
	}
-	remove := func() {
+	remove := func(err error) {
		g.mu.Lock()
		defer g.mu.Unlock()
-		g.cache[key] = nil
+		if err != nil {
+			// signals that a new getter is needed to reattempt to recover the data
+			delete(g.cache, key)
+		} else {
+			// signals that the chunks were fetched/recovered/cached so a future getter is not needed
+			g.cache[key] = nil
+		}
	}
	d = getter.New(addrs, shardCnt, g.fetcher, g.putter, remove, g.config)
	g.cache[key] = d
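The remove callback now carries an error so the cache can distinguish "retry later" from "done for good". A simplified sketch of the resulting cache protocol (types reduced to a placeholder, not the actual joiner code):

package joinercache

import "sync"

// Getter is a placeholder for the real storage.Getter.
type Getter interface{}

type decoderCache struct {
	mu    sync.Mutex
	cache map[string]Getter
}

// GetOrCreate returns the live getter for key, a nil tombstone if recovery
// already completed, or builds a fresh getter otherwise.
func (c *decoderCache) GetOrCreate(key string, build func(remove func(error)) Getter) Getter {
	c.mu.Lock()
	defer c.mu.Unlock()
	if d, ok := c.cache[key]; ok {
		return d // may be nil: chunks were already fetched/recovered/cached
	}
	remove := func(err error) {
		c.mu.Lock()
		defer c.mu.Unlock()
		if err != nil {
			delete(c.cache, key) // failure: a future getter may reattempt recovery
		} else {
			c.cache[key] = nil // success: tombstone prevents pointless rebuilds
		}
	}
	d := build(remove)
	c.cache[key] = d
	return d
}

The tombstone matters because rebuilding a decoder for fully recovered data would re-fetch chunks that are already local, while deleting the entry on error keeps the retry path open.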
32 changes: 14 additions & 18 deletions pkg/file/joiner/joiner_test.go
@@ -1109,17 +1109,15 @@ func TestJoinerRedundancy(t *testing.T) {
	if err != nil {
		t.Fatal(err)
	}
-	strategyTimeout := 100 * time.Millisecond
	// all data can be read back
	readCheck := func(t *testing.T, expErr error) {
		ctx := context.Background()

-		strategyTimeoutStr := strategyTimeout.String()
-		decodeTimeoutStr := (10 * strategyTimeout).String()
+		decodeTimeoutStr := time.Second.String()
		fallback := true
		s := getter.RACE

-		ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr, log.Noop)
+		ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, log.Noop)
		if err != nil {
			t.Fatal(err)
		}
@@ -1229,7 +1227,7 @@ func TestJoinerRedundancy(t *testing.T) {
// nolint:thelper
func TestJoinerRedundancyMultilevel(t *testing.T) {
	t.Parallel()
-	test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, levels, size int) {
+	test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, size int) {
		t.Helper()
		store := mockstorer.NewForgettingStore(inmemchunkstore.New())
		testutil.CleanupCloser(t, store)
@@ -1249,14 +1247,12 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
	expRead := swarm.ChunkSize
	buf := make([]byte, expRead)
	offset := mrand.Intn(size) * expRead
-	canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, levels int, canRead bool) {
+	canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, canRead bool) {
		ctx := context.Background()
-		strategyTimeout := 100 * time.Millisecond

-		strategyTimeoutStr := strategyTimeout.String()
-		decodingTimeoutStr := (2 * strategyTimeout).String()
+		decodingTimeoutStr := (200 * time.Millisecond).String()

-		ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr, log.Noop)
+		ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, log.Noop)
		if err != nil {
			t.Fatal(err)
		}
@@ -1295,35 +1291,35 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
t.Run("NONE w/o fallback CAN retrieve", func(t *testing.T) {
store.Record()
defer store.Unrecord()
canReadRange(t, getter.NONE, false, levels, true)
canReadRange(t, getter.NONE, false, true)
})

// do not forget the root chunk
store.Unmiss(swarm.NewAddress(addr.Bytes()[:swarm.HashSize]))
// after we forget the chunks on the way to the range, we should not be able to retrieve
t.Run("NONE w/o fallback CANNOT retrieve", func(t *testing.T) {
canReadRange(t, getter.NONE, false, levels, false)
canReadRange(t, getter.NONE, false, false)
})

// we lost a data chunk, we cannot recover using DATA only strategy with no fallback
t.Run("DATA w/o fallback CANNOT retrieve", func(t *testing.T) {
canReadRange(t, getter.DATA, false, levels, false)
canReadRange(t, getter.DATA, false, false)
})

if rLevel == 0 {
// allowing fallback mode will not help if no redundancy used for upload
t.Run("DATA w fallback CANNOT retrieve", func(t *testing.T) {
canReadRange(t, getter.DATA, true, levels, false)
canReadRange(t, getter.DATA, true, false)
})
return
}
// allowing fallback mode will make the range retrievable using erasure decoding
t.Run("DATA w fallback CAN retrieve", func(t *testing.T) {
canReadRange(t, getter.DATA, true, levels, true)
canReadRange(t, getter.DATA, true, true)
})
// after the reconstructed data is stored, we can retrieve the range using DATA only mode
t.Run("after recovery, NONE w/o fallback CAN retrieve", func(t *testing.T) {
canReadRange(t, getter.NONE, false, levels, true)
canReadRange(t, getter.NONE, false, true)
})
}
r2level := []int{2, 1, 2, 3, 2}
@@ -1353,7 +1349,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
	if r2level[rLevel] != levels || encrypt != encryptChunk[rLevel] {
		t.Skip("skipping to save time")
	}
-	test(t, rLevel, encrypt, levels, chunkCnt)
+	test(t, rLevel, encrypt, chunkCnt)
})
switch levels {
case 1:
@@ -1364,7 +1360,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
		continue
	}
	t.Run(fmt.Sprintf("encrypt=%v levels=%d chunks=%d full", encrypt, levels, chunkCnt), func(t *testing.T) {
-		test(t, rLevel, encrypt, levels, chunkCnt)
+		test(t, rLevel, encrypt, chunkCnt)
	})
}
}
27 changes: 18 additions & 9 deletions pkg/file/redundancy/getter/getter.go
@@ -26,6 +26,7 @@ var (
// it retrieves children of an intermediate chunk potentially using erasure decoding
// it caches sibling chunks if erasure decoding started already
type decoder struct {
+	ctx     context.Context
	fetcher storage.Getter  // network retrieval interface to fetch chunks
	putter  storage.Putter  // interface to local storage to save reconstructed chunks
	addrs   []swarm.Address // all addresses of the intermediate chunk
@@ -40,11 +41,10 @@ type decoder struct {
	parityCnt  int            // number of parity shards
	wg         sync.WaitGroup // wait group to wait for all goroutines to finish
	mu         sync.Mutex     // mutex to protect buffer
-	err        error          // error of the last erasure decoding
	fetchedCnt atomic.Int32   // count successful retrievals
	failedCnt  atomic.Int32   // count failed retrievals
	cancel     func()         // cancel function for RS decoding
-	remove     func()         // callback to remove decoder from decoders cache
+	remove     func(error)    // callback to remove decoder from decoders cache
	config     Config         // configuration
	logger     log.Logger
}
@@ -55,11 +55,13 @@ type Getter interface {
}

// New returns a decoder object used to retrieve children of an intermediate chunk
-func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(), conf Config) Getter {
+func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter {
+	// global context is canceled when Close is called or when the prefetch terminates
	ctx, cancel := context.WithCancel(context.Background())
	size := len(addrs)

	d := &decoder{
+		ctx:     ctx,
		fetcher: g,
		putter:  p,
		addrs:   addrs,
@@ -92,7 +94,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
-			d.err = d.prefetch(ctx)
+			_ = d.prefetch(ctx)
		}()
	} else { // recovery not allowed
		close(d.badRecovery)
@@ -143,7 +145,14 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
	defer cancel()

	g.wg.Add(1)
-	defer g.wg.Done()
+	go func() {
+		select {
+		case <-fctx.Done(): // local context
+		case <-g.ctx.Done(): // global context
+		}
+		cancel()
+		g.wg.Done()
+	}()

	// retrieval
	ch, err := g.fetcher.Get(fctx, g.addrs[i])
@@ -172,8 +181,9 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
	return waitRecovery(storage.ErrNotFound)
}

-func (g *decoder) prefetch(ctx context.Context) error {
-	defer g.remove()
+func (g *decoder) prefetch(ctx context.Context) (err error) {
+	defer g.remove(err)
+	defer g.cancel()

	run := func(s Strategy) error {
		if err := g.runStrategy(ctx, s); err != nil {
@@ -183,7 +193,6 @@ func (g *decoder) prefetch(ctx context.Context) error {
		return g.recover(ctx)
	}

-	var err error
	for s := g.config.Strategy; s < strategyCnt; s++ {

		err = run(s)
@@ -377,6 +386,6 @@ func (g *decoder) save(ctx context.Context, missing []int) error {
func (g *decoder) Close() error {
	g.cancel()
	g.wg.Wait()
-	g.remove()
+	g.remove(nil)
	return nil
}
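Each retrieval in fetch() now gets a watcher goroutine so the request is aborted when either its own context or the decoder-wide context ends. A reduced sketch of that wiring (a hypothetical helper, not the bee API):

package getterwire

import "context"

// fetchWithGlobalCancel runs get under a per-call context that is canceled
// when either the local or the global (decoder-wide) context finishes.
func fetchWithGlobalCancel(global, local context.Context, get func(context.Context) error) error {
	fctx, cancel := context.WithCancel(local)
	defer cancel() // also releases the watcher goroutine below

	go func() {
		select {
		case <-fctx.Done(): // this fetch finished or its caller gave up
		case <-global.Done(): // decoder closed, or enough chunks recovered
		}
		cancel() // aborts the in-flight retrieval if it is still running
	}()

	return get(fctx)
}

One Go subtlety worth noting when reading prefetch above: arguments to a deferred call are evaluated when the defer statement executes, so defer g.remove(err) passes the value err holds at that point; capturing the final value of the named return would require a closure, as in defer func() { g.remove(err) }().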
46 changes: 11 additions & 35 deletions pkg/file/redundancy/getter/getter_test.go
@@ -19,12 +19,11 @@ import (

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/file/redundancy/getter"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/storage"
inmem "github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/util/testutil/racedetection"
"github.com/ethersphere/bee/pkg/util/testutil"
"github.com/klauspost/reedsolomon"
"golang.org/x/sync/errgroup"
)
@@ -96,10 +95,6 @@ func TestGetterFallback(t *testing.T) {

func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
	t.Helper()
-	strategyTimeout := 100 * time.Millisecond
-	if racedetection.On {
-		strategyTimeout *= 2
-	}
	store := inmem.New()
	buf := make([][]byte, bufSize)
	addrs := initData(t, buf, shardCnt, store)
@@ -115,31 +110,13 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
	if len(addr.Bytes()) == 0 {
		t.Skip("no data shard erased")
	}
-	ctx, cancel := context.WithCancel(context.TODO())
-	defer cancel()
-	conf := getter.Config{
-		Strategy:        getter.RACE,
-		FetchTimeout:    2 * strategyTimeout,
-		StrategyTimeout: strategyTimeout,
-		Logger:          log.Noop,
-	}
-	g := getter.New(addrs, shardCnt, store, store, func() {}, conf)
-	defer g.Close()
+
+	g := getter.New(addrs, shardCnt, store, store, func(error) {}, getter.DefaultConfig)
+	testutil.CleanupCloser(t, g)
+
	parityCnt := len(buf) - shardCnt
-	q := make(chan error, 1)
-	go func() {
-		_, err := g.Get(ctx, addr)
-		q <- err
-	}()
-	err := context.DeadlineExceeded
-	wait := strategyTimeout * 2
-	if racedetection.On {
-		wait *= 2
-	}
-	select {
-	case err = <-q:
-	case <-time.After(wait):
-	}
+	_, err := g.Get(context.Background(), addr)
+
	switch {
	case erasureCnt > parityCnt:
		t.Run("unable to recover", func(t *testing.T) {
@@ -194,12 +171,11 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
	// create getter
	start := time.Now()
	conf := getter.Config{
-		Strategy:        s,
-		Strict:          strict,
-		FetchTimeout:    strategyTimeout / 2,
-		StrategyTimeout: strategyTimeout,
+		Strategy:     s,
+		Strict:       strict,
+		FetchTimeout: strategyTimeout / 2,
	}
-	g := getter.New(addrs, shardCnt, store, store, func() {}, conf)
+	g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
	defer g.Close()

	// launch delayed and erased chunk retrieval
