Skip to content

Commit

Permalink
feat: allow separate accounting store
Browse files Browse the repository at this point in the history
  • Loading branch information
ralph-pichler committed Jun 16, 2021
1 parent d2c668e commit f91efac
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 22 deletions.
2 changes: 2 additions & 0 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const (
optionNamePriceOracleAddress = "price-oracle-address"
optionNameBlockTime = "block-time"
optionWarmUpTime = "warmup-time"
optionNameSeparateAccountingStore = "separate-accounting-store"
)

func init() {
Expand Down Expand Up @@ -241,6 +242,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().Uint64(optionNameBlockTime, 15, "chain block time")
cmd.Flags().String(optionNameSwapDeploymentGasPrice, "", "gas price in wei to use for deployment and funding")
cmd.Flags().Duration(optionWarmUpTime, time.Minute*10, "time to warmup the node before pull/push protocols can be kicked off.")
cmd.Flags().Bool(optionNameSeparateAccountingStore, true, "separate statestore for accounting purposes")
}

func newLogger(cmd *cobra.Command, verbosity string) (logging.Logger, error) {
Expand Down
23 changes: 16 additions & 7 deletions cmd/bee/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/ethersphere/bee/pkg/node"
"github.com/ethersphere/bee/pkg/storage"
"github.com/spf13/cobra"
)

