From e8ae7994fc313aec04a0cc3040cc092b3d3a5a09 Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Wed, 3 May 2023 19:05:02 +0200 Subject: [PATCH 1/5] fix(nodebuilder): fix error log and shut the fx (#2160) --- logs/logs.go | 1 + nodebuilder/node.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/logs/logs.go b/logs/logs.go index bedb74cc58..4ae1cddd3f 100644 --- a/logs/logs.go +++ b/logs/logs.go @@ -21,6 +21,7 @@ func SetAllLoggers(level logging.LogLevel) { _ = logging.SetLogLevel("net/identify", "ERROR") _ = logging.SetLogLevel("shrex/nd", "WARN") _ = logging.SetLogLevel("shrex/eds", "WARN") + _ = logging.SetLogLevel("fx", "FATAL") } func SetDebugLogging() { diff --git a/nodebuilder/node.go b/nodebuilder/node.go index 95f5d02116..90caac5dc9 100644 --- a/nodebuilder/node.go +++ b/nodebuilder/node.go @@ -30,7 +30,7 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/state" ) -var Timeout = time.Second * 30 +var Timeout = time.Minute * 2 var ( log = logging.Logger("node") @@ -100,7 +100,7 @@ func (n *Node) Start(ctx context.Context) error { if err != nil { log.Debugf("error starting %s Node: %s", n.Type, err) if errors.Is(err, context.DeadlineExceeded) { - return fmt.Errorf("node: failed to start within timeout(%s): %w", Timeout, errors.Unwrap(err)) + return fmt.Errorf("node: failed to start within timeout(%s): %w", Timeout, err) } return fmt.Errorf("node: failed to start: %w", err) } @@ -145,7 +145,7 @@ func (n *Node) Stop(ctx context.Context) error { if err != nil { log.Debugf("error stopping %s Node: %s", n.Type, err) if errors.Is(err, context.DeadlineExceeded) { - return fmt.Errorf("node: failed to stop within timeout(%s): %w", Timeout, errors.Unwrap(err)) + return fmt.Errorf("node: failed to stop within timeout(%s): %w", Timeout, err) } return fmt.Errorf("node: failed to stop: %w", err) } From 0100141b992cdc148c8f4fc4351cea5b4a0c60a0 Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Thu, 4 May 2023 15:34:29 +0200 Subject: [PATCH 2/5] fix(share/discovery): fixes from Advertise logic audit (#2163) * Use only TTL provided by us. * Don't wait 8hs upon error * Increase the advertise timeout --- share/availability/discovery/discovery.go | 29 +++++++++++++---------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go index 0a2deb7858..cd1ccbb8f8 100644 --- a/share/availability/discovery/discovery.go +++ b/share/availability/discovery/discovery.go @@ -35,11 +35,6 @@ const ( defaultRetryTimeout = time.Second ) -// waitF calculates time to restart announcing. -var waitF = func(ttl time.Duration) time.Duration { - return 7 * ttl / 8 -} - type Parameters struct { // PeersLimit defines the soft limit of FNs to connect to via discovery. // Set 0 to disable. @@ -67,8 +62,9 @@ func (p Parameters) withDefaults() Parameters { func DefaultParameters() Parameters { return Parameters{ - PeersLimit: 5, - AdvertiseInterval: time.Hour * 8, + PeersLimit: 5, + // based on https://github.com/libp2p/go-libp2p-kad-dht/pull/793 + AdvertiseInterval: time.Hour * 22, } } @@ -151,32 +147,41 @@ func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) { // TODO: Start advertising only after the reachability is confirmed by AutoNAT func (d *Discovery) Advertise(ctx context.Context) { if d.params.AdvertiseInterval == -1 { + log.Warn("AdvertiseInterval is set to -1. 
Skipping advertising...") return } timer := time.NewTimer(d.params.AdvertiseInterval) defer timer.Stop() for { - ttl, err := d.disc.Advertise(ctx, rendezvousPoint) + _, err := d.disc.Advertise(ctx, rendezvousPoint) if err != nil { - log.Debugf("Error advertising %s: %s", rendezvousPoint, err.Error()) if ctx.Err() != nil { return } + log.Warn("error advertising %s: %s", rendezvousPoint, err.Error()) + errTimer := time.NewTimer(time.Minute) select { - case <-timer.C: - timer.Reset(d.params.AdvertiseInterval) + case <-errTimer.C: + errTimer.Stop() + if !timer.Stop() { + <-timer.C + } continue case <-ctx.Done(): + errTimer.Stop() return } } log.Debugf("advertised") + if !timer.Stop() { + <-timer.C + } + timer.Reset(d.params.AdvertiseInterval) select { case <-timer.C: - timer.Reset(waitF(ttl)) case <-ctx.Done(): return } From 929a334e90b0a81e890a113beccb43caa2dee485 Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Fri, 5 May 2023 18:32:42 +0300 Subject: [PATCH 3/5] feat(metrics): set build info as an attribute in metrics (#2165) --- cmd/celestia/util.go | 6 ++++++ cmd/celestia/version.go | 7 +++++-- cmd/env.go | 11 +++++++++++ cmd/flags_misc.go | 2 +- nodebuilder/node/buildInfo.go | 9 +++++++++ nodebuilder/node_test.go | 1 + nodebuilder/settings.go | 10 +++++----- 7 files changed, 38 insertions(+), 8 deletions(-) create mode 100644 nodebuilder/node/buildInfo.go diff --git a/cmd/celestia/util.go b/cmd/celestia/util.go index a38860d1f7..85505c3a60 100644 --- a/cmd/celestia/util.go +++ b/cmd/celestia/util.go @@ -26,6 +26,12 @@ func persistentPreRunEnv(cmd *cobra.Command, nodeType node.Type, _ []string) err return err } ctx = cmdnode.WithNetwork(ctx, parsedNetwork) + ctx = cmdnode.WithNodeBuildInfo(ctx, &node.BuildInfo{ + LastCommit: lastCommit, + SemanticVersion: semanticVersion, + SystemVersion: systemVersion, + GolangVersion: golangVersion, + }) // loads existing config into the environment ctx, err = cmdnode.ParseNodeFlags(ctx, cmd, cmdnode.Network(ctx)) diff --git a/cmd/celestia/version.go b/cmd/celestia/version.go index f6fba4a007..462f17b474 100644 --- a/cmd/celestia/version.go +++ b/cmd/celestia/version.go @@ -11,6 +11,9 @@ var ( buildTime string lastCommit string semanticVersion string + + systemVersion = fmt.Sprintf("%s/%s", runtime.GOARCH, runtime.GOOS) + golangVersion = runtime.Version() ) var versionCmd = &cobra.Command{ @@ -24,6 +27,6 @@ func printBuildInfo(_ *cobra.Command, _ []string) { fmt.Printf("Semantic version: %s\n", semanticVersion) fmt.Printf("Commit: %s\n", lastCommit) fmt.Printf("Build Date: %s\n", buildTime) - fmt.Printf("System version: %s/%s\n", runtime.GOARCH, runtime.GOOS) - fmt.Printf("Golang version: %s\n", runtime.Version()) + fmt.Printf("System version: %s\n", systemVersion) + fmt.Printf("Golang version: %s\n", golangVersion) } diff --git a/cmd/env.go b/cmd/env.go index f9860a2de8..ca915d884f 100644 --- a/cmd/env.go +++ b/cmd/env.go @@ -38,6 +38,11 @@ func NodeConfig(ctx context.Context) nodebuilder.Config { return cfg } +// NodeInfo reads the node build inforamtion from the context. +func NodeInfo(ctx context.Context) node.BuildInfo { + return ctx.Value(buildInfo{}).(node.BuildInfo) +} + // WithNodeType sets the node type in the given context. 
func WithNodeType(ctx context.Context, tp node.Type) context.Context { return context.WithValue(ctx, nodeTypeKey{}, tp) @@ -73,10 +78,16 @@ func WithNodeConfig(ctx context.Context, config *nodebuilder.Config) context.Con return context.WithValue(ctx, configKey{}, *config) } +// WithNodeConfig sets the node config build information. +func WithNodeBuildInfo(ctx context.Context, info *node.BuildInfo) context.Context { + return context.WithValue(ctx, buildInfo{}, *info) +} + type ( optionsKey struct{} configKey struct{} storePathKey struct{} nodeTypeKey struct{} networkKey struct{} + buildInfo struct{} ) diff --git a/cmd/flags_misc.go b/cmd/flags_misc.go index f671840eab..4483e17201 100644 --- a/cmd/flags_misc.go +++ b/cmd/flags_misc.go @@ -262,7 +262,7 @@ func ParseMiscFlags(ctx context.Context, cmd *cobra.Command) (context.Context, e opts = append(opts, otlpmetrichttp.WithInsecure()) } - ctx = WithNodeOptions(ctx, nodebuilder.WithMetrics(opts, NodeType(ctx))) + ctx = WithNodeOptions(ctx, nodebuilder.WithMetrics(opts, NodeType(ctx), NodeInfo(ctx))) } ok, err = cmd.Flags().GetBool(p2pMetrics) diff --git a/nodebuilder/node/buildInfo.go b/nodebuilder/node/buildInfo.go new file mode 100644 index 0000000000..5f5bdde28e --- /dev/null +++ b/nodebuilder/node/buildInfo.go @@ -0,0 +1,9 @@ +package node + +// BuildInfo stores all necessary information for the current build. +type BuildInfo struct { + LastCommit string + SemanticVersion string + SystemVersion string + GolangVersion string +} diff --git a/nodebuilder/node_test.go b/nodebuilder/node_test.go index aa22b0fcc7..11b27b076a 100644 --- a/nodebuilder/node_test.go +++ b/nodebuilder/node_test.go @@ -80,6 +80,7 @@ func TestLifecycle_WithMetrics(t *testing.T) { otlpmetrichttp.WithInsecure(), }, tt.tp, + node.BuildInfo{}, ), ) require.NotNil(t, node) diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index 67a30793f8..8da9f90da6 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -62,9 +62,10 @@ func WithPyroscope(endpoint string, nodeType node.Type) fx.Option { } // WithMetrics enables metrics exporting for the node. -func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Option { +func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type, buildInfo node.BuildInfo) fx.Option { baseComponents := fx.Options( fx.Supply(metricOpts), + fx.Supply(buildInfo), fx.Invoke(initializeMetrics), fx.Invoke(state.WithMetrics), fx.Invoke(fraud.WithMetrics), @@ -109,6 +110,7 @@ func initializeMetrics( lc fx.Lifecycle, peerID peer.ID, nodeType node.Type, + buildInfo node.BuildInfo, opts []otlpmetrichttp.Option, ) error { exp, err := otlpmetrichttp.New(ctx, opts...) 
@@ -120,17 +122,15 @@ func initializeMetrics( metric.WithReader(metric.NewPeriodicReader(exp, metric.WithTimeout(2*time.Second))), metric.WithResource(resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceNameKey.String(fmt.Sprintf("Celestia-%s", nodeType.String())), - // TODO(@Wondertan): Versioning: semconv.ServiceVersionKey + semconv.ServiceNamespaceKey.String(fmt.Sprintf("Celestia-%s", nodeType.String())), + semconv.ServiceNameKey.String(fmt.Sprintf("semver-%s", buildInfo.SemanticVersion)), semconv.ServiceInstanceIDKey.String(peerID.String()), ))) - lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return provider.Shutdown(ctx) }, }) global.SetMeterProvider(provider) - return nil } From ff65dff4c36412ee5934a53951787f197c3c08a2 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Mon, 8 May 2023 10:03:17 +0200 Subject: [PATCH 4/5] deps: Bump celestia-app version to 0.13.0 (#2164) Hardfork version bump. --- core/testing.go | 1 + go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/testing.go b/core/testing.go index 8b483b40a2..88154dc7d9 100644 --- a/core/testing.go +++ b/core/testing.go @@ -79,6 +79,7 @@ func StartTestNodeWithConfig(t *testing.T, cfg *TestConfig) testnode.Context { state, kr, "private", + nil, ) require.NoError(t, err) diff --git a/go.mod b/go.mod index a7221327bd..25bb89586e 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/BurntSushi/toml v1.2.1 github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921 github.com/benbjohnson/clock v1.3.0 - github.com/celestiaorg/celestia-app v0.12.2 + github.com/celestiaorg/celestia-app v0.13.0 github.com/celestiaorg/go-fraud v0.1.0 github.com/celestiaorg/go-header v0.2.7 github.com/celestiaorg/go-libp2p-messenger v0.2.0 diff --git a/go.sum b/go.sum index e04d77814d..76ec8166ce 100644 --- a/go.sum +++ b/go.sum @@ -340,8 +340,8 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7 github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= -github.com/celestiaorg/celestia-app v0.12.2 h1:Mlxzz2SS+uvE4RnbSuXAh4MagiJBW7GnhDa+8kVbzKE= -github.com/celestiaorg/celestia-app v0.12.2/go.mod h1:lKhL1Oxk4Z29M+GQ25luTHBgwSvgiT4puPeBrjdsgXc= +github.com/celestiaorg/celestia-app v0.13.0 h1:NPOR1P98YCCv+E2I9TdqsO1/UnGyzTHW5CBhctWHaOY= +github.com/celestiaorg/celestia-app v0.13.0/go.mod h1:OcPBfWDyowJgoEQ89NB2LgLOm9LSwloCgCzdZKjmi78= github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23 h1:BHvn41IHOtvHeX1VZqO/xBFIHj93llcw9ZQfNxyVRlI= github.com/celestiaorg/celestia-core v1.15.0-tm-v0.34.23/go.mod h1:nL+vkAMKy/A8wWemWqMwBy4pOGWYYbboAVTEe3N5gIU= github.com/celestiaorg/cosmos-sdk v1.8.0-sdk-v0.46.7 h1:EADZy33ufskVIy6Rj6jbi3SOVCeYYo26zUi7iYx+QR0= From 7f556f06e175267e0dd60b444a68554f592710a0 Mon Sep 17 00:00:00 2001 From: hrt/derrandz Date: Mon, 8 May 2023 10:17:51 +0100 Subject: [PATCH 5/5] feat(share)!: add functional options to share pkg (#1798) ## Overview This PR introduces functional options to the share package and deprecates usage of default values directly in code, in favor of using parameterized values. ## Breaking This PR breaks the configuration. 
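At the code level, the affected constructors now take functional options instead of a `Parameters` struct (or implicit package-level defaults). Below is a minimal sketch of the new call sites, using only names that appear in this diff; the wrapper function, the literal values, and the libp2p import paths are illustrative assumptions, not part of the change:

```go
package example

import (
	"time"

	"github.com/libp2p/go-libp2p/core/discovery"
	"github.com/libp2p/go-libp2p/core/host"

	"github.com/celestiaorg/celestia-node/share"
	disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
	"github.com/celestiaorg/celestia-node/share/availability/light"
)

// buildShareComponents (illustrative only) wires the share components using the
// functional options introduced by this PR instead of passing Parameters structs.
func buildShareComponents(h host.Host, d discovery.Discovery, getter share.Getter) (*disc.Discovery, *light.ShareAvailability) {
	peerDisc := disc.NewDiscovery(h, d,
		disc.WithPeersLimit(10),                 // soft limit of full nodes to discover
		disc.WithAdvertiseInterval(time.Second), // interval between advertising sessions
	)
	avail := light.NewShareAvailability(getter,
		light.WithSampleAmount(16), // previously taken implicitly from light.DefaultSampleAmount
	)
	return peerDisc, avail
}
```

In the node itself these option values are not hard-coded: `nodebuilder/share` reads them from the `[Share]` config section shown below and passes them through as options.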
The on-disk configuration becomes (_note that `Share.Availability` is only available for light nodes_): ``` [Share] [Share.Availability] SampleAmount = 16 [Share.Discovery] PeersLimit = 5 AdvertiseInterval = "8h0m0s" ``` ## Checklist - [x] New and updated code has appropriate documentation - [x] New and updated code has new and/or updated testing - [x] Required CI checks are passing - [x] Visual proof for any user facing features like CLI or documentation updates - [x] Linked issues closed with keywords --- nodebuilder/config.go | 2 +- nodebuilder/share/config.go | 51 ++++++++++++---- nodebuilder/share/constructors.go | 5 +- nodebuilder/share/module.go | 9 ++- share/availability.go | 6 +- share/availability/cache/availability.go | 20 +++---- share/availability/discovery/discovery.go | 59 ++++++------------- .../availability/discovery/discovery_test.go | 21 +++---- share/availability/discovery/options.go | 59 +++++++++++++++++++ share/availability/full/availability.go | 6 +- share/availability/full/testing.go | 11 ++-- share/availability/light/availability.go | 22 +++++-- share/availability/light/options.go | 50 ++++++++++++++++ share/availability/light/sample.go | 4 -- share/getters/shrex_test.go | 7 +-- share/p2p/peers/manager_test.go | 20 +++---- 16 files changed, 235 insertions(+), 117 deletions(-) create mode 100644 share/availability/discovery/options.go create mode 100644 share/availability/light/options.go diff --git a/nodebuilder/config.go b/nodebuilder/config.go index d4df75d2f6..3607aa593e 100644 --- a/nodebuilder/config.go +++ b/nodebuilder/config.go @@ -44,7 +44,7 @@ func DefaultConfig(tp node.Type) *Config { P2P: p2p.DefaultConfig(tp), RPC: rpc.DefaultConfig(), Gateway: gateway.DefaultConfig(), - Share: share.DefaultConfig(), + Share: share.DefaultConfig(tp), Header: header.DefaultConfig(tp), } diff --git a/nodebuilder/share/config.go b/nodebuilder/share/config.go index d843e78dd2..cd9514fb75 100644 --- a/nodebuilder/share/config.go +++ b/nodebuilder/share/config.go @@ -1,14 +1,18 @@ package share import ( - disc "github.com/celestiaorg/celestia-node/share/availability/discovery" + "fmt" + + "github.com/celestiaorg/celestia-node/nodebuilder/node" + "github.com/celestiaorg/celestia-node/share/availability/discovery" + "github.com/celestiaorg/celestia-node/share/availability/light" "github.com/celestiaorg/celestia-node/share/p2p/peers" "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" ) +// TODO: some params are pointers and other are not, Let's fix this. type Config struct { - // UseShareExchange is a flag toggling the usage of shrex protocols for blocksync. UseShareExchange bool // ShrExEDSParams sets shrexeds client and server configuration parameters ShrExEDSParams *shrexeds.Parameters @@ -16,27 +20,50 @@ type Config struct { ShrExNDParams *shrexnd.Parameters // PeerManagerParams sets peer-manager configuration parameters PeerManagerParams peers.Parameters - // Discovery sets peer discovery configuration parameters. 
- Discovery disc.Parameters + + LightAvailability light.Parameters `toml:",omitempty"` + Discovery discovery.Parameters } -func DefaultConfig() Config { - return Config{ - UseShareExchange: true, +func DefaultConfig(tp node.Type) Config { + cfg := Config{ + Discovery: discovery.DefaultParameters(), ShrExEDSParams: shrexeds.DefaultParameters(), ShrExNDParams: shrexnd.DefaultParameters(), + UseShareExchange: true, PeerManagerParams: peers.DefaultParameters(), - Discovery: disc.DefaultParameters(), } + + if tp == node.Light { + cfg.LightAvailability = light.DefaultParameters() + } + + return cfg } // Validate performs basic validation of the config. -func (cfg *Config) Validate() error { +func (cfg *Config) Validate(tp node.Type) error { + if tp == node.Light { + if err := cfg.LightAvailability.Validate(); err != nil { + return fmt.Errorf("nodebuilder/share: %w", err) + } + } + + if err := cfg.Discovery.Validate(); err != nil { + return fmt.Errorf("nodebuilder/share: %w", err) + } + if err := cfg.ShrExNDParams.Validate(); err != nil { - return err + return fmt.Errorf("nodebuilder/share: %w", err) } + if err := cfg.ShrExEDSParams.Validate(); err != nil { - return err + return fmt.Errorf("nodebuilder/share: %w", err) } - return cfg.PeerManagerParams.Validate() + + if err := cfg.PeerManagerParams.Validate(); err != nil { + return fmt.Errorf("nodebuilder/share: %w", err) + } + + return nil } diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index 59189243a5..a3fe691798 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -21,7 +21,7 @@ import ( "github.com/celestiaorg/celestia-node/share/getters" ) -func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery { +func newDiscovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery { return func( r routing.ContentRouting, h host.Host, @@ -29,7 +29,8 @@ func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discove return disc.NewDiscovery( h, routingdisc.NewRoutingDiscovery(r), - cfg.Discovery, + disc.WithPeersLimit(cfg.Discovery.PeersLimit), + disc.WithAdvertiseInterval(cfg.Discovery.AdvertiseInterval), ) } } diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go index a43194d53d..6cfcfdb475 100644 --- a/nodebuilder/share/module.go +++ b/nodebuilder/share/module.go @@ -24,7 +24,7 @@ import ( func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option { // sanitize config values before constructing module - cfgErr := cfg.Validate() + cfgErr := cfg.Validate(tp) baseComponents := fx.Options( fx.Supply(*cfg), @@ -33,7 +33,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option fx.Provide(newModule), fx.Invoke(func(disc *disc.Discovery) {}), fx.Provide(fx.Annotate( - discovery(*cfg), + newDiscovery(*cfg), fx.OnStart(func(ctx context.Context, d *disc.Discovery) error { return d.Start(ctx) }), @@ -170,6 +170,11 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option return fx.Module( "share", baseComponents, + fx.Provide(func() []light.Option { + return []light.Option{ + light.WithSampleAmount(cfg.LightAvailability.SampleAmount), + } + }), shrexGetterComponents, fx.Invoke(share.EnsureEmptySquareExists), fx.Provide(getters.NewIPLDGetter), diff --git a/share/availability.go b/share/availability.go index f02a9f55f2..9538573114 100644 --- a/share/availability.go +++ b/share/availability.go @@ -4,7 +4,7 @@ import ( "context" "errors" - 
"github.com/celestiaorg/celestia-app/pkg/da" + da "github.com/celestiaorg/celestia-app/pkg/da" ) // ErrNotAvailable is returned whenever DA sampling fails. @@ -15,11 +15,9 @@ var ErrNotAvailable = errors.New("share: data not available") type Root = da.DataAvailabilityHeader // Availability defines interface for validation of Shares' availability. -// -//go:generate mockgen -destination=availability/mocks/availability.go -package=mocks . Availability type Availability interface { // SharesAvailable subjectively validates if Shares committed to the given Root are available on - // the Network by requesting the EDS from the provided peers. + // the Network. SharesAvailable(context.Context, *Root) error // ProbabilityOfAvailability calculates the probability of the data square // being available based on the number of samples collected. diff --git a/share/availability/cache/availability.go b/share/availability/cache/availability.go index bd061b4e9b..d6496f7ff8 100644 --- a/share/availability/cache/availability.go +++ b/share/availability/cache/availability.go @@ -15,17 +15,12 @@ import ( "github.com/celestiaorg/celestia-node/share" ) -var log = logging.Logger("share/cache") - var ( - // DefaultWriteBatchSize defines the size of the batched header write. - // Headers are written in batches not to thrash the underlying Datastore with writes. - // TODO(@Wondertan, @renaynay): Those values must be configurable and proper defaults should be set - // for specific node type. (#709) - DefaultWriteBatchSize = 2048 - cacheAvailabilityPrefix = datastore.NewKey("sampling_result") - + log = logging.Logger("share/cache") minRoot = da.MinDataAvailabilityHeader() + + cacheAvailabilityPrefix = datastore.NewKey("sampling_result") + writeBatchSize = 2048 ) // ShareAvailability wraps a given share.Availability (whether it's light or full) @@ -42,9 +37,12 @@ type ShareAvailability struct { // NewShareAvailability wraps the given share.Availability with an additional datastore // for sampling result caching. -func NewShareAvailability(avail share.Availability, ds datastore.Batching) *ShareAvailability { +func NewShareAvailability( + avail share.Availability, + ds datastore.Batching, +) *ShareAvailability { ds = namespace.Wrap(ds, cacheAvailabilityPrefix) - autoDS := autobatch.NewAutoBatching(ds, DefaultWriteBatchSize) + autoDS := autobatch.NewAutoBatching(ds, writeBatchSize) return &ShareAvailability{ avail: avail, diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go index cd1ccbb8f8..a9b0304dbc 100644 --- a/share/availability/discovery/discovery.go +++ b/share/availability/discovery/discovery.go @@ -35,53 +35,24 @@ const ( defaultRetryTimeout = time.Second ) -type Parameters struct { - // PeersLimit defines the soft limit of FNs to connect to via discovery. - // Set 0 to disable. - PeersLimit uint - // AdvertiseInterval is a interval between advertising sessions. - // Set -1 to disable. - // NOTE: only full and bridge can advertise themselves. - AdvertiseInterval time.Duration - // discoveryRetryTimeout is an interval between discovery attempts - // when we discovered lower than PeersLimit peers. - // Set -1 to disable. 
- discoveryRetryTimeout time.Duration -} - -func (p Parameters) withDefaults() Parameters { - def := DefaultParameters() - if p.AdvertiseInterval == 0 { - p.AdvertiseInterval = def.AdvertiseInterval - } - if p.discoveryRetryTimeout == 0 { - p.discoveryRetryTimeout = defaultRetryTimeout - } - return p -} - -func DefaultParameters() Parameters { - return Parameters{ - PeersLimit: 5, - // based on https://github.com/libp2p/go-libp2p-kad-dht/pull/793 - AdvertiseInterval: time.Hour * 22, - } -} +// defaultRetryTimeout defines time interval between discovery attempts. +var discoveryRetryTimeout = defaultRetryTimeout // Discovery combines advertise and discover services and allows to store discovered nodes. // TODO: The code here gets horribly hairy, so we should refactor this at some point type Discovery struct { - params Parameters - - set *limitedSet - host host.Host - disc discovery.Discovery - connector *backoffConnector + set *limitedSet + host host.Host + disc discovery.Discovery + connector *backoffConnector + // onUpdatedPeers will be called on peer set changes onUpdatedPeers OnUpdatedPeers triggerDisc chan struct{} cancel context.CancelFunc + + params Parameters } type OnUpdatedPeers func(peerID peer.ID, isAdded bool) @@ -90,15 +61,21 @@ type OnUpdatedPeers func(peerID peer.ID, isAdded bool) func NewDiscovery( h host.Host, d discovery.Discovery, - params Parameters, + opts ...Option, ) *Discovery { + params := DefaultParameters() + + for _, opt := range opts { + opt(¶ms) + } + return &Discovery{ - params: params.withDefaults(), set: newLimitedSet(params.PeersLimit), host: h, disc: d, connector: newBackoffConnector(h, defaultBackoffFactory), onUpdatedPeers: func(peer.ID, bool) {}, + params: params, triggerDisc: make(chan struct{}), } } @@ -191,7 +168,7 @@ func (d *Discovery) Advertise(ctx context.Context) { // discoveryLoop ensures we always have '~peerLimit' connected peers. // It starts peer discovery per request and restarts the process until the soft limit reached. 
func (d *Discovery) discoveryLoop(ctx context.Context) { - t := time.NewTicker(d.params.discoveryRetryTimeout) + t := time.NewTicker(discoveryRetryTimeout) defer t.Stop() for { // drain all previous ticks from channel diff --git a/share/availability/discovery/discovery_test.go b/share/availability/discovery/discovery_test.go index fd88a98586..eada732aa9 100644 --- a/share/availability/discovery/discovery_test.go +++ b/share/availability/discovery/discovery_test.go @@ -24,11 +24,11 @@ func TestDiscovery(t *testing.T) { tn := newTestnet(ctx, t) - peerA := tn.discovery(Parameters{ - PeersLimit: nodes, - discoveryRetryTimeout: time.Millisecond * 100, - AdvertiseInterval: -1, // we don't want to be found but only find - }) + peerA := tn.discovery( + WithPeersLimit(nodes), + WithAdvertiseInterval(-1), + ) + discoveryRetryTimeout = time.Millisecond * 100 // defined in discovery.go type peerUpdate struct { peerID peer.ID @@ -41,11 +41,8 @@ func TestDiscovery(t *testing.T) { discs := make([]*Discovery, nodes) for i := range discs { - discs[i] = tn.discovery(Parameters{ - PeersLimit: 0, - discoveryRetryTimeout: -1, - AdvertiseInterval: time.Millisecond * 100, - }) + discs[i] = tn.discovery(WithPeersLimit(0), WithAdvertiseInterval(time.Millisecond*100)) + discoveryRetryTimeout = -1 // defined in discovery.go select { case res := <-updateCh: @@ -98,9 +95,9 @@ func newTestnet(ctx context.Context, t *testing.T) *testnet { return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)} } -func (t *testnet) discovery(params Parameters) *Discovery { +func (t *testnet) discovery(opts ...Option) *Discovery { hst, routingDisc := t.peer() - disc := NewDiscovery(hst, routingDisc, params) + disc := NewDiscovery(hst, routingDisc, opts...) err := disc.Start(t.ctx) require.NoError(t.T, err) t.T.Cleanup(func() { diff --git a/share/availability/discovery/options.go b/share/availability/discovery/options.go new file mode 100644 index 0000000000..8a6a162e11 --- /dev/null +++ b/share/availability/discovery/options.go @@ -0,0 +1,59 @@ +package discovery + +import ( + "fmt" + "time" +) + +// Parameters is the set of Parameters that must be configured for the Discovery module +type Parameters struct { + // PeersLimit defines the soft limit of FNs to connect to via discovery. + // Set 0 to disable. + PeersLimit uint + // AdvertiseInterval is a interval between advertising sessions. + // Set -1 to disable. + // NOTE: only full and bridge can advertise themselves. 
+ AdvertiseInterval time.Duration +} + +// Option is a function that configures Discovery Parameters +type Option func(*Parameters) + +// DefaultParameters returns the default Parameters' configuration values +// for the Discovery module +func DefaultParameters() Parameters { + return Parameters{ + PeersLimit: 5, + // based on https://github.com/libp2p/go-libp2p-kad-dht/pull/793 + AdvertiseInterval: time.Hour * 22, + } +} + +// Validate validates the values in Parameters +func (p *Parameters) Validate() error { + if p.AdvertiseInterval <= 0 { + return fmt.Errorf( + "discovery: invalid option: value AdvertiseInterval %s, %s", + "is 0 or negative.", + "value must be positive", + ) + } + + return nil +} + +// WithPeersLimit is a functional option that Discovery +// uses to set the PeersLimit configuration param +func WithPeersLimit(peersLimit uint) Option { + return func(p *Parameters) { + p.PeersLimit = peersLimit + } +} + +// WithAdvertiseInterval is a functional option that Discovery +// uses to set the AdvertiseInterval configuration param +func WithAdvertiseInterval(advInterval time.Duration) Option { + return func(p *Parameters) { + p.AdvertiseInterval = advInterval + } +} diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index 9eef8d1372..b5d1d439a5 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -27,7 +27,11 @@ type ShareAvailability struct { } // NewShareAvailability creates a new full ShareAvailability. -func NewShareAvailability(store *eds.Store, getter share.Getter, disc *discovery.Discovery) *ShareAvailability { +func NewShareAvailability( + store *eds.Store, + getter share.Getter, + disc *discovery.Discovery, +) *ShareAvailability { return &ShareAvailability{ store: store, getter: getter, diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go index df4561a8eb..1e86b1e381 100644 --- a/share/availability/full/testing.go +++ b/share/availability/full/testing.go @@ -37,10 +37,11 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { } func TestAvailability(getter share.Getter) *ShareAvailability { - params := discovery.DefaultParameters() - params.AdvertiseInterval = time.Second - params.PeersLimit = 10 - disc := discovery.NewDiscovery(nil, - routing.NewRoutingDiscovery(routinghelpers.Null{}), params) + disc := discovery.NewDiscovery( + nil, + routing.NewRoutingDiscovery(routinghelpers.Null{}), + discovery.WithAdvertiseInterval(time.Second), + discovery.WithPeersLimit(10), + ) return NewShareAvailability(nil, getter, disc) } diff --git a/share/availability/light/availability.go b/share/availability/light/availability.go index c947e65e63..07a3e801d9 100644 --- a/share/availability/light/availability.go +++ b/share/availability/light/availability.go @@ -20,14 +20,24 @@ var log = logging.Logger("share/light") // on the network doing sampling over the same Root to collectively verify its availability. type ShareAvailability struct { getter share.Getter + params Parameters } // NewShareAvailability creates a new light Availability. 
-func NewShareAvailability(getter share.Getter) *ShareAvailability { - return &ShareAvailability{getter} +func NewShareAvailability( + getter share.Getter, + opts ...Option, +) *ShareAvailability { + params := DefaultParameters() + + for _, opt := range opts { + opt(¶ms) + } + + return &ShareAvailability{getter, params} } -// SharesAvailable randomly samples DefaultSampleAmount amount of Shares committed to the given +// SharesAvailable randomly samples `params.SampleAmount` amount of Shares committed to the given // Root. This way SharesAvailable subjectively verifies that Shares are available. func (la *ShareAvailability) SharesAvailable(ctx context.Context, dah *share.Root) error { log.Debugw("Validate availability", "root", dah.String()) @@ -38,7 +48,7 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, dah *share.Roo "err", err) panic(err) } - samples, err := SampleSquare(len(dah.RowsRoots), DefaultSampleAmount) + samples, err := SampleSquare(len(dah.RowsRoots), int(la.params.SampleAmount)) if err != nil { return err } @@ -90,9 +100,9 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, dah *share.Roo // ProbabilityOfAvailability calculates the probability that the // data square is available based on the amount of samples collected -// (DefaultSampleAmount). +// (params.SampleAmount). // // Formula: 1 - (0.75 ** amount of samples) func (la *ShareAvailability) ProbabilityOfAvailability(context.Context) float64 { - return 1 - math.Pow(0.75, float64(DefaultSampleAmount)) + return 1 - math.Pow(0.75, float64(la.params.SampleAmount)) } diff --git a/share/availability/light/options.go b/share/availability/light/options.go new file mode 100644 index 0000000000..80fd27acfd --- /dev/null +++ b/share/availability/light/options.go @@ -0,0 +1,50 @@ +package light + +import ( + "fmt" +) + +// SampleAmount specifies the minimum required amount of samples a light node must perform +// before declaring that a block is available +var ( + DefaultSampleAmount uint = 16 +) + +// Parameters is the set of Parameters that must be configured for the light +// availability implementation +type Parameters struct { + SampleAmount uint // The minimum required amount of samples to perform +} + +// Option is a function that configures light availability Parameters +type Option func(*Parameters) + +// DefaultParameters returns the default Parameters' configuration values +// for the light availability implementation +func DefaultParameters() Parameters { + return Parameters{ + SampleAmount: DefaultSampleAmount, + } +} + +// Validate validates the values in Parameters +func (p *Parameters) Validate() error { + if p.SampleAmount <= 0 { + return fmt.Errorf( + "light availability: invalid option: value %s was %s, where it should be %s", + "SampleAmount", + "<= 0", // current value + ">= 0", // what the value should be + ) + } + + return nil +} + +// WithSampleAmount is a functional option that the Availability interface +// implementers use to set the SampleAmount configuration param +func WithSampleAmount(sampleAmount uint) Option { + return func(p *Parameters) { + p.SampleAmount = sampleAmount + } +} diff --git a/share/availability/light/sample.go b/share/availability/light/sample.go index 12d8505397..e66ff9aafe 100644 --- a/share/availability/light/sample.go +++ b/share/availability/light/sample.go @@ -6,10 +6,6 @@ import ( "math/big" ) -// DefaultSampleAmount sets the default amount of samples to be sampled from the network by -// ShareAvailability. 
-var DefaultSampleAmount = 16 - // Sample is a point in 2D space over square. type Sample struct { Row, Col int diff --git a/share/getters/shrex_test.go b/share/getters/shrex_test.go index b93a40d488..01322b014c 100644 --- a/share/getters/shrex_test.go +++ b/share/getters/shrex_test.go @@ -158,10 +158,9 @@ func testManager(ctx context.Context, host host.Host, headerSub libhead.Subscrib } disc := discovery.NewDiscovery(nil, - routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), discovery.Parameters{ - PeersLimit: 10, - AdvertiseInterval: time.Second, - }, + routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), + discovery.WithPeersLimit(10), + discovery.WithAdvertiseInterval(time.Second), ) connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore())) if err != nil { diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go index 804dd4b673..cdcffef4db 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/p2p/peers/manager_test.go @@ -393,10 +393,8 @@ func TestIntegration(t *testing.T) { bnDisc := discovery.NewDiscovery( nw.Hosts()[0], routingdisc.NewRoutingDiscovery(router1), - discovery.Parameters{ - PeersLimit: 0, - AdvertiseInterval: time.Second, - }, + discovery.WithPeersLimit(0), + discovery.WithAdvertiseInterval(time.Second), ) // set up full node / receiver node @@ -406,10 +404,8 @@ func TestIntegration(t *testing.T) { fnDisc := discovery.NewDiscovery( nw.Hosts()[1], routingdisc.NewRoutingDiscovery(router2), - discovery.Parameters{ - PeersLimit: 10, - AdvertiseInterval: time.Second, - }, + discovery.WithPeersLimit(10), + discovery.WithAdvertiseInterval(time.Second), ) err = fnDisc.Start(ctx) require.NoError(t, err) @@ -463,10 +459,10 @@ func testManager(ctx context.Context, headerSub libhead.Subscriber[*header.Exten } disc := discovery.NewDiscovery(nil, - routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), discovery.Parameters{ - PeersLimit: 0, - AdvertiseInterval: time.Second, - }) + routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), + discovery.WithPeersLimit(0), + discovery.WithAdvertiseInterval(time.Second), + ) connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) if err != nil { return nil, err
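As a closing sanity check on the default introduced above: with `SampleAmount = 16`, the `ProbabilityOfAvailability` formula from `share/availability/light` (`1 - 0.75^n`) evaluates to roughly 0.99. A tiny standalone calculation (not part of the diff) that reproduces the number:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Mirrors ProbabilityOfAvailability in share/availability/light:
	// 1 - (0.75 ** amount of samples), evaluated at the default SampleAmount.
	const sampleAmount = 16
	p := 1 - math.Pow(0.75, float64(sampleAmount))
	fmt.Printf("P(available | %d samples) ≈ %.4f\n", sampleAmount, p) // prints ≈ 0.9900
}
```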