Skip to content

Commit

Permalink
refactor(nodebuiler): introduce ServiceBreaker and update FX (#2091)
Browse files Browse the repository at this point in the history
This PR solves two problems:
* Updates FX and fixes issues we observed in [the
PR](#1801)
* Fixes #2041

Surprisingly, those two problems were related, so I decided to fix them
once and for all. The issue with FX happens after [this
change](uber-go/fx#983). The outcome of this
change is summarized:
> In other words, lifecycle hook annotations can no longer pull in extra
> dependencies outside of things on which
> the annotated constructor is dependent, results that the annotated
> constructor provides, context.Context
> object which is injected by Lifecycle, and the Lifecycle object itself.

Our current code does pull extra dependencies in the service having
`modfraud.Lifecycle` on them, like
[DASer](https://github.com/celestiaorg/celestia-node/blob/main/nodebuilder/das/module.go#L47).
Specifically, it pulls FraudService, and this is no longer allowed. This
forces us to rewrite the fraud lifecycling and here is the solution,
which additionally satisfies #2041.

This also unblocks
#2040, which is now
implemented in celestiaorg/go-fraud#1

I tried to split FX update and the refactor into two diff PRs. However,
the solution does not work with the old FX version, so we have to couple
those.

The chain here is that I needed a new version of FX that fixes
`OnStart/OnStop` hooks, and updating created the whole story. The PR
that does clean-ups basing on new version of FX will com right after.
  • Loading branch information
Wondertan committed Apr 18, 2023
1 parent b8c24bf commit bbe68bb
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 60 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.34.0
go.opentelemetry.io/otel/trace v1.13.0
go.opentelemetry.io/proto/otlp v0.19.0
go.uber.org/fx v1.18.2
go.uber.org/fx v1.19.2
golang.org/x/crypto v0.7.0
golang.org/x/sync v0.1.0
golang.org/x/text v0.8.0
Expand Down Expand Up @@ -304,7 +304,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.15.0 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2069,10 +2069,10 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/dig v1.15.0 h1:vq3YWr8zRj1eFGC7Gvf907hE0eRjPTZ1d3xHadD6liE=
go.uber.org/dig v1.15.0/go.mod h1:pKHs0wMynzL6brANhB2hLMro+zalv1osARTviTcqHLM=
go.uber.org/fx v1.18.2 h1:bUNI6oShr+OVFQeU8cDNbnN7VFsu+SsjHzUF51V/GAU=
go.uber.org/fx v1.18.2/go.mod h1:g0V1KMQ66zIRk8bLu3Ea5Jt2w/cHlOIp4wdRsgh0JaY=
go.uber.org/dig v1.16.1 h1:+alNIBsl0qfY0j6epRubp/9obgtrObRAc5aD+6jbWY8=
go.uber.org/dig v1.16.1/go.mod h1:557JTAUZT5bUK0SvCwikmLPPtdQhfvLYtO5tJgQSbnk=
go.uber.org/fx v1.19.2 h1:SyFgYQFr1Wl0AYstE8vyYIzP4bFz2URrScjwC4cwUvY=
go.uber.org/fx v1.19.2/go.mod h1:43G1VcqSzbIv77y00p1DRAsyZS8WdzuYdhZXmEUkMyQ=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
Expand Down
15 changes: 13 additions & 2 deletions nodebuilder/das/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (

"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/header"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

Expand Down Expand Up @@ -43,6 +45,15 @@ func newDASer(
fraudServ fraud.Service,
bFn shrexsub.BroadcastFn,
options ...das.Option,
) (*das.DASer, error) {
return das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...)
) (*das.DASer, *modfraud.ServiceBreaker[*das.DASer], error) {
ds, err := das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...)
if err != nil {
return nil, nil, err
}

return ds, &modfraud.ServiceBreaker[*das.DASer]{
Service: ds,
FraudServ: fraudServ,
FraudType: byzantine.BadEncoding,
}, nil
}
14 changes: 5 additions & 9 deletions nodebuilder/das/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ import (

"go.uber.org/fx"

"github.com/celestiaorg/go-fraud"

"github.com/celestiaorg/celestia-node/das"
fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
)

func ConstructModule(tp node.Type, cfg *Config) fx.Option {
Expand Down Expand Up @@ -44,12 +41,11 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents,
fx.Provide(fx.Annotate(
newDASer,
fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, das *das.DASer) error {
return fraudServ.Lifecycle(startCtx, ctx, byzantine.BadEncoding, fservice,
das.Start, das.Stop)
fx.OnStart(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*das.DASer]) error {
return breaker.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, das *das.DASer) error {
return das.Stop(ctx)
fx.OnStop(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*das.DASer]) error {
return breaker.Stop(ctx)
}),
)),
// Module is needed for the RPC handler
Expand Down
79 changes: 58 additions & 21 deletions nodebuilder/fraud/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,78 @@ package fraud

import (
"context"
"time"
"fmt"

"github.com/ipfs/go-datastore"

"github.com/celestiaorg/go-fraud"
)

// Lifecycle controls the lifecycle of service depending on fraud proofs.
// It starts the service only if no fraud-proof exists and stops the service automatically
// if a proof arrives after the service was started.
func Lifecycle(
startCtx, lifecycleCtx context.Context,
p fraud.ProofType,
fraudServ fraud.Service,
start, stop func(context.Context) error,
) error {
proofs, err := fraudServ.Get(startCtx, p)
// service defines minimal interface with service lifecycle methods
type service interface {
Start(context.Context) error
Stop(context.Context) error
}

// ServiceBreaker wraps any service with fraud proof subscription of a specific type.
// If proof happens the service is Stopped automatically.
// TODO(@Wondertan): Support multiple fraud types.
type ServiceBreaker[S service] struct {
Service S
FraudType fraud.ProofType
FraudServ fraud.Service

ctx context.Context
cancel context.CancelFunc
sub fraud.Subscription
}

// Start starts the inner service if there are no fraud proofs stored.
// Subscribes for fraud and stops the service whenever necessary.
func (breaker *ServiceBreaker[S]) Start(ctx context.Context) error {
proofs, err := breaker.FraudServ.Get(ctx, breaker.FraudType)
switch err {
default:
return err
return fmt.Errorf("getting proof(%s): %w", breaker.FraudType, err)
case nil:
return &fraud.ErrFraudExists{Proof: proofs}
case datastore.ErrNotFound:
}
err = start(startCtx)

err = breaker.Service.Start(ctx)
if err != nil {
return err
}
// handle incoming Fraud Proofs
go fraud.OnProof(lifecycleCtx, fraudServ, p, func(fraud.Proof) {
ctx, cancel := context.WithTimeout(lifecycleCtx, time.Minute)
defer cancel()
if err := stop(ctx); err != nil {
log.Error(err)
}
})

breaker.sub, err = breaker.FraudServ.Subscribe(breaker.FraudType)
if err != nil {
return fmt.Errorf("subscribing for proof(%s): %w", breaker.FraudType, err)
}

breaker.ctx, breaker.cancel = context.WithCancel(context.Background())
go breaker.awaitProof()
return nil
}

// Stop stops the service and cancels subscription.
func (breaker *ServiceBreaker[S]) Stop(ctx context.Context) error {
if breaker.ctx.Err() != nil {
// short circuit if the service was already stopped
return nil
}

breaker.sub.Cancel()
breaker.cancel()
return breaker.Service.Stop(ctx)
}

func (breaker *ServiceBreaker[S]) awaitProof() {
_, err := breaker.sub.Proof(breaker.ctx)
if err != nil {
return
}

if err := breaker.Stop(breaker.ctx); err != nil && err != context.Canceled {
log.Errorw("stopping service: %s", err.Error())
}
}
17 changes: 15 additions & 2 deletions nodebuilder/header/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.uber.org/fx"

libfraud "github.com/celestiaorg/go-fraud"
libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/p2p"
"github.com/celestiaorg/go-header/store"
"github.com/celestiaorg/go-header/sync"

"github.com/celestiaorg/celestia-node/header"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
)

// newP2PServer constructs a new ExchangeServer using the given Network as a protocolID prefix.
Expand Down Expand Up @@ -72,11 +75,21 @@ func newP2PExchange(cfg Config) func(
// newSyncer constructs new Syncer for headers.
func newSyncer(
ex libhead.Exchange[*header.ExtendedHeader],
fservice libfraud.Service,
store InitStore,
sub libhead.Subscriber[*header.ExtendedHeader],
opts []sync.Options,
) (*sync.Syncer[*header.ExtendedHeader], error) {
return sync.NewSyncer[*header.ExtendedHeader](ex, store, sub, opts...)
) (*sync.Syncer[*header.ExtendedHeader], *modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]], error) {
syncer, err := sync.NewSyncer[*header.ExtendedHeader](ex, store, sub, opts...)
if err != nil {
return nil, nil, err
}

return syncer, &modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]]{
Service: syncer,
FraudType: byzantine.BadEncoding,
FraudServ: fservice,
}, nil
}