Expand All @@ -35,20 +36,28 @@ func (c *command) initDeployCmd() error {
swapEndpoint := c.config.GetString(optionNameSwapEndpoint)
deployGasPrice := c.config.GetString(optionNameSwapDeploymentGasPrice)

stateStore, err := node.InitStateStore(logger, dataDir)
if err != nil {
return err
var accountingStore storage.StateStorer
if c.config.GetBool(optionNameSeparateAccountingStore) {
accountingStore, err = node.InitAccountingStore(logger, dataDir)
if err != nil {
return err
}
} else {
accountingStore, err = node.InitStateStore(logger, dataDir)
if err != nil {
return err
}
}

defer stateStore.Close()
defer accountingStore.Close()

signerConfig, err := c.configureSigner(cmd, logger)
if err != nil {
return err
}
signer := signerConfig.signer

err = node.CheckOverlayWithStore(signerConfig.address, stateStore)
err = node.CheckOverlayWithStore(signerConfig.address, accountingStore)
if err != nil {
return err
}
Expand All @@ -58,7 +67,7 @@ func (c *command) initDeployCmd() error {
swapBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err := node.InitChain(
ctx,
logger,
stateStore,
accountingStore,
swapEndpoint,
signer,
blocktime,
Expand All @@ -84,7 +93,7 @@ func (c *command) initDeployCmd() error {
_, err = node.InitChequebookService(
ctx,
logger,
stateStore,
accountingStore,
signer,
chainID,
swapBackend,
Expand Down
19 changes: 14 additions & 5 deletions cmd/bee/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/ethersphere/bee/pkg/node"
"github.com/ethersphere/bee/pkg/storage"
"github.com/spf13/cobra"
)

Expand All @@ -32,14 +33,22 @@ func (c *command) initInitCmd() (err error) {
}

dataDir := c.config.GetString(optionNameDataDir)
stateStore, err := node.InitStateStore(logger, dataDir)
if err != nil {
return err
var accountingStore storage.StateStorer
if c.config.GetBool(optionNameSeparateAccountingStore) {
accountingStore, err = node.InitAccountingStore(logger, dataDir)
if err != nil {
return err
}
} else {
accountingStore, err = node.InitStateStore(logger, dataDir)
if err != nil {
return err
}
}

defer stateStore.Close()
defer accountingStore.Close()

return node.CheckOverlayWithStore(signerConfig.address, stateStore)
return node.CheckOverlayWithStore(signerConfig.address, accountingStore)
},
PreRunE: func(cmd *cobra.Command, args []string) error {
return c.config.BindPFlags(cmd.Flags())
Expand Down
1 change: 1 addition & 0 deletions cmd/bee/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
BlockTime: c.config.GetUint64(optionNameBlockTime),
DeployGasPrice: c.config.GetString(optionNameSwapDeploymentGasPrice),
WarmupTime: c.config.GetDuration(optionWarmUpTime),
SeparateAccountingStore: c.config.GetBool(optionNameSeparateAccountingStore),
})
if err != nil {
return err
Expand Down
34 changes: 24 additions & 10 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Bee struct {
tracerCloser io.Closer
tagsCloser io.Closer
stateStoreCloser io.Closer
accountingStoreCloser io.Closer
localstoreCloser io.Closer
topologyCloser io.Closer
topologyHalter topology.Halter
Expand Down Expand Up @@ -139,6 +140,7 @@ type Options struct {
ResolverConnectionCfgs []multiresolver.ConnectionConfig
GatewayMode bool
BootnodeMode bool
SeparateAccountingStore bool
SwapEndpoint string
SwapFactoryAddress string
SwapLegacyFactoryAddresses []string
Expand Down Expand Up @@ -229,7 +231,18 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
}
b.stateStoreCloser = stateStore

err = CheckOverlayWithStore(swarmAddress, stateStore)
var accountingStore storage.StateStorer
if o.SeparateAccountingStore {
accountingStore, err = InitAccountingStore(logger, o.DataDir)
if err != nil {
return nil, err
}
b.accountingStoreCloser = stateStore
} else {
accountingStore = stateStore
}

err = CheckOverlayWithStore(swarmAddress, accountingStore)
if err != nil {
return nil, err
}
Expand All @@ -251,7 +264,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
swapBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err = InitChain(
p2pCtx,
logger,
stateStore,
accountingStore,
o.SwapEndpoint,
signer,
o.BlockTime,
Expand Down Expand Up @@ -284,7 +297,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
chequebookService, err = InitChequebookService(
p2pCtx,
logger,
stateStore,
accountingStore,
signer,
chainID,
swapBackend,
Expand All @@ -299,7 +312,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
}

chequeStore, cashoutService = initChequeStoreCashout(
stateStore,
accountingStore,
swapBackend,
chequebookFactory,
chainID,
Expand All @@ -310,7 +323,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,

lightNodes := lightnode.NewContainer(swarmAddress)

txHash, err := getTxHash(stateStore, logger, o)
txHash, err := getTxHash(accountingStore, logger, o)
if err != nil {
return nil, fmt.Errorf("invalid transaction hash: %w", err)
}
Expand Down Expand Up @@ -521,7 +534,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
paymentTolerance,
paymentEarly,
logger,
stateStore,
accountingStore,
pricing,
big.NewInt(refreshRate),
p2ps,
Expand All @@ -531,7 +544,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
}
b.accountingCloser = acc

pseudosettleService := pseudosettle.New(p2ps, logger, stateStore, acc, big.NewInt(refreshRate), p2ps)
pseudosettleService := pseudosettle.New(p2ps, logger, accountingStore, acc, big.NewInt(refreshRate), p2ps)
if err = p2ps.AddProtocol(pseudosettleService.Protocol()); err != nil {
return nil, fmt.Errorf("pseudosettle service: %w", err)
}
Expand All @@ -543,7 +556,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
swapService, priceOracle, err = InitSwap(
p2ps,
logger,
stateStore,
accountingStore,
networkID,
overlayEthAddress,
chequebookService,
Expand Down Expand Up @@ -842,14 +855,15 @@ func (b *Bee) Shutdown(ctx context.Context) error {
tryClose(b.tagsCloser, "tag persistence")
tryClose(b.topologyCloser, "topology driver")
tryClose(b.stateStoreCloser, "statestore")
tryClose(b.accountingCloser, "accountingstore")
tryClose(b.localstoreCloser, "localstore")
tryClose(b.errorLogWriter, "error log writer")
tryClose(b.resolverCloser, "resolver service")

return mErr
}

func getTxHash(stateStore storage.StateStorer, logger logging.Logger, o Options) ([]byte, error) {
func getTxHash(accountingStore storage.StateStorer, logger logging.Logger, o Options) ([]byte, error) {
if o.Standalone {
return nil, nil // in standalone mode tx hash is not used
}
Expand All @@ -869,7 +883,7 @@ func getTxHash(stateStore storage.StateStorer, logger logging.Logger, o Options)

var txHash common.Hash
key := chequebook.ChequebookDeploymentKey
if err := stateStore.Get(key, &txHash); err != nil {
if err := accountingStore.Get(key, &txHash); err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, errors.New("chequebook deployment transaction hash not found. Please specify the transaction hash manually.")
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/node/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ func InitStateStore(log logging.Logger, dataDir string) (ret storage.StateStorer
return leveldb.NewStateStore(filepath.Join(dataDir, "statestore"), log)
}

// InitAccountingStore will initialize the accountingStore with the given path to the
// data directory. When given an empty directory path, the function will instead
// initialize an in-memory state store that will not be persisted.
func InitAccountingStore(log logging.Logger, dataDir string) (ret storage.StateStorer, err error) {
if dataDir == "" {
ret = mock.NewStateStore()
log.Warning("using in-mem accounting store, no node state will be persisted")
return ret, nil
}
return leveldb.NewStateStore(filepath.Join(dataDir, "accountingstore"), log)
}

const overlayKey = "overlay"

// CheckOverlayWithStore checks the overlay is the same as stored in the statestore
Expand Down

0 comments on commit f91efac

Please sign in to comment.