Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
feat: modify pool tests (#153)
Browse files Browse the repository at this point in the history
* feat: modify pool tests

* fix: top n node selection from heap

* feat: add more comprehensive tests

* enhancement: add refresh no to tests

* go fmt

* remove unused metrics

* put back trace

* response size does not include header

* reset retry counter only if progress is made

* update go-car

* dont drain response body

* send verification errors to Saturn

* pool tier promotion

* otel and send trace id to Saturn

* stabilize dynamics tests

* mirroring parallel

* pool-target-size through config to better test dynamics

* down to flakiness

* add substitution (rough)

* use new orchestrator API

* fix: top N selection

* enhancement: increase test size

* feat: Add tests for affinity

* test cache affinity

* test cache affinity

* remove assert

* fix test

* address review

* feat: port compliance cids

* fix: remove unused code

* modify harness

* feat: add core attr to trace span

* fix CI

* improve error classification (#165)

---------

Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Will Scott <will.scott@protocol.ai>
  • Loading branch information
3 people committed Oct 10, 2023
1 parent 9989d73 commit 33db988
Show file tree
Hide file tree
Showing 20 changed files with 830 additions and 412 deletions.
59 changes: 50 additions & 9 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@ package caboose

import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

requestcontext "github.com/willscott/go-requestcontext"

ipfsblockstore "github.com/ipfs/boxo/blockstore"
ipath "github.com/ipfs/boxo/coreiface/path"
gateway "github.com/ipfs/boxo/gateway"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand All @@ -30,7 +34,7 @@ type Config struct {
// OrchestratorClient is the HTTP client to use when communicating with the orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixed response.
OrchestratorOverride []string
OrchestratorOverride []state.NodeInfo

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests.
LoggingEndpoint url.URL
Expand All @@ -55,6 +59,9 @@ type Config struct {
// PoolRefresh is the interval at which we refresh the pool of upstreams from the orchestrator.
PoolRefresh time.Duration

// PoolTargetSize is a baseline size for the pool - the pool will accept decrements in performance in order to maintain at least this size.
PoolTargetSize int

// MirrorFraction is what fraction of requests will be mirrored to another random node in order to track metrics / determine the current best nodes.
MirrorFraction float64

Expand All @@ -74,6 +81,9 @@ type Config struct {

// Harness is an internal test harness that is set during testing.
Harness *state.State

// ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid
ComplianceCidPeriod int64
}

const DefaultLoggingInterval = 5 * time.Second
Expand All @@ -88,8 +98,11 @@ const defaultMaxRetries = 3
// default fraction of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction
const defaultMirrorFraction = 0.01

const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

const DefaultComplianceCidPeriod = int64(100)

// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
Expand Down Expand Up @@ -129,7 +142,16 @@ func NewCaboose(config *Config) (*Caboose, error) {
config.MirrorFraction = defaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
var overrideNodes []state.NodeInfo
err := json.Unmarshal([]byte(override), &overrideNodes)
if err != nil {
goLogger.Warnf("Error parsing BackendOverrideKey:", "err", err)
return nil, err
}
config.OrchestratorOverride = overrideNodes
}
if config.PoolTargetSize == 0 {
config.PoolTargetSize = DefaultPoolTargetSize
}

logger := newLogger(config)
Expand All @@ -144,6 +166,9 @@ func NewCaboose(config *Config) (*Caboose, error) {
Timeout: DefaultCarRequestTimeout,
}
}

c.config.Client.Transport = otelhttp.NewTransport(c.config.Client.Transport)

if c.config.OrchestratorEndpoint == nil {
var err error
c.config.OrchestratorEndpoint, err = url.Parse(DefaultOrchestratorEndpoint)
Expand All @@ -152,6 +177,10 @@ func NewCaboose(config *Config) (*Caboose, error) {
}
}

if c.config.ComplianceCidPeriod == 0 {
c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod
}

if c.config.PoolRefresh == 0 {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}
Expand Down Expand Up @@ -185,17 +214,29 @@ func (c *Caboose) Close() {

// Fetch allows fetching car archives by a path of the form `/ipfs/<cid>[/path/to/file]`.
//
// If the ID returned by requestcontext.IDFromContext parses as a hex trace ID,
// a remote span context built from that trace ID and the "Fetch" span's span ID
// is attached to ctx before the request is handed to the pool. If it does not
// parse, ctx is passed on with only the "Fetch" span.
//
// The returned error is whatever the pool's fetchResourceWith returns.
func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error {
	// Read the request ID from the incoming context before spanTrace replaces ctx.
	traceID := requestcontext.IDFromContext(ctx)
	tid, err := trace.TraceIDFromHex(traceID)

	ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path)))
	defer span.End()

	// A parse failure is not an error for the caller: it only means there is
	// no usable trace ID to propagate.
	if err == nil {
		sc := trace.NewSpanContext(trace.SpanContextConfig{
			TraceID: tid,
			SpanID:  span.SpanContext().SpanID(),
			Remote:  true,
		})
		ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
	}

	return c.pool.fetchResourceWith(ctx, path, cb, c.GetAffinity(ctx))
}

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
ctx, span := spanTrace(ctx, "Has", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return false, err
}
Expand All @@ -206,7 +247,7 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return nil, err
}
Expand All @@ -218,14 +259,14 @@ func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
ctx, span := spanTrace(ctx, "GetSize", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (c *Caboose) getAffinity(ctx context.Context) string {
func (c *Caboose) GetAffinity(ctx context.Context) string {
// https://github.com/ipfs/bifrost-gateway/issues/53#issuecomment-1442732865
if affG := ctx.Value(gateway.ContentPathKey); affG != nil {
contentPath := affG.(ipath.Path).String()
Expand Down
4 changes: 2 additions & 2 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"time"

"github.com/filecoin-saturn/caboose"
carv2 "github.com/ipfs/boxo/ipld/car/v2"
"github.com/ipfs/boxo/ipld/car/v2/blockstore"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
Expand Down
8 changes: 7 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ func (epr ErrPartialResponse) Error() string {
// ErrInvalidResponse can be returned from a DataCallback to indicate that the data provided for the
// requested resource was explicitly 'incorrect' - that is, blocks not in the requested dag, or
// non-car-conforming data, were returned.
type ErrInvalidResponse error
// ErrInvalidResponse is an error value that carries a free-form message.
type ErrInvalidResponse struct {
	// Message is the text reported by Error.
	Message string
}

// Error implements the error interface by returning Message unchanged.
func (eir ErrInvalidResponse) Error() string {
	msg := eir.Message
	return msg
}
Loading

0 comments on commit 33db988

Please sign in to comment.