diff --git a/go.mod b/go.mod index 719ded3e9c..8a05b6dd72 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.34.0 go.opentelemetry.io/otel/trace v1.13.0 go.opentelemetry.io/proto/otlp v0.19.0 - go.uber.org/fx v1.18.2 + go.uber.org/fx v1.19.2 golang.org/x/crypto v0.7.0 golang.org/x/sync v0.1.0 golang.org/x/text v0.8.0 @@ -304,7 +304,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/dig v1.15.0 // indirect + go.uber.org/dig v1.16.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect diff --git a/go.sum b/go.sum index aea97478f3..9c542114b6 100644 --- a/go.sum +++ b/go.sum @@ -2069,10 +2069,10 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/dig v1.15.0 h1:vq3YWr8zRj1eFGC7Gvf907hE0eRjPTZ1d3xHadD6liE= -go.uber.org/dig v1.15.0/go.mod h1:pKHs0wMynzL6brANhB2hLMro+zalv1osARTviTcqHLM= -go.uber.org/fx v1.18.2 h1:bUNI6oShr+OVFQeU8cDNbnN7VFsu+SsjHzUF51V/GAU= -go.uber.org/fx v1.18.2/go.mod h1:g0V1KMQ66zIRk8bLu3Ea5Jt2w/cHlOIp4wdRsgh0JaY= +go.uber.org/dig v1.16.1 h1:+alNIBsl0qfY0j6epRubp/9obgtrObRAc5aD+6jbWY8= +go.uber.org/dig v1.16.1/go.mod h1:557JTAUZT5bUK0SvCwikmLPPtdQhfvLYtO5tJgQSbnk= +go.uber.org/fx v1.19.2 h1:SyFgYQFr1Wl0AYstE8vyYIzP4bFz2URrScjwC4cwUvY= +go.uber.org/fx v1.19.2/go.mod h1:43G1VcqSzbIv77y00p1DRAsyZS8WdzuYdhZXmEUkMyQ= go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= diff --git a/nodebuilder/das/constructors.go b/nodebuilder/das/constructors.go index 40341b686a..18f6962f40 100644 --- a/nodebuilder/das/constructors.go +++ b/nodebuilder/das/constructors.go @@ -11,7 +11,9 @@ import ( "github.com/celestiaorg/celestia-node/das" "github.com/celestiaorg/celestia-node/header" + modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" ) @@ -43,6 +45,15 @@ func newDASer( fraudServ fraud.Service, bFn shrexsub.BroadcastFn, options ...das.Option, -) (*das.DASer, error) { - return das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...) +) (*das.DASer, *modfraud.ServiceBreaker[*das.DASer], error) { + ds, err := das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...) + if err != nil { + return nil, nil, err + } + + return ds, &modfraud.ServiceBreaker[*das.DASer]{ + Service: ds, + FraudServ: fraudServ, + FraudType: byzantine.BadEncoding, + }, nil } diff --git a/nodebuilder/das/module.go b/nodebuilder/das/module.go index 59e8d36f69..61c935fd40 100644 --- a/nodebuilder/das/module.go +++ b/nodebuilder/das/module.go @@ -5,12 +5,9 @@ import ( "go.uber.org/fx" - "github.com/celestiaorg/go-fraud" - "github.com/celestiaorg/celestia-node/das" - fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud" + modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/node" - "github.com/celestiaorg/celestia-node/share/eds/byzantine" ) func ConstructModule(tp node.Type, cfg *Config) fx.Option { @@ -44,12 +41,11 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { baseComponents, fx.Provide(fx.Annotate( newDASer, - fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, das *das.DASer) error { - return fraudServ.Lifecycle(startCtx, ctx, byzantine.BadEncoding, fservice, - das.Start, das.Stop) + fx.OnStart(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*das.DASer]) error { + return breaker.Start(ctx) }), - fx.OnStop(func(ctx context.Context, das *das.DASer) error { - return das.Stop(ctx) + fx.OnStop(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*das.DASer]) error { + return breaker.Stop(ctx) }), )), // Module is needed for the RPC handler diff --git a/nodebuilder/fraud/lifecycle.go b/nodebuilder/fraud/lifecycle.go index 6db7bc7266..24ed402f5d 100644 --- a/nodebuilder/fraud/lifecycle.go +++ b/nodebuilder/fraud/lifecycle.go @@ -2,41 +2,78 @@ package fraud import ( "context" - "time" + "fmt" "github.com/ipfs/go-datastore" "github.com/celestiaorg/go-fraud" ) -// Lifecycle controls the lifecycle of service depending on fraud proofs. -// It starts the service only if no fraud-proof exists and stops the service automatically -// if a proof arrives after the service was started. -func Lifecycle( - startCtx, lifecycleCtx context.Context, - p fraud.ProofType, - fraudServ fraud.Service, - start, stop func(context.Context) error, -) error { - proofs, err := fraudServ.Get(startCtx, p) +// service defines minimal interface with service lifecycle methods +type service interface { + Start(context.Context) error + Stop(context.Context) error +} + +// ServiceBreaker wraps any service with fraud proof subscription of a specific type. +// If proof happens the service is Stopped automatically. +// TODO(@Wondertan): Support multiple fraud types. +type ServiceBreaker[S service] struct { + Service S + FraudType fraud.ProofType + FraudServ fraud.Service + + ctx context.Context + cancel context.CancelFunc + sub fraud.Subscription +} + +// Start starts the inner service if there are no fraud proofs stored. +// Subscribes for fraud and stops the service whenever necessary. +func (breaker *ServiceBreaker[S]) Start(ctx context.Context) error { + proofs, err := breaker.FraudServ.Get(ctx, breaker.FraudType) switch err { default: - return err + return fmt.Errorf("getting proof(%s): %w", breaker.FraudType, err) case nil: return &fraud.ErrFraudExists{Proof: proofs} case datastore.ErrNotFound: } - err = start(startCtx) + + err = breaker.Service.Start(ctx) if err != nil { return err } - // handle incoming Fraud Proofs - go fraud.OnProof(lifecycleCtx, fraudServ, p, func(fraud.Proof) { - ctx, cancel := context.WithTimeout(lifecycleCtx, time.Minute) - defer cancel() - if err := stop(ctx); err != nil { - log.Error(err) - } - }) + + breaker.sub, err = breaker.FraudServ.Subscribe(breaker.FraudType) + if err != nil { + return fmt.Errorf("subscribing for proof(%s): %w", breaker.FraudType, err) + } + + breaker.ctx, breaker.cancel = context.WithCancel(context.Background()) + go breaker.awaitProof() return nil } + +// Stop stops the service and cancels subscription. +func (breaker *ServiceBreaker[S]) Stop(ctx context.Context) error { + if breaker.ctx.Err() != nil { + // short circuit if the service was already stopped + return nil + } + + breaker.sub.Cancel() + breaker.cancel() + return breaker.Service.Stop(ctx) +} + +func (breaker *ServiceBreaker[S]) awaitProof() { + _, err := breaker.sub.Proof(breaker.ctx) + if err != nil { + return + } + + if err := breaker.Stop(breaker.ctx); err != nil && err != context.Canceled { + log.Errorw("stopping service: %s", err.Error()) + } +} diff --git a/nodebuilder/header/constructors.go b/nodebuilder/header/constructors.go index 4650ed77d9..e6bb6d6c48 100644 --- a/nodebuilder/header/constructors.go +++ b/nodebuilder/header/constructors.go @@ -9,13 +9,16 @@ import ( "github.com/libp2p/go-libp2p/p2p/net/conngater" "go.uber.org/fx" + libfraud "github.com/celestiaorg/go-fraud" libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/go-header/p2p" "github.com/celestiaorg/go-header/store" "github.com/celestiaorg/go-header/sync" "github.com/celestiaorg/celestia-node/header" + modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/share/eds/byzantine" ) // newP2PServer constructs a new ExchangeServer using the given Network as a protocolID prefix. @@ -72,11 +75,21 @@ func newP2PExchange(cfg Config) func( // newSyncer constructs new Syncer for headers. func newSyncer( ex libhead.Exchange[*header.ExtendedHeader], + fservice libfraud.Service, store InitStore, sub libhead.Subscriber[*header.ExtendedHeader], opts []sync.Options, -) (*sync.Syncer[*header.ExtendedHeader], error) { - return sync.NewSyncer[*header.ExtendedHeader](ex, store, sub, opts...) +) (*sync.Syncer[*header.ExtendedHeader], *modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]], error) { + syncer, err := sync.NewSyncer[*header.ExtendedHeader](ex, store, sub, opts...) + if err != nil { + return nil, nil, err + } + + return syncer, &modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]]{ + Service: syncer, + FraudType: byzantine.BadEncoding, + FraudServ: fservice, + }, nil } // InitStore is a type representing initialized header store. diff --git a/nodebuilder/header/module.go b/nodebuilder/header/module.go index 97939b7101..2b94feaec8 100644 --- a/nodebuilder/header/module.go +++ b/nodebuilder/header/module.go @@ -8,7 +8,6 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" - "github.com/celestiaorg/go-fraud" libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/go-header/p2p" "github.com/celestiaorg/go-header/store" @@ -18,7 +17,6 @@ import ( modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/node" modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" - "github.com/celestiaorg/celestia-node/share/eds/byzantine" ) var log = logging.Logger("module/header") @@ -73,15 +71,16 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Provide(fx.Annotate( newSyncer, fx.OnStart(func( - startCtx, ctx context.Context, - fservice fraud.Service, - syncer *sync.Syncer[*header.ExtendedHeader], + ctx context.Context, + breaker *modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]], ) error { - return modfraud.Lifecycle(startCtx, ctx, byzantine.BadEncoding, fservice, - syncer.Start, syncer.Stop) + return breaker.Start(ctx) }), - fx.OnStop(func(ctx context.Context, syncer *sync.Syncer[*header.ExtendedHeader]) error { - return syncer.Stop(ctx) + fx.OnStop(func( + ctx context.Context, + breaker *modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]], + ) error { + return breaker.Stop(ctx) }), )), fx.Provide(fx.Annotate( diff --git a/nodebuilder/node.go b/nodebuilder/node.go index 6968379c02..fff79cfc7f 100644 --- a/nodebuilder/node.go +++ b/nodebuilder/node.go @@ -149,7 +149,6 @@ func (n *Node) Stop(ctx context.Context) error { func newNode(opts ...fx.Option) (*Node, error) { node := new(Node) app := fx.New( - fx.NopLogger, fx.Populate(node), fx.Options(opts...), ) diff --git a/nodebuilder/state/core.go b/nodebuilder/state/core.go index 052b6eef0c..a3eb7f7b6d 100644 --- a/nodebuilder/state/core.go +++ b/nodebuilder/state/core.go @@ -2,10 +2,13 @@ package state import ( apptypes "github.com/celestiaorg/celestia-app/x/blob/types" + libfraud "github.com/celestiaorg/go-fraud" "github.com/celestiaorg/go-header/sync" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/nodebuilder/core" + modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" + "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/state" ) @@ -15,6 +18,13 @@ func coreAccessor( corecfg core.Config, signer *apptypes.KeyringSigner, sync *sync.Syncer[*header.ExtendedHeader], -) *state.CoreAccessor { - return state.NewCoreAccessor(signer, sync, corecfg.IP, corecfg.RPCPort, corecfg.GRPCPort) + fraudServ libfraud.Service, +) (*state.CoreAccessor, *modfraud.ServiceBreaker[*state.CoreAccessor]) { + ca := state.NewCoreAccessor(signer, sync, corecfg.IP, corecfg.RPCPort, corecfg.GRPCPort) + + return ca, &modfraud.ServiceBreaker[*state.CoreAccessor]{ + Service: ca, + FraudType: byzantine.BadEncoding, + FraudServ: fraudServ, + } } diff --git a/nodebuilder/state/module.go b/nodebuilder/state/module.go index 18c1eaaf3b..24305dabe1 100644 --- a/nodebuilder/state/module.go +++ b/nodebuilder/state/module.go @@ -6,11 +6,8 @@ import ( logging "github.com/ipfs/go-log/v2" "go.uber.org/fx" - "github.com/celestiaorg/go-fraud" - - fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud" + modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/node" - "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/state" ) @@ -27,11 +24,11 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Error(cfgErr), fx.Provide(fx.Annotate( coreAccessor, - fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, ca *state.CoreAccessor) error { - return fraudServ.Lifecycle(startCtx, ctx, byzantine.BadEncoding, fservice, ca.Start, ca.Stop) + fx.OnStart(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*state.CoreAccessor]) error { + return breaker.Start(ctx) }), - fx.OnStop(func(ctx context.Context, ca *state.CoreAccessor) error { - return ca.Stop(ctx) + fx.OnStop(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*state.CoreAccessor]) error { + return breaker.Stop(ctx) }), )), // the module is needed for the handler