// InitStore is a type representing initialized header store.
Expand Down
17 changes: 8 additions & 9 deletions nodebuilder/header/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"

"github.com/celestiaorg/go-fraud"
libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/p2p"
"github.com/celestiaorg/go-header/store"
Expand All @@ -18,7 +17,6 @@ import (
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
)

var log = logging.Logger("module/header")
Expand Down Expand Up @@ -73,15 +71,16 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Provide(fx.Annotate(
newSyncer,
fx.OnStart(func(
startCtx, ctx context.Context,
fservice fraud.Service,
syncer *sync.Syncer[*header.ExtendedHeader],
ctx context.Context,
breaker *modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]],
) error {
return modfraud.Lifecycle(startCtx, ctx, byzantine.BadEncoding, fservice,
syncer.Start, syncer.Stop)
return breaker.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, syncer *sync.Syncer[*header.ExtendedHeader]) error {
return syncer.Stop(ctx)
fx.OnStop(func(
ctx context.Context,
breaker *modfraud.ServiceBreaker[*sync.Syncer[*header.ExtendedHeader]],
) error {
return breaker.Stop(ctx)
}),
)),
fx.Provide(fx.Annotate(
Expand Down
1 change: 0 additions & 1 deletion nodebuilder/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func (n *Node) Stop(ctx context.Context) error {
func newNode(opts ...fx.Option) (*Node, error) {
node := new(Node)
app := fx.New(
fx.NopLogger,
fx.Populate(node),
fx.Options(opts...),
)
Expand Down
14 changes: 12 additions & 2 deletions nodebuilder/state/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package state

import (
apptypes "github.com/celestiaorg/celestia-app/x/blob/types"
libfraud "github.com/celestiaorg/go-fraud"
"github.com/celestiaorg/go-header/sync"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/core"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/state"
)

Expand All @@ -15,6 +18,13 @@ func coreAccessor(
corecfg core.Config,
signer *apptypes.KeyringSigner,
sync *sync.Syncer[*header.ExtendedHeader],
) *state.CoreAccessor {
return state.NewCoreAccessor(signer, sync, corecfg.IP, corecfg.RPCPort, corecfg.GRPCPort)
fraudServ libfraud.Service,
) (*state.CoreAccessor, *modfraud.ServiceBreaker[*state.CoreAccessor]) {
ca := state.NewCoreAccessor(signer, sync, corecfg.IP, corecfg.RPCPort, corecfg.GRPCPort)

return ca, &modfraud.ServiceBreaker[*state.CoreAccessor]{
Service: ca,
FraudType: byzantine.BadEncoding,
FraudServ: fraudServ,
}
}
13 changes: 5 additions & 8 deletions nodebuilder/state/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ import (
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

"github.com/celestiaorg/go-fraud"

fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/state"
)

Expand All @@ -27,11 +24,11 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Error(cfgErr),
fx.Provide(fx.Annotate(
coreAccessor,
fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, ca *state.CoreAccessor) error {
return fraudServ.Lifecycle(startCtx, ctx, byzantine.BadEncoding, fservice, ca.Start, ca.Stop)
fx.OnStart(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*state.CoreAccessor]) error {
return breaker.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, ca *state.CoreAccessor) error {
return ca.Stop(ctx)
fx.OnStop(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*state.CoreAccessor]) error {
return breaker.Stop(ctx)
}),
)),
// the module is needed for the handler
Expand Down

0 comments on commit bbe68bb

Please sign in to comment.