Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow separate accounting store #2109

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const (
optionNamePriceOracleAddress = "price-oracle-address"
optionNameBlockTime = "block-time"
optionWarmUpTime = "warmup-time"
optionNameSeparateAccountingStore = "separate-accounting-store"
)

func init() {
Expand Down Expand Up @@ -241,6 +242,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().Uint64(optionNameBlockTime, 15, "chain block time")
cmd.Flags().String(optionNameSwapDeploymentGasPrice, "", "gas price in wei to use for deployment and funding")
cmd.Flags().Duration(optionWarmUpTime, time.Minute*10, "time to warmup the node before pull/push protocols can be kicked off.")
cmd.Flags().Bool(optionNameSeparateAccountingStore, false, "separate statestore for accounting purposes")
}

func newLogger(cmd *cobra.Command, verbosity string) (logging.Logger, error) {
Expand Down
23 changes: 16 additions & 7 deletions cmd/bee/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/ethersphere/bee/pkg/node"
"github.com/ethersphere/bee/pkg/storage"
"github.com/spf13/cobra"
)

Expand All @@ -35,20 +36,28 @@ func (c *command) initDeployCmd() error {
swapEndpoint := c.config.GetString(optionNameSwapEndpoint)
deployGasPrice := c.config.GetString(optionNameSwapDeploymentGasPrice)

stateStore, err := node.InitStateStore(logger, dataDir)
if err != nil {
return err
var accountingStore storage.StateStorer
if c.config.GetBool(optionNameSeparateAccountingStore) {
accountingStore, err = node.InitAccountingStore(logger, dataDir)
if err != nil {
return err
}
} else {
accountingStore, err = node.InitStateStore(logger, dataDir)
if err != nil {
return err
}
}

defer stateStore.Close()
defer accountingStore.Close()

signerConfig, err := c.configureSigner(cmd, logger)
if err != nil {
return err
}
signer := signerConfig.signer

err = node.CheckOverlayWithStore(signerConfig.address, stateStore)
err = node.CheckOverlayWithStore(signerConfig.address, accountingStore)
if err != nil {
return err
}
Expand All @@ -58,7 +67,7 @@ func (c *command) initDeployCmd() error {
swapBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err := node.InitChain(
ctx,
logger,
stateStore,
accountingStore,
swapEndpoint,
signer,
blocktime,
Expand All @@ -84,7 +93,7 @@ func (c *command) initDeployCmd() error {
_, err = node.InitChequebookService(
ctx,
logger,
stateStore,
accountingStore,
signer,
chainID,
swapBackend,
Expand Down
19 changes: 14 additions & 5 deletions cmd/bee/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/ethersphere/bee/pkg/node"
"github.com/ethersphere/bee/pkg/storage"
"github.com/spf13/cobra"
)

Expand All @@ -32,14 +33,22 @@ func (c *command) initInitCmd() (err error) {
}

dataDir := c.config.GetString(optionNameDataDir)
stateStore, err := node.InitStateStore(logger, dataDir)
if err != nil {
return err
var accountingStore storage.StateStorer
if c.config.GetBool(optionNameSeparateAccountingStore) {
accountingStore, err = node.InitAccountingStore(logger, dataDir)
if err != nil {
return err
}
} else {
accountingStore, err = node.InitStateStore(logger, dataDir)
if err != nil {
return err
}
}

defer stateStore.Close()
defer accountingStore.Close()

return node.CheckOverlayWithStore(signerConfig.address, stateStore)
return node.CheckOverlayWithStore(signerConfig.address, accountingStore)
},
PreRunE: func(cmd *cobra.Command, args []string) error {
return c.config.BindPFlags(cmd.Flags())
Expand Down
1 change: 1 addition & 0 deletions cmd/bee/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
BlockTime: c.config.GetUint64(optionNameBlockTime),
DeployGasPrice: c.config.GetString(optionNameSwapDeploymentGasPrice),
WarmupTime: c.config.GetDuration(optionWarmUpTime),
SeparateAccountingStore: c.config.GetBool(optionNameSeparateAccountingStore),
})
if err != nil {
return err
Expand Down
34 changes: 24 additions & 10 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Bee struct {
tracerCloser io.Closer
tagsCloser io.Closer
stateStoreCloser io.Closer
accountingStoreCloser io.Closer
localstoreCloser io.Closer
topologyCloser io.Closer
topologyHalter topology.Halter
Expand Down Expand Up @@ -139,6 +140,7 @@ type Options struct {
ResolverConnectionCfgs []multiresolver.ConnectionConfig
GatewayMode bool
BootnodeMode bool
SeparateAccountingStore bool
SwapEndpoint string
SwapFactoryAddress string
SwapLegacyFactoryAddresses []string
Expand Down Expand Up @@ -229,7 +231,18 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
}
b.stateStoreCloser = stateStore

err = CheckOverlayWithStore(swarmAddress, stateStore)
var accountingStore storage.StateStorer
if o.SeparateAccountingStore {
accountingStore, err = InitAccountingStore(logger, o.DataDir)
if err != nil {
return nil, err
}
b.accountingStoreCloser = stateStore
} else {
accountingStore = stateStore
}

err = CheckOverlayWithStore(swarmAddress, accountingStore)
if err != nil {
return nil, err
}
Expand All @@ -251,7 +264,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
swapBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err = InitChain(
p2pCtx,
logger,
stateStore,
accountingStore,
o.SwapEndpoint,
signer,
o.BlockTime,
Expand Down Expand Up @@ -284,7 +297,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
chequebookService, err = InitChequebookService(
p2pCtx,
logger,
stateStore,
accountingStore,
signer,
chainID,
swapBackend,
Expand All @@ -299,7 +312,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
}

chequeStore, cashoutService = initChequeStoreCashout(
stateStore,
accountingStore,
swapBackend,
chequebookFactory,
chainID,
Expand All @@ -310,7 +323,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,

lightNodes := lightnode.NewContainer(swarmAddress)

txHash, err := getTxHash(stateStore, logger, o)
txHash, err := getTxHash(accountingStore, logger, o)
if err != nil {
return nil, fmt.Errorf("invalid transaction hash: %w", err)
}
Expand Down Expand Up @@ -521,7 +534,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
paymentTolerance,
paymentEarly,
logger,
stateStore,
accountingStore,
pricing,
big.NewInt(refreshRate),
p2ps,
Expand All @@ -531,7 +544,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
}
b.accountingCloser = acc

pseudosettleService := pseudosettle.New(p2ps, logger, stateStore, acc, big.NewInt(refreshRate), p2ps)
pseudosettleService := pseudosettle.New(p2ps, logger, accountingStore, acc, big.NewInt(refreshRate), p2ps)
if err = p2ps.AddProtocol(pseudosettleService.Protocol()); err != nil {
return nil, fmt.Errorf("pseudosettle service: %w", err)
}
Expand All @@ -543,7 +556,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
swapService, priceOracle, err = InitSwap(
p2ps,
logger,
stateStore,
accountingStore,
networkID,
overlayEthAddress,
chequebookService,
Expand Down Expand Up @@ -842,14 +855,15 @@ func (b *Bee) Shutdown(ctx context.Context) error {
tryClose(b.tagsCloser, "tag persistence")
tryClose(b.topologyCloser, "topology driver")
tryClose(b.stateStoreCloser, "statestore")
tryClose(b.accountingCloser, "accountingstore")
tryClose(b.localstoreCloser, "localstore")
tryClose(b.errorLogWriter, "error log writer")
tryClose(b.resolverCloser, "resolver service")

return mErr
}

func getTxHash(stateStore storage.StateStorer, logger logging.Logger, o Options) ([]byte, error) {
func getTxHash(accountingStore storage.StateStorer, logger logging.Logger, o Options) ([]byte, error) {
if o.Standalone {
return nil, nil // in standalone mode tx hash is not used
}
Expand All @@ -869,7 +883,7 @@ func getTxHash(stateStore storage.StateStorer, logger logging.Logger, o Options)

var txHash common.Hash
key := chequebook.ChequebookDeploymentKey
if err := stateStore.Get(key, &txHash); err != nil {
if err := accountingStore.Get(key, &txHash); err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, errors.New("chequebook deployment transaction hash not found. Please specify the transaction hash manually.")
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/node/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ func InitStateStore(log logging.Logger, dataDir string) (ret storage.StateStorer
return leveldb.NewStateStore(filepath.Join(dataDir, "statestore"), log)
}

// InitAccountingStore will initialize the accountingStore with the given path to the
// data directory. When given an empty directory path, the function will instead
// initialize an in-memory state store that will not be persisted.
func InitAccountingStore(log logging.Logger, dataDir string) (ret storage.StateStorer, err error) {
if dataDir == "" {
ret = mock.NewStateStore()
log.Warning("using in-mem accounting store, no node state will be persisted")
return ret, nil
}
return leveldb.NewStateStore(filepath.Join(dataDir, "accountingstore"), log)
}

const overlayKey = "overlay"

// CheckOverlayWithStore checks the overlay is the same as stored in the statestore
Expand Down