diff --git a/caboose.go b/caboose.go index 3724081..ffdb096 100644 --- a/caboose.go +++ b/caboose.go @@ -2,18 +2,22 @@ package caboose import ( "context" + "encoding/json" "io" "net/http" "net/url" "os" - "strings" "time" + requestcontext "github.com/willscott/go-requestcontext" + ipfsblockstore "github.com/ipfs/boxo/blockstore" ipath "github.com/ipfs/boxo/coreiface/path" gateway "github.com/ipfs/boxo/gateway" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -30,7 +34,7 @@ type Config struct { // OrchestratorClient is the HTTP client to use when communicating with the orchestrator. OrchestratorClient *http.Client // OrchestratorOverride replaces calls to the orchestrator with a fixed response. - OrchestratorOverride []string + OrchestratorOverride []state.NodeInfo // LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests. LoggingEndpoint url.URL @@ -77,6 +81,9 @@ type Config struct { // Harness is an internal test harness that is set during testing. Harness *state.State + + // ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid + ComplianceCidPeriod int64 } const DefaultLoggingInterval = 5 * time.Second @@ -91,10 +98,12 @@ const defaultMaxRetries = 3 // default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction const defaultMirrorFraction = 0.01 -const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200" +const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200" const DefaultPoolRefreshInterval = 5 * time.Minute const DefaultPoolTargetSize = 30 +const DefaultComplianceCidPeriod = int64(100) + // we cool off sending requests for a cid for a certain duration // if we've seen a certain number of failures for it already in a given duration. // NOTE: before getting creative here, make sure you dont break end user flow @@ -133,7 +142,13 @@ func NewCaboose(config *Config) (*Caboose, error) { config.MirrorFraction = defaultMirrorFraction } if override := os.Getenv(BackendOverrideKey); len(override) > 0 { - config.OrchestratorOverride = strings.Split(override, ",") + var overrideNodes []state.NodeInfo + err := json.Unmarshal([]byte(override), &overrideNodes) + if err != nil { + goLogger.Warnf("Error parsing BackendOverrideKey:", "err", err) + return nil, err + } + config.OrchestratorOverride = overrideNodes } if config.PoolTargetSize == 0 { config.PoolTargetSize = DefaultPoolTargetSize @@ -151,6 +166,9 @@ func NewCaboose(config *Config) (*Caboose, error) { Timeout: DefaultCarRequestTimeout, } } + + c.config.Client.Transport = otelhttp.NewTransport(c.config.Client.Transport) + if c.config.OrchestratorEndpoint == nil { var err error c.config.OrchestratorEndpoint, err = url.Parse(DefaultOrchestratorEndpoint) @@ -159,6 +177,10 @@ func NewCaboose(config *Config) (*Caboose, error) { } } + if c.config.ComplianceCidPeriod == 0 { + c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod + } + if c.config.PoolRefresh == 0 { c.config.PoolRefresh = DefaultPoolRefreshInterval } @@ -192,9 +214,21 @@ func (c *Caboose) Close() { // Fetch allows fetching car archives by a path of the form `/ipfs/[/path/to/file]` func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error { + traceID := requestcontext.IDFromContext(ctx) + tid, err := trace.TraceIDFromHex(traceID) + ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path))) defer span.End() + if err == nil { + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: tid, + SpanID: span.SpanContext().SpanID(), + Remote: true, + }) + ctx = trace.ContextWithRemoteSpanContext(ctx, sc) + } + return c.pool.fetchResourceWith(ctx, path, cb, c.GetAffinity(ctx)) } diff --git a/cmd/caboose/main.go b/cmd/caboose/main.go index dba6e81..22caecc 100644 --- a/cmd/caboose/main.go +++ b/cmd/caboose/main.go @@ -11,9 +11,9 @@ import ( "time" "github.com/filecoin-saturn/caboose" - carv2 "github.com/ipfs/boxo/ipld/car/v2" - "github.com/ipfs/boxo/ipld/car/v2/blockstore" "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/blockstore" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/storage/bsadapter" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" diff --git a/errors.go b/errors.go index 9870103..10934a8 100644 --- a/errors.go +++ b/errors.go @@ -80,4 +80,10 @@ func (epr ErrPartialResponse) Error() string { // ErrInvalidResponse can be returned from a DataCallback to indicate that the data provided for the // requested resource was explicitly 'incorrect' - that blocks not in the requested dag, or non-car-conforming // data was returned. -type ErrInvalidResponse error +type ErrInvalidResponse struct { + Message string +} + +func (e ErrInvalidResponse) Error() string { + return e.Message +} diff --git a/fetcher.go b/fetcher.go index d8b04a8..b7b3c80 100644 --- a/fetcher.go +++ b/fetcher.go @@ -7,10 +7,13 @@ import ( "hash/crc32" "io" "net/http" + "net/http/httptrace" "os" "strconv" + "strings" "time" + "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -52,7 +55,11 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m return ce } - ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime))) + p.ActiveNodes.lk.RLock() + isCore := p.ActiveNodes.IsCore(from) + p.ActiveNodes.lk.RUnlock() + + ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime), attribute.Bool("core", isCore))) defer span.End() requestId := uuid.NewString() @@ -66,12 +73,20 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m proto := "unknown" respReq := &http.Request{} received := 0 - reqUrl := fmt.Sprintf("https://%s%s", from.URL, resource) + + reqUrl := "" + if strings.Contains(from.URL, "://") { + reqUrl = fmt.Sprintf("%s%s", from.URL, resource) + } else { + reqUrl = fmt.Sprintf("https://%s%s", from.URL, resource) + } + var respHeader http.Header saturnNodeId := "" saturnTransferId := "" isCacheHit := false networkError := "" + verificationError := "" isBlockRequest := false if mime == "application/vnd.ipld.raw" { @@ -100,10 +115,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m if cacheHit == saturnCacheHit { isCacheHit = true } - - for k, v := range respHeader { - received = received + len(k) + len(v) - } } durationSecs := time.Since(start).Seconds() @@ -156,12 +167,13 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m HTTPProtocol: proto, TTFBMS: int(ttfbMs), // my address - Range: "", - Referrer: respReq.Referer(), - UserAgent: respReq.UserAgent(), - NodeId: saturnNodeId, - NodeIpAddress: from.URL, - IfNetworkError: networkError, + Range: "", + Referrer: respReq.Referer(), + UserAgent: respReq.UserAgent(), + NodeId: saturnNodeId, + NodeIpAddress: from.URL, + IfNetworkError: networkError, + VerificationError: verificationError, } } } @@ -178,7 +190,9 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m reqCtx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil) + clientTrace := otelhttptrace.NewClientTrace(reqCtx) + subReqCtx := httptrace.WithClientTrace(reqCtx, clientTrace) + req, err := http.NewRequestWithContext(subReqCtx, http.MethodGet, reqUrl, nil) if err != nil { if isCtxError(reqCtx) { return reqCtx.Err() @@ -282,8 +296,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m wrapped := TrackingReader{resp.Body, time.Time{}, 0} err = cb(resource, &wrapped) received = wrapped.len - // drain body so it can be re-used. - _, _ = io.Copy(io.Discard, resp.Body) if err != nil { if isCtxError(reqCtx) { @@ -298,7 +310,15 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m saturnCallsFailureTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("failed-response-read-%s", getCacheStatus(isCacheHit)), fmt.Sprintf("%d", code)).Add(1) } - networkError = err.Error() + var target = ErrInvalidResponse{} + if errors.As(err, &target) { + verificationError = err.Error() + goLogger.Errorw("failed to read response; verification error", "err", err.Error()) + } else { + networkError = err.Error() + goLogger.Errorw("failed to read response; no verification error", "err", err.Error()) + } + return err } diff --git a/go.mod b/go.mod index 714a0f4..8bca168 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,12 @@ go 1.19 require ( github.com/google/uuid v1.3.0 - github.com/ipfs/boxo v0.10.2 + github.com/ipfs/boxo v0.11.0 github.com/ipfs/go-block-format v0.1.2 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-log/v2 v2.5.1 - github.com/ipld/go-car v0.6.1 - github.com/ipld/go-car/v2 v2.10.1 + github.com/ipld/go-car v0.6.2 + github.com/ipld/go-car/v2 v2.10.2-0.20230622090957-499d0c909d33 github.com/ipld/go-ipld-prime v0.20.0 github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd github.com/mitchellh/go-server-timing v1.0.1 @@ -19,10 +19,13 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tcnksm/go-httpstat v0.2.0 github.com/urfave/cli/v2 v2.24.2 + github.com/willscott/go-requestcontext v0.0.1 github.com/willscott/hashring v0.0.0-20230731155239-15f93a2dfb44 github.com/zyedidia/generic v1.2.2-0.20230625215236-3404399b19f1 - go.opentelemetry.io/otel v1.14.0 - go.opentelemetry.io/otel/trace v1.14.0 + go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.43.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.43.0 + go.opentelemetry.io/otel v1.17.0 + go.opentelemetry.io/otel/trace v1.17.0 ) require ( @@ -34,9 +37,9 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect - github.com/felixge/httpsnoop v1.0.0 // indirect + github.com/felixge/httpsnoop v1.0.3 // indirect github.com/gabriel-vasile/mimetype v1.4.1 // indirect - github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/gddo v0.0.0-20180823221919-9d8ff1c67be5 // indirect @@ -45,6 +48,7 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect github.com/ipfs/go-blockservice v0.5.0 // indirect @@ -57,7 +61,6 @@ require ( github.com/ipfs/go-ipld-cbor v0.0.6 // indirect github.com/ipfs/go-ipld-format v0.5.0 // indirect github.com/ipfs/go-ipld-legacy v0.2.1 // indirect - github.com/ipfs/go-ipns v0.3.0 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-merkledag v0.11.0 // indirect github.com/ipfs/go-metrics-interface v0.0.1 // indirect @@ -71,7 +74,7 @@ require ( github.com/libp2p/go-doh-resolver v0.4.0 // indirect github.com/libp2p/go-libp2p v0.26.3 // indirect github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect - github.com/libp2p/go-libp2p-kad-dht v0.21.1 // indirect + github.com/libp2p/go-libp2p-kad-dht v0.23.0 // indirect github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect github.com/libp2p/go-libp2p-record v0.2.0 // indirect github.com/libp2p/go-libp2p-routing-helpers v0.7.0 // indirect @@ -86,7 +89,7 @@ require ( github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-multiaddr v0.8.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect - github.com/multiformats/go-multibase v0.1.1 // indirect + github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multihash v0.2.3 // indirect github.com/multiformats/go-multistream v0.4.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect @@ -107,6 +110,7 @@ require ( github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/otel/metric v1.17.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect @@ -118,7 +122,8 @@ require ( golang.org/x/sys v0.6.0 // indirect golang.org/x/tools v0.3.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/protobuf v1.28.1 // indirect + gonum.org/v1/gonum v0.11.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.1.7 // indirect ) diff --git a/go.sum b/go.sum index 60a70f2..362284f 100644 --- a/go.sum +++ b/go.sum @@ -79,8 +79,9 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/felixge/httpsnoop v1.0.0 h1:gh8fMGz0rlOv/1WmRZm7OgncIOTsAj21iNJot48omJQ= github.com/felixge/httpsnoop v1.0.0/go.mod h1:3+D9sFq0ahK/JeJPhCBUV1xlf4/eIYrUQaxulT0VzX8= +github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= +github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= @@ -98,8 +99,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= -github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -187,12 +188,14 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.1 h1:5pv5N1lT1fjLg2VQ5KWc7kmucp2x/kvFOnxuVTqZ6x4= +github.com/hashicorp/golang-lru/v2 v2.0.1/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.10.2 h1:kspw9HmMyKzLQxpKk417sF69i6iuf50AXtRjFqCYyL4= -github.com/ipfs/boxo v0.10.2/go.mod h1:1qgKq45mPRCxf4ZPoJV2lnXxyxucigILMJOrQrVivv8= +github.com/ipfs/boxo v0.11.0 h1:urMxhZ3xoF4HssJVD3+0ssGT9pptEfHfbL8DYdoWFlg= +github.com/ipfs/boxo v0.11.0/go.mod h1:8IfDmp+FzFGcF4zjAgHMVPpwYw4AjN9ePEzDfkaYJ1w= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= @@ -237,8 +240,6 @@ github.com/ipfs/go-ipld-format v0.5.0 h1:WyEle9K96MSrvr47zZHKKcDxJ/vlpET6PSiQsAF github.com/ipfs/go-ipld-format v0.5.0/go.mod h1:ImdZqJQaEouMjCvqCe0ORUS+uoBmf7Hf+EO/jh+nk3M= github.com/ipfs/go-ipld-legacy v0.2.1 h1:mDFtrBpmU7b//LzLSypVrXsD8QxkEWxu5qVxN99/+tk= github.com/ipfs/go-ipld-legacy v0.2.1/go.mod h1:782MOUghNzMO2DER0FlBR94mllfdCJCkTtDtPM51otM= -github.com/ipfs/go-ipns v0.3.0 h1:ai791nTgVo+zTuq2bLvEGmWP1M0A6kGTXUsgv/Yq67A= -github.com/ipfs/go-ipns v0.3.0/go.mod h1:3cLT2rbvgPZGkHJoPO1YMJeh6LtkxopCkKFcio/wE24= github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo= github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= @@ -254,10 +255,10 @@ github.com/ipfs/go-unixfsnode v1.7.1 h1:RRxO2b6CSr5UQ/kxnGzaChTjp5LWTdf3Y4n8ANZg github.com/ipfs/go-unixfsnode v1.7.1/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= -github.com/ipld/go-car v0.6.1 h1:blWbEHf1j62JMWFIqWE//YR0m7k5ZMw0AuUOU5hjrH8= -github.com/ipld/go-car v0.6.1/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= -github.com/ipld/go-car/v2 v2.10.1 h1:MRDqkONNW9WRhB79u+Z3U5b+NoN7lYA5B8n8qI3+BoI= -github.com/ipld/go-car/v2 v2.10.1/go.mod h1:sQEkXVM3csejlb1kCCb+vQ/pWBKX9QtvsrysMQjOgOg= +github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc= +github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= +github.com/ipld/go-car/v2 v2.10.2-0.20230622090957-499d0c909d33 h1:0OZwzSYWIuiKEOXd/2vm5cMcEmmGLFn+1h6lHELCm3s= +github.com/ipld/go-car/v2 v2.10.2-0.20230622090957-499d0c909d33/go.mod h1:sQEkXVM3csejlb1kCCb+vQ/pWBKX9QtvsrysMQjOgOg= github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s= github.com/ipld/go-ipld-prime v0.20.0 h1:Ud3VwE9ClxpO2LkCYP7vWPc0Fo+dYdYzgxUJZ3uRG4g= @@ -309,8 +310,8 @@ github.com/libp2p/go-libp2p v0.26.3 h1:6g/psubqwdaBqNNoidbRKSTBEYgaOuKBhHl8Q5tO+ github.com/libp2p/go-libp2p v0.26.3/go.mod h1:x75BN32YbwuY0Awm2Uix4d4KOz+/4piInkp4Wr3yOo8= github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/1q+8WeNAsw= github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI= -github.com/libp2p/go-libp2p-kad-dht v0.21.1 h1:xpfp8/t9+X2ip1l8Umap1/UGNnJ3RHJgKGAEsnRAlTo= -github.com/libp2p/go-libp2p-kad-dht v0.21.1/go.mod h1:Oy8wvbdjpB70eS5AaFaI68tOtrdo3KylTvXDjikxqFo= +github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM= +github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU= github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA= github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= @@ -369,8 +370,8 @@ github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTd github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= -github.com/multiformats/go-multibase v0.1.1 h1:3ASCDsuLX8+j4kx58qnJ4YFq/JWTJpCyDW27ztsVTOI= -github.com/multiformats/go-multibase v0.1.1/go.mod h1:ZEjHE+IsUrgp5mhlEAYjMtZwK1k4haNkcaPg9aoe1a8= +github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= +github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg= github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= @@ -490,6 +491,8 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa/go.mod h1:f github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= +github.com/willscott/go-requestcontext v0.0.1 h1:qHL7y9r4MOO/4MRdTP/JB0f0uEle+qlueTZJQVvT1YU= +github.com/willscott/go-requestcontext v0.0.1/go.mod h1:23J4EoOLguNM3JeGv2AUDtcWnzK6AFieymcLTDqXQfg= github.com/willscott/hashring v0.0.0-20230731155239-15f93a2dfb44 h1:zs4g7LTzNzjl8WC0XPJqJx2lga4/6RSH5QaZ3nXOHCg= github.com/willscott/hashring v0.0.0-20230731155239-15f93a2dfb44/go.mod h1:8ORHm5iheceXLsLvS8Ch8nFWBSjxwajLoKA3a05cjL4= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= @@ -508,10 +511,16 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/otel v1.14.0 h1:/79Huy8wbf5DnIPhemGB+zEPVwnN6fuQybr/SRXa6hM= -go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU= -go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M= -go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.43.0 h1:lp9h55W1raxWOkKkasHTnqse5R1YKVNJ5/NPcWXYjRM= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.43.0/go.mod h1:haEjy8B8Upz9+p1zuhvsKm2uPiKeYFHaNB6BddllMBE= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.43.0 h1:HKORGpiOY0R0nAPtKx/ub8/7XoHhRooP8yNRkuPfelI= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.43.0/go.mod h1:e+y1M74SYXo/FcIx3UATwth2+5dDkM8dBi7eXg1tbw8= +go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM= +go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0= +go.opentelemetry.io/otel/metric v1.17.0 h1:iG6LGVz5Gh+IuO0jmgvpTB6YVrCGngi8QGm+pMd8Pdc= +go.opentelemetry.io/otel/metric v1.17.0/go.mod h1:h4skoxdZI17AxwITdmdZjjYJQH5nzijUUjm+wtPph5o= +go.opentelemetry.io/otel/trace v1.17.0 h1:/SWhSRHmDPOImIAetP1QAeMnZYiQXrTy4fMMYOdSKWQ= +go.opentelemetry.io/otel/trace v1.17.0/go.mod h1:I/4vKTgFclIsXRVucpH25X0mpFSczM7aHeaz0ZBLWjY= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= @@ -750,6 +759,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= +gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= @@ -826,8 +837,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/state/state.go b/internal/state/state.go index 9c9e5f0..96b959a 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -9,3 +9,12 @@ type State struct { type PoolController interface { DoRefresh() } + +type NodeInfo struct { + ID string `json:"id"` + IP string `json:"ip"` + Distance float32 `json:"distance"` + Weight int `json:"weight"` + ComplianceCid string `json:"complianceCid"` + Core bool `json:"core"` +} diff --git a/internal/util/harness.go b/internal/util/harness.go index 4b2fc6b..fe304de 100644 --- a/internal/util/harness.go +++ b/internal/util/harness.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "encoding/json" "errors" + "math/rand" "net/http" "net/http/httptest" "net/url" @@ -26,11 +27,21 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt ch := &CabooseHarness{} ch.Endpoints = make([]*Endpoint, n) - purls := make([]string, n) + purls := make([]state.NodeInfo, n) for i := 0; i < len(ch.Endpoints); i++ { ch.Endpoints[i] = &Endpoint{} ch.Endpoints[i].Setup() - purls[i] = strings.TrimPrefix(ch.Endpoints[i].Server.URL, "https://") + ip := strings.TrimPrefix(ch.Endpoints[i].Server.URL, "https://") + + cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(testBlock)) + + purls[i] = state.NodeInfo{ + IP: ip, + ID: "node-id", + Weight: rand.Intn(100), + Distance: rand.Float32(), + ComplianceCid: cid.String(), + } } ch.goodOrch = true orch := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -66,6 +77,8 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt PoolRefresh: time.Second * 50, MaxRetrievalAttempts: maxRetries, Harness: &state.State{}, + + MirrorFraction: 1.0, } for _, opt := range opts { @@ -248,6 +261,18 @@ func WithMaxFailuresBeforeCoolDown(max int) func(config *caboose.Config) { } } +func WithComplianceCidPeriod(n int64) func(config *caboose.Config) { + return func(config *caboose.Config) { + config.ComplianceCidPeriod = n + } +} + +func WithMirrorFraction(n float64) func(config *caboose.Config) { + return func(config *caboose.Config) { + config.MirrorFraction = n + } +} + func WithCidCoolDownDuration(duration time.Duration) func(config *caboose.Config) { return func(config *caboose.Config) { config.FetchKeyCoolDownDuration = duration diff --git a/log.go b/log.go index f43561f..ec978f1 100644 --- a/log.go +++ b/log.go @@ -104,4 +104,5 @@ type log struct { NodeId string `json:"nodeId"` IfNetworkError string `json:"ifNetworkError"` NodeIpAddress string `json:"nodeIpAddress"` + VerificationError string `json:"verificationError"` } diff --git a/metrics.go b/metrics.go index 2b82c49..4350a8d 100644 --- a/metrics.go +++ b/metrics.go @@ -133,6 +133,10 @@ var ( mirroredTrafficTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: prometheus.BuildFQName("ipfs", "caboose", "mirrored_traffic_total"), }, []string{"error_status"}) + + complianceCidCallsTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: prometheus.BuildFQName("ipfs", "caboose", "compliance_cids_total"), + }, []string{"error_status"}) ) var CabooseMetrics = prometheus.NewRegistry() @@ -163,6 +167,7 @@ func init() { CabooseMetrics.MustRegister(saturnCallsTotalMetric) CabooseMetrics.MustRegister(saturnCallsFailureTotalMetric) CabooseMetrics.MustRegister(saturnConnectionFailureTotalMetric) + CabooseMetrics.MustRegister(complianceCidCallsTotalMetric) CabooseMetrics.MustRegister(saturnCallsSuccessTotalMetric) diff --git a/node.go b/node.go index 7424b41..f21f1b2 100644 --- a/node.go +++ b/node.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/filecoin-saturn/caboose/internal/state" "github.com/zyedidia/generic/queue" ) @@ -14,7 +15,9 @@ const ( ) type Node struct { - URL string + URL string + ComplianceCid string + Core bool PredictedLatency float64 PredictedThroughput float64 @@ -25,10 +28,12 @@ type Node struct { lk sync.RWMutex } -func NewNode(url string) *Node { +func NewNode(info state.NodeInfo) *Node { return &Node{ - URL: url, - Samples: queue.New[NodeSample](), + URL: info.IP, + ComplianceCid: info.ComplianceCid, + Core: info.Core, + Samples: queue.New[NodeSample](), } } diff --git a/node_heap.go b/node_heap.go index 1d6092f..c71771f 100644 --- a/node_heap.go +++ b/node_heap.go @@ -2,6 +2,7 @@ package caboose import ( "container/heap" + "math/rand" "sync" ) @@ -45,8 +46,13 @@ func (nh *NodeHeap) Best() *Node { func (nh *NodeHeap) PeekRandom() *Node { nh.lk.RLock() defer nh.lk.RUnlock() - // TODO - return nil + + if len(nh.Nodes) == 0 { + return nil + } + + randIdx := rand.Intn(len(nh.Nodes)) + return nh.Nodes[randIdx] } func (nh *NodeHeap) TopN(n int) []*Node { diff --git a/node_ring.go b/node_ring.go index dfe6b31..eafbbf3 100644 --- a/node_ring.go +++ b/node_ring.go @@ -137,6 +137,17 @@ func (nr *NodeRing) Contains(n *Node) bool { return ok } +func (nr *NodeRing) IsCore(n *Node) bool { + nr.lk.RLock() + defer nr.lk.RUnlock() + + nd, ok := nr.Nodes[n.URL] + if !ok { + return false + } + return nd.Core +} + func (nr *NodeRing) GetNodes(key string, number int) ([]*Node, error) { nr.lk.RLock() defer nr.lk.RUnlock() diff --git a/pool.go b/pool.go index 294c187..64dca91 100644 --- a/pool.go +++ b/pool.go @@ -2,15 +2,19 @@ package caboose import ( "context" + cryptoRand "crypto/rand" "encoding/json" "errors" "fmt" "io" + "math/big" "math/rand" "net/url" "sync" "time" + "github.com/filecoin-saturn/caboose/internal/state" + "github.com/patrickmn/go-cache" "github.com/ipfs/boxo/path" @@ -19,11 +23,16 @@ import ( "github.com/ipld/go-car" ) -const blockPathPattern = "/ipfs/%s?format=car&dag-scope=block" +const ( + blockPathPattern = "/ipfs/%s?format=car&dag-scope=block" + defaultMirroredConcurrency = 5 +) + +var complianceCidReqTemplate = "/ipfs/%s?format=raw" // loadPool refreshes the set of endpoints in the pool by fetching an updated list of nodes from the // Orchestrator. -func (p *pool) loadPool() ([]string, error) { +func (p *pool) loadPool() ([]state.NodeInfo, error) { if p.config.OrchestratorOverride != nil { return p.config.OrchestratorOverride, nil } @@ -35,12 +44,15 @@ func (p *pool) loadPool() ([]string, error) { } defer resp.Body.Close() - responses := make([]string, 0) + responses := make([]state.NodeInfo, 0) + if err := json.NewDecoder(resp.Body).Decode(&responses); err != nil { goLogger.Warnw("failed to decode backends from orchestrator", "err", err, "endpoint", p.config.OrchestratorEndpoint.String()) return nil, err } + goLogger.Infow("got backends from orchestrators", "cnt", len(responses), "endpoint", p.config.OrchestratorEndpoint.String()) + return responses, nil } @@ -136,30 +148,67 @@ func (p *pool) refreshPool() { } } +func (p *pool) fetchComplianceCid(node *Node) error { + sc := node.ComplianceCid + if len(node.ComplianceCid) == 0 { + goLogger.Warnw("failed to find compliance cid ", "for node", node) + return fmt.Errorf("compliance cid doesn't exist for node: %s ", node) + } + trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) + reqUrl := fmt.Sprintf(complianceCidReqTemplate, sc) + goLogger.Debugw("fetching compliance cid", "cid", reqUrl, "from", node) + err := p.fetchResourceAndUpdate(trialTimeout, node, reqUrl, 0, p.mirrorValidator) + cancel() + return err +} + func (p *pool) checkPool() { + sem := make(chan struct{}, defaultMirroredConcurrency) + for { select { case msg := <-p.mirrorSamples: - // see if it is to a main-tier node - if so find appropriate test node to test against. - if !p.ActiveNodes.Contains(msg.node) { - continue - } - testNode := p.AllNodes.PeekRandom() - if testNode == nil { - continue - } - if p.ActiveNodes.Contains(testNode) { - continue - } + sem <- struct{}{} + go func(msg mirroredPoolRequest) { + defer func() { <-sem }() + + // see if it is to a main-tier node - if so find appropriate test node to test against. + if !p.ActiveNodes.Contains(msg.node) { + return + } + testNode := p.AllNodes.PeekRandom() + if testNode == nil { + return + } + if p.ActiveNodes.Contains(testNode) { + rand := big.NewInt(1) + if p.config.ComplianceCidPeriod > 0 { + rand, _ = cryptoRand.Int(cryptoRand.Reader, big.NewInt(p.config.ComplianceCidPeriod)) + } + + if rand.Cmp(big.NewInt(0)) == 0 { + err := p.fetchComplianceCid(testNode) + if err != nil { + goLogger.Warnw("failed to fetch compliance cid ", "err", err) + complianceCidCallsTotalMetric.WithLabelValues("error").Add(1) + } else { + complianceCidCallsTotalMetric.WithLabelValues("success").Add(1) + } + } + return + } + + trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) + err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator) + + cancel() + if err != nil { + mirroredTrafficTotalMetric.WithLabelValues("error").Inc() + } else { + mirroredTrafficTotalMetric.WithLabelValues("no-error").Inc() + } + }(msg) - trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) - err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator) - cancel() - if err != nil { - mirroredTrafficTotalMetric.WithLabelValues("error").Inc() - } else { - mirroredTrafficTotalMetric.WithLabelValues("no-error").Inc() - } case <-p.done: return } @@ -189,7 +238,7 @@ func (p *pool) mirrorValidator(resource string, reader io.Reader) error { return err } - br, err := car.NewCarReader(reader) + br, err := car.NewCarReaderWithOptions(reader, car.WithErrorOnEmptyRoots(false)) if err != nil { return err } @@ -323,6 +372,7 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba default: } } + old := pq[0] err = p.fetchResourceAndUpdate(ctx, nodes[i], pq[0], i, cb) if err != nil && errors.Is(err, context.Canceled) { return err @@ -337,6 +387,8 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba //fetchSpeedPerBlockMetric.Observe(float64(float64(len(blk.RawData())) / float64(durationMs))) fetchDurationCarSuccessMetric.Observe(float64(durationMs)) return + } else if pq[0] == old { + continue } else { // TODO: potentially worth doing something smarter here based on what the current state // of permanent vs temporary errors is. @@ -352,11 +404,16 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba } pq = pq[1:] pq = append(pq, epr.StillNeed...) - // TODO: potentially worth doing something smarter here based on what the current state - // of permanent vs temporary errors is. - // for now: reset i on partials so we also give them a chance to retry. - i = -1 + if pq[0] == old { + continue + } else { + // TODO: potentially worth doing something smarter here based on what the current state + // of permanent vs temporary errors is. + + // for now: reset i on partials so we also give them a chance to retry. + i = -1 + } } } diff --git a/pool_refresh_test.go b/pool_refresh_test.go index e6318bf..5ae4ad4 100644 --- a/pool_refresh_test.go +++ b/pool_refresh_test.go @@ -1,8 +1,12 @@ package caboose import ( + "math/rand" "testing" + "github.com/filecoin-saturn/caboose/internal/state" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/require" ) @@ -29,8 +33,25 @@ func TestPoolRefresh(t *testing.T) { } func addAndAssertPool(t *testing.T, p *pool, nodes []string, expectedTotal int) { - for _, n := range nodes { + nodeStructs := genNodeStructs(nodes) + for _, n := range nodeStructs { p.AllNodes.AddIfNotPresent(NewNode(n)) } require.Equal(t, expectedTotal, p.AllNodes.Len()) } + +func genNodeStructs(nodes []string) []state.NodeInfo { + var nodeStructs []state.NodeInfo + + for _, node := range nodes { + cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(node)) + nodeStructs = append(nodeStructs, state.NodeInfo{ + IP: node, + ID: node, + Weight: rand.Intn(100), + Distance: rand.Float32(), + ComplianceCid: cid.String(), + }) + } + return nodeStructs +} diff --git a/pool_test.go b/pool_test.go index 683dca2..543078b 100644 --- a/pool_test.go +++ b/pool_test.go @@ -3,15 +3,10 @@ package caboose_test import ( "bytes" "context" - "crypto/tls" - "net/http" - "net/url" - "strings" "testing" "time" "unsafe" - "github.com/filecoin-saturn/caboose" "github.com/filecoin-saturn/caboose/internal/util" "github.com/ipfs/go-cid" "github.com/ipld/go-car/v2" @@ -28,15 +23,7 @@ func TestPoolMirroring(t *testing.T) { t.Skip("skipping for 32bit architectures because too slow") } - saturnClient := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - }, - } - - data := []byte("hello world") + data := []byte("hello World") ls := cidlink.DefaultLinkSystem() lsm := memstore.Store{} ls.SetReadStorage(&lsm) @@ -50,32 +37,8 @@ func TestPoolMirroring(t *testing.T) { t.Fatal(err) } - e := util.Endpoint{} - e.Setup() - e.SetResp(carBytes.Bytes(), false) - eURL := strings.TrimPrefix(e.Server.URL, "https://") - - e2 := util.Endpoint{} - e2.Setup() - e2.SetResp(carBytes.Bytes(), false) - e2URL := strings.TrimPrefix(e2.Server.URL, "https://") - - conf := caboose.Config{ - OrchestratorEndpoint: &url.URL{}, - OrchestratorClient: http.DefaultClient, - OrchestratorOverride: []string{eURL, e2URL}, - LoggingEndpoint: url.URL{}, - LoggingClient: http.DefaultClient, - LoggingInterval: time.Hour, - - Client: saturnClient, - DoValidation: false, - PoolRefresh: time.Minute, - MaxRetrievalAttempts: 1, - MirrorFraction: 1.0, - } + ch := util.BuildCabooseHarness(t, 2, 3) - c, err := caboose.NewCaboose(&conf) if err != nil { t.Fatal(err) } @@ -84,7 +47,7 @@ func TestPoolMirroring(t *testing.T) { // Make 10 requests, and expect some fraction trigger a mirror. for i := 0; i < 10; i++ { - _, err = c.Get(context.Background(), finalC) + _, err = ch.Caboose.Get(context.Background(), finalC) if err != nil { t.Fatal(err) } @@ -92,11 +55,43 @@ func TestPoolMirroring(t *testing.T) { } time.Sleep(100 * time.Millisecond) - c.Close() + ch.Caboose.Close() + + ec := ch.Endpoints[0].Count() - ec := e.Count() - e2c := e2.Count() + e2c := ch.Endpoints[1].Count() if ec+e2c < 10 { t.Fatalf("expected at least 10 fetches, got %d", ec+e2c) } } + +func TestFetchComplianceCid(t *testing.T) { + if unsafe.Sizeof(unsafe.Pointer(nil)) <= 4 { + t.Skip("skipping for 32bit architectures because too slow") + } + + ch := util.BuildCabooseHarness(t, 1, 1, util.WithComplianceCidPeriod(1), util.WithMirrorFraction(1.0)) + + ch.CaboosePool.DoRefresh() + + ls := cidlink.DefaultLinkSystem() + lsm := memstore.Store{} + ls.SetReadStorage(&lsm) + ls.SetWriteStorage(&lsm) + finalCL := ls.MustStore(ipld.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256))}, basicnode.NewBytes(testBlock)) + finalC := finalCL.(cidlink.Link).Cid + + _, err := ch.Caboose.Get(context.Background(), finalC) + if err != nil { + t.Fatal(err) + } + + time.Sleep(100 * time.Millisecond) + ch.Caboose.Close() + + e := ch.Endpoints[0] + + if e.Count() != 2 { + t.Fatalf("expected 2 primary fetch, got %d", e.Count()) + } +}