Skip to content

Commit

Permalink
feat(pruner): Implement full and bridge node pruning (#3150)
Browse files Browse the repository at this point in the history
This PR introduces `full` and `bridge` node pruning via the
`--experimental-pruning` flag. Support is included for nodes that start
from scratch with pruning enabled and also for `archival` (nodes
retaining all historical blocks) that enable the
`--experimental-pruning` flag.

_Note that this PR does not support the conversion of a pruned node into
an archival one explicitly (it would not support re-syncing deleted
blocks)._

With pruning enabled, `full` and `bridge` nodes' block stores can be
expected not to exceed ~4TB (as the upper bound).

In follow-up PRs (hardening), the following features can be expected: 
- [x] discovery for archival nodes for archival sync
- [ ] inverted_index / light node pruning
- [ ] include more metrics for errors

TODO: 
- [x] clean up some TODOs
- [x] fix one flakey unit test
- [x] change values back to the actual (GC cycle, sampling window,
pruning window, etc).
- [x] figure out whether to store error in pruner checkpoint
- [x] fix issue with pruning genesis block via findPruneableHeaders
- [x] metrics for failed prunes
- [x] set a sane default for max pruneable / consider removing
`MaxPruneablePerGC` as now context timeouts are on a per block basis
- [ ] dedup findPruneableHeader test utility
- [x] badger dep

---------

Co-authored-by: Ryan <ryanford@poluglottos.com>
  • Loading branch information
renaynay and distractedm1nd committed Mar 28, 2024
1 parent 137a1ad commit 02030bf
Show file tree
Hide file tree
Showing 23 changed files with 1,078 additions and 78 deletions.
3 changes: 3 additions & 0 deletions cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/pruner"
"github.com/celestiaorg/celestia-node/nodebuilder/rpc"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
)
Expand All @@ -22,6 +23,7 @@ func NewBridge(options ...func(*cobra.Command, []*pflag.FlagSet)) *cobra.Command
rpc.Flags(),
gateway.Flags(),
state.Flags(),
pruner.Flags(),
}
cmd := &cobra.Command{
Use: "bridge [subcommand]",
Expand Down Expand Up @@ -72,6 +74,7 @@ func NewFull(options ...func(*cobra.Command, []*pflag.FlagSet)) *cobra.Command {
rpc.Flags(),
gateway.Flags(),
state.Flags(),
pruner.Flags(),
}
cmd := &cobra.Command{
Use: "full [subcommand]",
Expand Down
26 changes: 19 additions & 7 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/pruner"
rpc_cfg "github.com/celestiaorg/celestia-node/nodebuilder/rpc"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
"github.com/celestiaorg/celestia-node/share"
Expand Down Expand Up @@ -105,13 +106,6 @@ func PersistentPreRunEnv(cmd *cobra.Command, nodeType node.Type, _ []string) err
return err
}

if nodeType != node.Bridge {
err = header.ParseFlags(cmd, &cfg.Header)
if err != nil {
return err
}
}

ctx, err = ParseMiscFlags(ctx, cmd)
if err != nil {
return err
Expand All @@ -121,6 +115,24 @@ func PersistentPreRunEnv(cmd *cobra.Command, nodeType node.Type, _ []string) err
gateway.ParseFlags(cmd, &cfg.Gateway)
state.ParseFlags(cmd, &cfg.State)

switch nodeType {
case node.Light:
err = header.ParseFlags(cmd, &cfg.Header)
if err != nil {
return err
}
case node.Full:
err = header.ParseFlags(cmd, &cfg.Header)
if err != nil {
return err
}
pruner.ParseFlags(cmd, &cfg.Pruner)
case node.Bridge:
pruner.ParseFlags(cmd, &cfg.Pruner)
default:
panic(fmt.Sprintf("invalid node type: %v", nodeType))
}

// set config
ctx = WithNodeConfig(ctx, &cfg)
cmd.SetContext(ctx)
Expand Down
34 changes: 25 additions & 9 deletions header/headertest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] {
return headertest.NewStore[*header.ExtendedHeader](t, NewTestSuite(t, 3, 0), 10)
}

func NewCustomStore(
t *testing.T,
generator headertest.Generator[*header.ExtendedHeader],
numHeaders int,
) libhead.Store[*header.ExtendedHeader] {
return headertest.NewStore[*header.ExtendedHeader](t, generator, numHeaders)
}

// NewTestSuite setups a new test suite with a given number of validators.
func NewTestSuite(t *testing.T, numValidators int, blockTime time.Duration) *TestSuite {
valSet, vals := RandValidatorSet(numValidators, 10)
Expand Down Expand Up @@ -82,8 +90,10 @@ func (s *TestSuite) genesis() *header.ExtendedHeader {
return eh
}

func MakeCommit(blockID types.BlockID, height int64, round int32,
voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time) (*types.Commit, error) {
func MakeCommit(
blockID types.BlockID, height int64, round int32,
voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time,
) (*types.Commit, error) {

// all sign
for i := 0; i < len(validators); i++ {
Expand Down Expand Up @@ -157,7 +167,8 @@ func (s *TestSuite) NextHeader() *header.ExtendedHeader {
}

func (s *TestSuite) GenRawHeader(
height uint64, lastHeader, lastCommit, dataHash libhead.Hash) *header.RawHeader {
height uint64, lastHeader, lastCommit, dataHash libhead.Hash,
) *header.RawHeader {
rh := RandRawHeader(s.t)
rh.Height = int64(height)
rh.LastBlockID = types.BlockID{Hash: bytes.HexBytes(lastHeader)}
Expand All @@ -167,9 +178,9 @@ func (s *TestSuite) GenRawHeader(
rh.NextValidatorsHash = s.valSet.Hash()
rh.ProposerAddress = s.nextProposer().Address

rh.Time = time.Now()
rh.Time = time.Now().UTC()
if s.blockTime > 0 {
rh.Time = s.Head().Time().Add(s.blockTime)
rh.Time = s.Head().Time().UTC().Add(s.blockTime)
}

return rh
Expand All @@ -189,7 +200,7 @@ func (s *TestSuite) Commit(h *header.RawHeader) *types.Commit {
ValidatorIndex: int32(i),
Height: h.Height,
Round: round,
Timestamp: tmtime.Now(),
Timestamp: tmtime.Now().UTC(),
Type: tmproto.PrecommitType,
BlockID: bid,
}
Expand All @@ -214,6 +225,11 @@ func (s *TestSuite) nextProposer() *types.Validator {

// RandExtendedHeader provides an ExtendedHeader fixture.
func RandExtendedHeader(t testing.TB) *header.ExtendedHeader {
timestamp := time.Now().UTC()
return RandExtendedHeaderAtTimestamp(t, timestamp)
}

func RandExtendedHeaderAtTimestamp(t testing.TB, timestamp time.Time) *header.ExtendedHeader {
dah := share.EmptyRoot()

rh := RandRawHeader(t)
Expand All @@ -224,7 +240,7 @@ func RandExtendedHeader(t testing.TB) *header.ExtendedHeader {
voteSet := types.NewVoteSet(rh.ChainID, rh.Height, 0, tmproto.PrecommitType, valSet)
blockID := RandBlockID(t)
blockID.Hash = rh.Hash()
commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, time.Now())
commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, timestamp)
require.NoError(t, err)

return &header.ExtendedHeader{
Expand Down Expand Up @@ -279,7 +295,7 @@ func RandRawHeader(t testing.TB) *header.RawHeader {
Version: version.Consensus{Block: 11, App: 1},
ChainID: "test",
Height: mrand.Int63(), //nolint:gosec
Time: time.Now(),
Time: time.Now().UTC(),
LastBlockID: RandBlockID(t),
LastCommitHash: tmrand.Bytes(32),
DataHash: tmrand.Bytes(32),
Expand Down Expand Up @@ -320,7 +336,7 @@ func ExtendedHeaderFromEDS(t testing.TB, height uint64, eds *rsmt2d.ExtendedData
blockID := RandBlockID(t)
blockID.Hash = gen.Hash()
voteSet := types.NewVoteSet(gen.ChainID, gen.Height, 0, tmproto.PrecommitType, valSet)
commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, vals, time.Now())
commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, vals, time.Now().UTC())
require.NoError(t, err)

eh := &header.ExtendedHeader{
Expand Down
3 changes: 3 additions & 0 deletions nodebuilder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/pruner"
"github.com/celestiaorg/celestia-node/nodebuilder/rpc"
"github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
Expand All @@ -35,6 +36,7 @@ type Config struct {
Share share.Config
Header header.Config
DASer das.Config `toml:",omitempty"`
Pruner pruner.Config
}

// DefaultConfig provides a default Config for a given Node Type 'tp'.
Expand All @@ -49,6 +51,7 @@ func DefaultConfig(tp node.Type) *Config {
Gateway: gateway.DefaultConfig(),
Share: share.DefaultConfig(tp),
Header: header.DefaultConfig(tp),
Pruner: pruner.DefaultConfig(),
}

switch tp {
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
modhead "github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/prune"
"github.com/celestiaorg/celestia-node/nodebuilder/pruner"
"github.com/celestiaorg/celestia-node/nodebuilder/rpc"
"github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
Expand Down Expand Up @@ -58,7 +58,7 @@ func ConstructModule(tp node.Type, network p2p.Network, cfg *Config, store Store
blob.ConstructModule(),
da.ConstructModule(),
node.ConstructModule(tp),
prune.ConstructModule(tp),
pruner.ConstructModule(tp, &cfg.Pruner),
rpc.ConstructModule(tp, &cfg.RPC),
)

Expand Down
47 changes: 0 additions & 47 deletions nodebuilder/prune/module.go

This file was deleted.

13 changes: 13 additions & 0 deletions nodebuilder/pruner/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package pruner

var MetricsEnabled bool

type Config struct {
EnableService bool
}

func DefaultConfig() Config {
return Config{
EnableService: false,
}
}
33 changes: 33 additions & 0 deletions nodebuilder/pruner/constructors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package pruner

import (
"github.com/ipfs/go-datastore"

hdr "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/pruner"
)

func newPrunerService(
p pruner.Pruner,
window pruner.AvailabilityWindow,
getter hdr.Store[*header.ExtendedHeader],
ds datastore.Batching,
opts ...pruner.Option,
) (*pruner.Service, error) {
serv, err := pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...)
if err != nil {
return nil, err
}

if MetricsEnabled {
err := pruner.WithPrunerMetrics(serv)
if err != nil {
return nil, err
}
}

return serv, nil
}
20 changes: 20 additions & 0 deletions nodebuilder/pruner/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package pruner

import (
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
)

const pruningFlag = "experimental-pruning"

func Flags() *flag.FlagSet {
flags := &flag.FlagSet{}

flags.Bool(pruningFlag, false, "EXPERIMENTAL: Enables pruning of blocks outside the pruning window.")

return flags
}

func ParseFlags(cmd *cobra.Command, cfg *Config) {
cfg.EnableService = cmd.Flag(pruningFlag).Changed
}
71 changes: 71 additions & 0 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package pruner

import (
"context"

"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/archival"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/pruner/light"
"github.com/celestiaorg/celestia-node/share/eds"
)

func ConstructModule(tp node.Type, cfg *Config) fx.Option {
if !cfg.EnableService {
switch tp {
case node.Light:
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Supply(light.Window)
case node.Full, node.Bridge:
return fx.Supply(archival.Window)
default:
panic("unknown node type")
}
}

baseComponents := fx.Options(
fx.Provide(fx.Annotate(
newPrunerService,
fx.OnStart(func(ctx context.Context, p *pruner.Service) error {
return p.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, p *pruner.Service) error {
return p.Stop(ctx)
}),
)),
// This is necessary to invoke the pruner service as independent thanks to a
// quirk in FX.
fx.Invoke(func(_ *pruner.Service) {}),
)

switch tp {
case node.Full:
return fx.Module("prune",
baseComponents,
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
fx.Supply(full.Window),
)
case node.Bridge:
return fx.Module("prune",
baseComponents,
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
fx.Supply(full.Window),
)
// TODO: Eventually, light nodes will be capable of pruning samples
// in which case, this can be enabled.
case node.Light:
return fx.Module("prune",
fx.Supply(light.Window),
)
default:
panic("unknown node type")
}
}
Loading

0 comments on commit 02030bf

Please sign in to comment.