Skip to content

Commit

Permalink
fix(getter): redundancy getter cleanup (#4610)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Mar 15, 2024
1 parent 776f546 commit ef7c97d
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 94 deletions.
3 changes: 2 additions & 1 deletion pkg/api/bzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
//
// nolint:thelper
func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) {
t.Skip("flaky")
t.Parallel()
fileUploadResource := "/bzz"
fileDownloadResource := func(addr string) string { return "/bzz/" + addr + "/" }
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) {
})

t.Run("download with redundancy should succeed", func(t *testing.T) {
req, err := http.NewRequestWithContext(context.TODO(), "GET", fileDownloadResource(refResponse.Reference.String()), nil)
req, err := http.NewRequestWithContext(context.Background(), "GET", fileDownloadResource(refResponse.Reference.String()), nil)
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ type UpgradedResponseWriter interface {
http.Pusher
http.Hijacker
http.Flusher
// staticcheck SA1019 CloseNotifier interface is required by gorilla compress handler
// nolint:staticcheck
http.CloseNotifier
}

type responseWriter struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,16 @@ func fingerprint(addrs []swarm.Address) string {

// GetOrCreate returns a decoder for the given chunk address
func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.Getter {

// since a recovery decoder is not allowed, simply return the underlying netstore
if g.config.Strict && g.config.Strategy == getter.NONE {
return g.fetcher
}

if len(addrs) == shardCnt {
return g.fetcher
}

key := fingerprint(addrs)
g.mu.Lock()
defer g.mu.Unlock()
Expand Down
149 changes: 62 additions & 87 deletions pkg/file/redundancy/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package getter
import (
"context"
"errors"
"io"
"sync"
"sync/atomic"

Expand All @@ -26,7 +25,6 @@ var (
// if retrieves children of an intermediate chunk potentially using erasure decoding
// it caches sibling chunks if erasure decoding started already
type decoder struct {
ctx context.Context
fetcher storage.Getter // network retrieval interface to fetch chunks
putter storage.Putter // interface to local storage to save reconstructed chunks
addrs []swarm.Address // all addresses of the intermediate chunk
Expand All @@ -36,32 +34,23 @@ type decoder struct {
rsbuf [][]byte // RS buffer of data + parity shards for erasure decoding
goodRecovery chan struct{} // signal channel for successful retrieval of shardCnt chunks
badRecovery chan struct{} // signals that either the recovery has failed or not allowed to run
initRecovery chan struct{} // signals that the recovery has been initialized
lastLen int // length of the last data chunk in the RS buffer
shardCnt int // number of data shards
parityCnt int // number of parity shards
wg sync.WaitGroup // wait group to wait for all goroutines to finish
mu sync.Mutex // mutex to protect buffer
fetchedCnt atomic.Int32 // count successful retrievals
failedCnt atomic.Int32 // count successful retrievals
cancel func() // cancel function for RS decoding
remove func(error) // callback to remove decoder from decoders cache
config Config // configuration
logger log.Logger
}

type Getter interface {
storage.Getter
io.Closer
}

// New returns a decoder object used to retrieve children of an intermediate chunk
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter {
// global context is canceled when the Close is called or when the prefetch terminates
ctx, cancel := context.WithCancel(context.Background())
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) storage.Getter {
size := len(addrs)

d := &decoder{
ctx: ctx,
fetcher: g,
putter: p,
addrs: addrs,
Expand All @@ -71,7 +60,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
rsbuf: make([][]byte, size),
goodRecovery: make(chan struct{}),
badRecovery: make(chan struct{}),
cancel: cancel,
initRecovery: make(chan struct{}),
remove: remove,
shardCnt: shardCnt,
parityCnt: size - shardCnt,
Expand All @@ -89,16 +78,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
d.waits[i] = make(chan error)
}

// prefetch chunks according to strategy
if !conf.Strict || conf.Strategy != NONE {
d.wg.Add(1)
go func() {
defer d.wg.Done()
_ = d.prefetch(ctx)
}()
} else { // recovery not allowed
close(d.badRecovery)
}
go d.prefetch()

return d
}
Expand Down Expand Up @@ -138,21 +118,30 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
}
}

// recovery has started, wait for result instead of fetching from the network
select {
case <-g.initRecovery:
return waitRecovery(nil)
default:
}

// first time
if g.fly(i) {

fctx, cancel := context.WithTimeout(ctx, g.config.FetchTimeout)
defer cancel()

g.wg.Add(1)
go func() {
select {
case <-fctx.Done(): // local context
case <-g.ctx.Done(): // global context
}
cancel()
g.wg.Done()
}()
// when the recovery is triggered, we can terminate any inflight requests.
// we do the extra bool check to not fire an unnecessary goroutine
if waitForRecovery {
go func() {
defer cancel()
select {
case <-g.initRecovery:
case <-fctx.Done():
}
}()
}

// retrieval
ch, err := g.fetcher.Get(fctx, g.addrs[i])
Expand Down Expand Up @@ -181,47 +170,45 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
return waitRecovery(storage.ErrNotFound)
}

func (g *decoder) prefetch(ctx context.Context) (err error) {
defer g.remove(err)
defer g.cancel()
func (g *decoder) prefetch() {

run := func(s Strategy) error {
if err := g.runStrategy(ctx, s); err != nil {
return err
var err error
defer func() {
if err != nil {
close(g.badRecovery)
} else {
close(g.goodRecovery)
}
g.remove(err)
}()

return g.recover(ctx)
}

for s := g.config.Strategy; s < strategyCnt; s++ {
s := g.config.Strategy
for ; s < strategyCnt; s++ {

err = run(s)
if err != nil {
if s == DATA || s == RACE {
g.logger.Debug("failed recovery", "strategy", s)
}
}
if err == nil {
if s > DATA {
g.logger.Debug("successful recovery", "strategy", s)
}
close(g.goodRecovery)
break
err = g.runStrategy(s)
if err != nil && s == DATA || s == RACE {
g.logger.Debug("failed strategy", "strategy", s)
}
if g.config.Strict { // only run one strategy

if err == nil || g.config.Strict {
break
}
}

if err != nil {
close(g.badRecovery)
return err
return
}

close(g.initRecovery)

err = g.recover()
if err == nil && s > DATA {
g.logger.Debug("successful recovery", "strategy", s)
}

return err
}

func (g *decoder) runStrategy(ctx context.Context, s Strategy) error {
func (g *decoder) runStrategy(s Strategy) error {

// across the different strategies, the common goal is to fetch at least as many chunks
// as the number of data shards.
Expand Down Expand Up @@ -256,34 +243,32 @@ func (g *decoder) runStrategy(ctx context.Context, s Strategy) error {

c := make(chan error, len(m))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for _, i := range m {
g.wg.Add(1)
go func(i int) {
defer g.wg.Done()
c <- g.fetch(ctx, i, false)
}(i)
}

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-c:
if g.fetchedCnt.Load() >= int32(g.shardCnt) {
return nil
}
if g.failedCnt.Load() > int32(allowedErrs) {
return errStrategyFailed
}
for range c {
if g.fetchedCnt.Load() >= int32(g.shardCnt) {
return nil
}
if g.failedCnt.Load() > int32(allowedErrs) {
return errStrategyFailed
}
}

return nil
}

// recover wraps the stages of data shard recovery:
// 1. gather missing data shards
// 2. decode using Reed-Solomon decoder
// 3. save reconstructed chunks
func (g *decoder) recover(ctx context.Context) error {
func (g *decoder) recover() error {
// gather missing shards
m := g.missingDataShards()
if len(m) == 0 {
Expand All @@ -296,7 +281,7 @@ func (g *decoder) recover(ctx context.Context) error {
}

// save chunks
return g.save(ctx, m)
return g.save(m)
}

// decode uses Reed-Solomon erasure coding decoder to recover data shards
Expand Down Expand Up @@ -369,23 +354,13 @@ func (g *decoder) fly(i int) (success bool) {
}

// save iterate over reconstructed shards and puts the corresponding chunks to local storage
func (g *decoder) save(ctx context.Context, missing []int) error {
func (g *decoder) save(missing []int) error {
g.mu.Lock()
defer g.mu.Unlock()
for _, i := range missing {
if err := g.putter.Put(ctx, swarm.NewChunk(g.addrs[i], g.rsbuf[i])); err != nil {
if err := g.putter.Put(context.Background(), swarm.NewChunk(g.addrs[i], g.rsbuf[i])); err != nil {
return err
}
}
return nil
}

// Close terminates the prefetch loop, waits for all goroutines to finish and
// removes the decoder from the cache
// it implements the io.Closer interface
func (g *decoder) Close() error {
g.cancel()
g.wg.Wait()
g.remove(nil)
return nil
}
3 changes: 0 additions & 3 deletions pkg/file/redundancy/getter/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
inmem "github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/util/testutil"
"github.com/klauspost/reedsolomon"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -112,7 +111,6 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
}

g := getter.New(addrs, shardCnt, store, store, func(error) {}, getter.DefaultConfig)
testutil.CleanupCloser(t, g)

parityCnt := len(buf) - shardCnt
_, err := g.Get(context.Background(), addr)
Expand Down Expand Up @@ -176,7 +174,6 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
FetchTimeout: strategyTimeout / 2,
}
g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
defer g.Close()

// launch delayed and erased chunk retrieval
wg := sync.WaitGroup{}
Expand Down

0 comments on commit ef7c97d

Please sign in to comment.