From bbe68bba7d3fe233b1b3e0afa08ac54681d13b1f Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Tue, 18 Apr 2023 10:24:10 +0200 Subject: [PATCH] refactor(nodebuiler): introduce ServiceBreaker and update FX (#2091) This PR solves two problems: * Updates FX and fixes issues we observed in [the PR](https://github.com/celestiaorg/celestia-node/pull/1801) * Fixes https://github.com/celestiaorg/celestia-node/issues/2041 Surprisingly, those two problems were related, so I decided to fix them once and for all. The issue with FX happens after [this change](https://github.com/uber-go/fx/pull/983). The outcome of this change is summarized: > In other words, lifecycle hook annotations can no longer pull in extra > dependencies outside of things on which > the annotated constructor is dependent, results that the annotated > constructor provides, context.Context > object which is injected by Lifecycle, and the Lifecycle object itself. Our current code does pull extra dependencies in the service having `modfraud.Lifecycle` on them, like [DASer](https://github.com/celestiaorg/celestia-node/blob/main/nodebuilder/das/module.go#L47). Specifically, it pulls FraudService, and this is no longer allowed. This forces us to rewrite the fraud lifecycling and here is the solution, which additionally satisfies #2041. This also unblocks https://github.com/celestiaorg/celestia-node/issues/2040, which is now implemented in https://github.com/celestiaorg/go-fraud/pull/1 I tried to split FX update and the refactor into two diff PRs. However, the solution does not work with the old FX version, so we have to couple those. The chain here is that I needed a new version of FX that fixes `OnStart/OnStop` hooks, and updating created the whole story. The PR that does clean-ups basing on new version of FX will com right after. --- go.mod | 4 +- go.sum | 8 +-- nodebuilder/das/constructors.go | 15 +++++- nodebuilder/das/module.go | 14 ++---- nodebuilder/fraud/lifecycle.go | 79 ++++++++++++++++++++++-------- nodebuilder/header/constructors.go | 17 ++++++- nodebuilder/header/module.go | 17 +++---- nodebuilder/node.go | 1 - nodebuilder/state/core.go | 14 +++++- nodebuilder/state/module.go | 13 ++--- 10 files changed, 122 insertions(+), 60 deletions(-) diff --git a/go.mod b/go.mod index 719ded3e9c..8a05b6dd72 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.34.0 go.opentelemetry.io/otel/trace v1.13.0 go.opentelemetry.io/proto/otlp v0.19.0 - go.uber.org/fx v1.18.2 + go.uber.org/fx v1.19.2 golang.org/x/crypto v0.7.0 golang.org/x/sync v0.1.0 golang.org/x/text v0.8.0 @@ -304,7 +304,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/dig v1.15.0 // indirect + go.uber.org/dig v1.16.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect diff --git a/go.sum b/go.sum index aea97478f3..9c542114b6 100644 --- a/go.sum +++ b/go.sum @@ -2069,10 +2069,10 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/dig v1.15.0 h1:vq3YWr8zRj1eFGC7Gvf907hE0eRjPTZ1d3xHadD6liE= -go.uber.org/dig v1.15.0/go.mod h1:pKHs0wMynzL6brANhB2hLMro+zalv1osARTviTcqHLM= -go.uber.org/fx v1.18.2 h1:bUNI6oShr+OVFQeU8cDNbnN7VFsu+SsjHzUF51V/GAU= -go.uber.org/fx v1.18.2/go.mod h1:g0V1KMQ66zIRk8bLu3Ea5Jt2w/cHlOIp4wdRsgh0JaY= +go.uber.org/dig v1.16.1 h1:+alNIBsl0qfY0j6epRubp/9obgtrObRAc5aD+6jbWY8= +go.uber.org/dig v1.16.1/go.mod h1:557JTAUZT5bUK0SvCwikmLPPtdQhfvLYtO5tJgQSbnk= +go.uber.org/fx v1.19.2 h1:SyFgYQFr1Wl0AYstE8vyYIzP4bFz2URrScjwC4cwUvY= +go.uber.org/fx v1.19.2/go.mod h1:43G1VcqSzbIv77y00p1DRAsyZS8WdzuYdhZXmEUkMyQ= go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= diff --git a/nodebuilder/das/constructors.go b/nodebuilder/das/constructors.go index 40341b686a..18f6962f40 100644 --- a/nodebuilder/das/constructors.go +++ b/nodebuilder/das/constructors.go @@ -11,7 +11,9 @@ import ( "github.com/celestiaorg/celestia-node/das" "github.com/celestiaorg/celestia-node/header" + modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" ) @@ -43,6 +45,15 @@ func newDASer( fraudServ fraud.Service, bFn shrexsub.BroadcastFn, options ...das.Option, -) (*das.DASer, error) { - return das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...) +) (*das.DASer, *modfraud.ServiceBreaker[*das.DASer], error) { + ds, err := das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...) + if err != nil { + return nil, nil, err + } + + return ds, &modfraud.ServiceBreaker[*das.DASer]{ + Service: ds, + FraudServ: fraudServ, + FraudType: byzantine.BadEncoding, + }, nil } diff --git a/nodebuilder/das/module.go b/nodebuilder/das/module.go index 59e8d36f69..61c935fd40 100644 --- a/nodebuilder/das/module.go +++ b/nodebuilder/das/module.go @@ -5,12 +5,9 @@ import ( "go.uber.org/fx" - "github.com/celestiaorg/go-fraud" - "github.com/celestiaorg/celestia-node/das" - fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud" + modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/node" - "github.com/celestiaorg/celestia-node/share/eds/byzantine" ) func ConstructModule(tp node.Type, cfg *Config) fx.Option { @@ -44,12 +41,11 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { baseComponents, fx.Provide(fx.Annotate( newDASer, - fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, das *das.DASer) error { - return fraudServ.Lifecycle(startCtx, ctx, byzantine.BadEncoding, fservice, - das.Start, das.Stop) + fx.OnStart(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*das.DASer]) error { + return breaker.Start(ctx) }), - fx.OnStop(func(ctx context.Context, das *das.DASer) error { - return das.Stop(ctx) + fx.OnStop(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*das.DASer]) error { + return breaker.Stop(ctx) }), )), // Module is needed for the RPC handler diff --git a/nodebuilder/fraud/lifecycle.go b/nodebuilder/fraud/lifecycle.go index 6db7bc7266..24ed402f5d 100644 --- a/nodebuilder/fraud/lifecycle.go +++ b/nodebuilder/fraud/lifecycle.go @@ -2,41 +2,78 @@ package fraud import ( "context" - "time" + "fmt" "github.com/ipfs/go-datastore" "github.com/celestiaorg/go-fraud" ) -// Lifecycle controls the lifecycle of service depending on fraud proofs. -// It starts the service only if no fraud-proof exists and stops the service automatically -// if a proof arrives after the service was started. -func Lifecycle( - startCtx, lifecycleCtx context.Context, - p fraud.ProofType, - fraudServ fraud.Service, - start, stop func(context.Context) error, -) error { - proofs, err := fraudServ.Get(startCtx, p) +// service defines minimal interface with service lifecycle methods +type service interface { + Start(context.Context) error + Stop(context.Context) error +} + +// ServiceBreaker wraps any service with fraud proof subscription of a specific type. +// If proof happens the service is Stopped automatically. +// TODO(@Wondertan): Support multiple fraud types. +type ServiceBreaker[S service] struct { + Service S + FraudType fraud.ProofType + FraudServ fraud.Service + + ctx context.Context + cancel context.CancelFunc + sub fraud.Subscription +} + +// Start starts the inner service if there are no fraud proofs stored. +// Subscribes for fraud and stops the service whenever necessary. +func (breaker *ServiceBreaker[S]) Start(ctx context.Context) error { + proofs, err := breaker.FraudServ.Get(ctx, breaker.FraudType) switch err { default: - return err + return fmt.Errorf("getting proof(%s): %w", breaker.FraudType, err) case nil: return &fraud.ErrFraudExists{Proof: proofs} case datastore.ErrNotFound: } - err = start(startCtx) + + err = breaker.Service.Start(ctx) if err != nil { return err } - // handle incoming Fraud Proofs - go fraud.OnProof(lifecycleCtx, fraudServ, p, func(fraud.Proof) { - ctx, cancel := context.WithTimeout(lifecycleCtx, time.Minute) - defer cancel() - if err := stop(ctx); err != nil { - log.Error(err) - } - }) + + breaker.sub, err = breaker.FraudServ.Subscribe(breaker.FraudType) + if err != nil { + return fmt.Errorf("subscribing for proof(%s): %w", breaker.FraudType, err) + } + + breaker.ctx, breaker.cancel = context.WithCancel(context.Background()) + go breaker.awaitProof() return nil } + +// Stop stops the service and cancels subscription. +func (breaker *ServiceBreaker[S]) Stop(ctx context.Context) error { + if breaker.ctx.Err() != nil { + // short circuit if the service was already stopped + return nil + } + + breaker.sub.Cancel() + breaker.cancel() + return breaker.Service.Stop(ctx) +} + +func (breaker *ServiceBreaker[S]) awaitProof() { + _, err := breaker.sub.Proof(breaker.ctx) + if err != nil { + return + } + + if err := breaker.Stop(breaker.ctx); err != nil && err != context.Canceled { + log.Errorw("stopping service: %s", err.Error()) + } +} diff --git a/nodebuilder/header/constructors.go b/nodebuilder/header/constructors.go index 4650ed77d9..e6bb6d6c48 100644 --- a/nodebuilder/header/constructors.go +++ b/nodebuilder/header/constructors.go @@ -9,13 +9,16 @@ import ( "github.com/libp2p/go-libp2p/p2p/net/conngater" "go.uber.org/fx" + libfraud "github.com/celestiaorg/go-fraud" libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/go-header/p2p" "github.com/celestiaorg/go-header/store" "github.com/celestiaorg/go-header/sync" "github.com/celestiaorg/celestia-node/header" + modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/share/eds/byzantine" ) // newP2PServer constructs a new ExchangeServer using the given Network as a protocolID prefix. @@ -72,11 +75,21 @@ func newP2PExchange(cfg Config) func( // newSyncer constructs new Syncer for headers. func newSyncer( ex libhead.Exchange[*header.ExtendedHeader], + fservice libfraud.Service, store InitStore, sub libhead.Subscriber[*header.ExtendedHeader], opts []sync.Options, -) (*sync.Syncer[*header.ExtendedHeader], error) { - return sync.NewSyncer[*header.ExtendedHeader](ex, store, sub, opts...) +) (*sync.Syncer[*header.ExtendedHeader], *modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]], error) { + syncer, err := sync.NewSyncer[*header.ExtendedHeader](ex, store, sub, opts...) + if err != nil { + return nil, nil, err + } + + return syncer, &modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]]{ + Service: syncer, + FraudType: byzantine.BadEncoding, + FraudServ: fservice, + }, nil } // InitStore is a type representing initialized header store. diff --git a/nodebuilder/header/module.go b/nodebuilder/header/module.go index 97939b7101..2b94feaec8 100644 --- a/nodebuilder/header/module.go +++ b/nodebuilder/header/module.go @@ -8,7 +8,6 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" - "github.com/celestiaorg/go-fraud" libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/go-header/p2p" "github.com/celestiaorg/go-header/store" @@ -18,7 +17,6 @@ import ( modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/node" modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" - "github.com/celestiaorg/celestia-node/share/eds/byzantine" ) var log = logging.Logger("module/header") @@ -73,15 +71,16 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Provide(fx.Annotate( newSyncer, fx.OnStart(func( - startCtx, ctx context.Context, - fservice fraud.Service, - syncer *sync.Syncer[*header.ExtendedHeader], + ctx context.Context, + breaker *modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]], ) error { - return modfraud.Lifecycle(startCtx, ctx, byzantine.BadEncoding, fservice, - syncer.Start, syncer.Stop) + return breaker.Start(ctx) }), - fx.OnStop(func(ctx context.Context, syncer *sync.Syncer[*header.ExtendedHeader]) error { - return syncer.Stop(ctx) + fx.OnStop(func( + ctx context.Context, + breaker *modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]], + ) error { + return breaker.Stop(ctx) }), )), fx.Provide(fx.Annotate( diff --git a/nodebuilder/node.go b/nodebuilder/node.go index 6968379c02..fff79cfc7f 100644 --- a/nodebuilder/node.go +++ b/nodebuilder/node.go @@ -149,7 +149,6 @@ func (n *Node) Stop(ctx context.Context) error { func newNode(opts ...fx.Option) (*Node, error) { node := new(Node) app := fx.New( - fx.NopLogger, fx.Populate(node), fx.Options(opts...), ) diff --git a/nodebuilder/state/core.go b/nodebuilder/state/core.go index 052b6eef0c..a3eb7f7b6d 100644 --- a/nodebuilder/state/core.go +++ b/nodebuilder/state/core.go @@ -2,10 +2,13 @@ package state import ( apptypes "github.com/celestiaorg/celestia-app/x/blob/types" + libfraud "github.com/celestiaorg/go-fraud" "github.com/celestiaorg/go-header/sync" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/nodebuilder/core" + modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" + "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/state" ) @@ -15,6 +18,13 @@ func coreAccessor( corecfg core.Config, signer *apptypes.KeyringSigner, sync *sync.Syncer[*header.ExtendedHeader], -) *state.CoreAccessor { - return state.NewCoreAccessor(signer, sync, corecfg.IP, corecfg.RPCPort, corecfg.GRPCPort) + fraudServ libfraud.Service, +) (*state.CoreAccessor, *modfraud.ServiceBreaker[*state.CoreAccessor]) { + ca := state.NewCoreAccessor(signer, sync, corecfg.IP, corecfg.RPCPort, corecfg.GRPCPort) + + return ca, &modfraud.ServiceBreaker[*state.CoreAccessor]{ + Service: ca, + FraudType: byzantine.BadEncoding, + FraudServ: fraudServ, + } } diff --git a/nodebuilder/state/module.go b/nodebuilder/state/module.go index 18c1eaaf3b..24305dabe1 100644 --- a/nodebuilder/state/module.go +++ b/nodebuilder/state/module.go @@ -6,11 +6,8 @@ import ( logging "github.com/ipfs/go-log/v2" "go.uber.org/fx" - "github.com/celestiaorg/go-fraud" - - fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud" + modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/node" - "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/state" ) @@ -27,11 +24,11 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Error(cfgErr), fx.Provide(fx.Annotate( coreAccessor, - fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, ca *state.CoreAccessor) error { - return fraudServ.Lifecycle(startCtx, ctx, byzantine.BadEncoding, fservice, ca.Start, ca.Stop) + fx.OnStart(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*state.CoreAccessor]) error { + return breaker.Start(ctx) }), - fx.OnStop(func(ctx context.Context, ca *state.CoreAccessor) error { - return ca.Stop(ctx) + fx.OnStop(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*state.CoreAccessor]) error { + return breaker.Stop(ctx) }), )), // the module is needed for the handler