diff --git a/consensus/polybft/checkpoint_manager.go b/consensus/polybft/checkpoint_manager.go index bb427cbf3a..1f4f1e2e43 100644 --- a/consensus/polybft/checkpoint_manager.go +++ b/consensus/polybft/checkpoint_manager.go @@ -16,6 +16,7 @@ import ( "github.com/0xPolygon/polygon-edge/types" hclog "github.com/hashicorp/go-hclog" "github.com/umbracle/ethgo" + bolt "go.etcd.io/bbolt" ) var ( @@ -50,7 +51,7 @@ func (d *dummyCheckpointManager) GetLogFilters() map[types.Address][]types.Hash return make(map[types.Address][]types.Hash) } func (d *dummyCheckpointManager) AddLog(header *types.Header, - log *ethgo.Log, dbTx DBTransaction) error { + log *ethgo.Log, dbTx *bolt.Tx) error { return nil } @@ -428,7 +429,7 @@ func (c *checkpointManager) GetLogFilters() map[types.Address][]types.Hash { // AddLog is the implementation of EventSubscriber interface, // used to handle a log defined in GetLogFilters, provided by event provider -func (c *checkpointManager) AddLog(header *types.Header, log *ethgo.Log, dbTx DBTransaction) error { +func (c *checkpointManager) AddLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error { exitEvent, doesMatch, err := parseExitEvent(header, log) if err != nil { return err diff --git a/consensus/polybft/consensus_runtime.go b/consensus/polybft/consensus_runtime.go index 8d3e8840ff..7b9bb31879 100644 --- a/consensus/polybft/consensus_runtime.go +++ b/consensus/polybft/consensus_runtime.go @@ -17,6 +17,7 @@ import ( "github.com/0xPolygon/polygon-edge/contracts" "github.com/0xPolygon/polygon-edge/txrelayer" "github.com/0xPolygon/polygon-edge/types" + bolt "go.etcd.io/bbolt" "github.com/0xPolygon/go-ibft/messages" "github.com/0xPolygon/go-ibft/messages/proto" @@ -238,7 +239,7 @@ func (c *consensusRuntime) initCheckpointManager(logger hcf.Logger) error { } // initStakeManager initializes stake manager -func (c *consensusRuntime) initStakeManager(logger hcf.Logger, dbTx DBTransaction) error { +func (c *consensusRuntime) initStakeManager(logger hcf.Logger, dbTx *bolt.Tx) error { rootRelayer, err := txrelayer.NewTxRelayer(txrelayer.WithIPAddress(c.config.PolyBFTConfig.Bridge.JSONRPCEndpoint)) if err != nil { return err @@ -495,7 +496,7 @@ func (c *consensusRuntime) FSM() error { // restartEpoch resets the previously run epoch and moves to the next one // returns *epochMetadata different from nil if the lastEpoch is not the current one and everything was successful -func (c *consensusRuntime) restartEpoch(header *types.Header, dbTx DBTransaction) (*epochMetadata, error) { +func (c *consensusRuntime) restartEpoch(header *types.Header, dbTx *bolt.Tx) (*epochMetadata, error) { lastEpoch := c.epoch systemState, err := c.getSystemState(header) diff --git a/consensus/polybft/handlers.go b/consensus/polybft/handlers.go index c1b7acb6f3..2133896698 100644 --- a/consensus/polybft/handlers.go +++ b/consensus/polybft/handlers.go @@ -3,6 +3,7 @@ package polybft import ( "github.com/0xPolygon/polygon-edge/consensus/polybft/validator" "github.com/0xPolygon/polygon-edge/types" + bolt "go.etcd.io/bbolt" ) type PostBlockRequest struct { @@ -14,7 +15,7 @@ type PostBlockRequest struct { IsEpochEndingBlock bool // DBTx is the opened transaction on state store (in our case boltDB) // used to save necessary data on PostBlock - DBTx DBTransaction + DBTx *bolt.Tx } type PostEpochRequest struct { @@ -33,5 +34,5 @@ type PostEpochRequest struct { // DBTx is the opened transaction on state store (in our case boltDB) // used to save necessary data on PostEpoch - DBTx DBTransaction + DBTx *bolt.Tx } diff --git a/consensus/polybft/mocks_test.go b/consensus/polybft/mocks_test.go index 26ea216d5a..182a9f7eb6 100644 --- a/consensus/polybft/mocks_test.go +++ b/consensus/polybft/mocks_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/umbracle/ethgo" "github.com/umbracle/ethgo/contract" + bolt "go.etcd.io/bbolt" ) var _ blockchainBackend = (*blockchainMock)(nil) @@ -148,7 +149,7 @@ func (p *polybftBackendMock) GetValidators(blockNumber uint64, parents []*types. } func (p *polybftBackendMock) GetValidatorsWithTx(blockNumber uint64, parents []*types.Header, - dbTx DBTransaction) (validator.AccountSet, error) { + dbTx *bolt.Tx) (validator.AccountSet, error) { args := p.Called(blockNumber, parents, dbTx) if len(args) == 1 { accountSet, _ := args.Get(0).(validator.AccountSet) diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 2f9792f82b..421625956b 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -11,6 +11,7 @@ import ( "time" "github.com/hashicorp/go-hclog" + bolt "go.etcd.io/bbolt" "github.com/0xPolygon/polygon-edge/chain" "github.com/0xPolygon/polygon-edge/consensus" @@ -46,7 +47,7 @@ type polybftBackend interface { // GetValidators retrieves validator set for the given block // Function expects that db tx is already open GetValidatorsWithTx(blockNumber uint64, parents []*types.Header, - dbTx DBTransaction) (validator.AccountSet, error) + dbTx *bolt.Tx) (validator.AccountSet, error) } // Factory is the factory function to create a discovery consensus @@ -729,7 +730,7 @@ func (p *Polybft) GetValidators(blockNumber uint64, parents []*types.Header) (va } func (p *Polybft) GetValidatorsWithTx(blockNumber uint64, parents []*types.Header, - dbTx DBTransaction) (validator.AccountSet, error) { + dbTx *bolt.Tx) (validator.AccountSet, error) { return p.validatorsCache.GetSnapshot(blockNumber, parents, dbTx) } diff --git a/consensus/polybft/proposer_calculator.go b/consensus/polybft/proposer_calculator.go index e9ee4a8af4..2ec25fa32a 100644 --- a/consensus/polybft/proposer_calculator.go +++ b/consensus/polybft/proposer_calculator.go @@ -9,6 +9,7 @@ import ( "github.com/0xPolygon/polygon-edge/helper/common" "github.com/0xPolygon/polygon-edge/types" "github.com/hashicorp/go-hclog" + bolt "go.etcd.io/bbolt" ) var ( @@ -33,7 +34,7 @@ type ProposerSnapshot struct { } // NewProposerSnapshotFromState create ProposerSnapshot from state if possible or from genesis block -func NewProposerSnapshotFromState(config *runtimeConfig, dbTx DBTransaction) (*ProposerSnapshot, error) { +func NewProposerSnapshotFromState(config *runtimeConfig, dbTx *bolt.Tx) (*ProposerSnapshot, error) { snapshot, err := config.State.ProposerSnapshotStore.getProposerSnapshot(dbTx) if err != nil { return nil, err @@ -161,7 +162,7 @@ type ProposerCalculator struct { // NewProposerCalculator creates a new proposer calculator object func NewProposerCalculator(config *runtimeConfig, logger hclog.Logger, - dbTx DBTransaction) (*ProposerCalculator, error) { + dbTx *bolt.Tx) (*ProposerCalculator, error) { snap, err := NewProposerSnapshotFromState(config, dbTx) if err != nil { @@ -215,7 +216,7 @@ func (pc *ProposerCalculator) PostBlock(req *PostBlockRequest) error { return pc.update(blockNumber, req.DBTx) } -func (pc *ProposerCalculator) update(blockNumber uint64, dbTx DBTransaction) error { +func (pc *ProposerCalculator) update(blockNumber uint64, dbTx *bolt.Tx) error { pc.logger.Debug("Update proposers snapshot started", "target block", blockNumber) from := pc.snapshot.Height @@ -242,7 +243,7 @@ func (pc *ProposerCalculator) update(blockNumber uint64, dbTx DBTransaction) err } // Updates ProposerSnapshot to block block with number `blockNumber` -func (pc *ProposerCalculator) updatePerBlock(blockNumber uint64, dbTx DBTransaction) error { +func (pc *ProposerCalculator) updatePerBlock(blockNumber uint64, dbTx *bolt.Tx) error { if pc.snapshot.Height != blockNumber { return fmt.Errorf("proposers snapshot update called for wrong block. block number=%d, snapshot block number=%d", blockNumber, pc.snapshot.Height) diff --git a/consensus/polybft/stake_manager.go b/consensus/polybft/stake_manager.go index 907acedb75..65d766d62f 100644 --- a/consensus/polybft/stake_manager.go +++ b/consensus/polybft/stake_manager.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/umbracle/ethgo" "github.com/umbracle/ethgo/abi" + bolt "go.etcd.io/bbolt" ) var ( @@ -50,7 +51,7 @@ func (d *dummyStakeManager) UpdateValidatorSet(epoch uint64, func (d *dummyStakeManager) GetLogFilters() map[types.Address][]types.Hash { return make(map[types.Address][]types.Hash) } -func (d *dummyStakeManager) AddLog(header *types.Header, log *ethgo.Log, dbTx DBTransaction) error { +func (d *dummyStakeManager) AddLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error { return nil } @@ -79,7 +80,7 @@ func newStakeManager( blockchain blockchainBackend, polybftBackend polybftBackend, maxValidatorSetSize int, - dbTx DBTransaction, + dbTx *bolt.Tx, ) (*stakeManager, error) { sm := &stakeManager{ logger: logger, @@ -123,7 +124,7 @@ func (s *stakeManager) PostBlock(req *PostBlockRequest) error { return s.state.StakeStore.insertFullValidatorSet(fullValidatorSet, req.DBTx) } -func (s *stakeManager) init(blockchain blockchainBackend, dbTx DBTransaction) error { +func (s *stakeManager) init(blockchain blockchainBackend, dbTx *bolt.Tx) error { currentHeader := blockchain.CurrentHeader() currentBlockNumber := currentHeader.Number @@ -180,7 +181,7 @@ func (s *stakeManager) init(blockchain blockchainBackend, dbTx DBTransaction) er return s.state.StakeStore.insertFullValidatorSet(validatorSet, dbTx) } -func (s *stakeManager) getOrInitValidatorSet(dbTx DBTransaction) (validatorSetState, error) { +func (s *stakeManager) getOrInitValidatorSet(dbTx *bolt.Tx) (validatorSetState, error) { validatorSet, err := s.state.StakeStore.getFullValidatorSet(dbTx) if err != nil { if !errors.Is(err, errNoFullValidatorSet) { @@ -392,7 +393,7 @@ func (s *stakeManager) GetLogFilters() map[types.Address][]types.Hash { // AddLog is the implementation of EventSubscriber interface, // used to handle a log defined in GetLogFilters, provided by event provider -func (s *stakeManager) AddLog(header *types.Header, log *ethgo.Log, dbTx DBTransaction) error { +func (s *stakeManager) AddLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error { var transferEvent contractsapi.TransferEvent doesMatch, err := transferEvent.ParseLog(log) diff --git a/consensus/polybft/state.go b/consensus/polybft/state.go index 524909b775..98d8111b4a 100644 --- a/consensus/polybft/state.go +++ b/consensus/polybft/state.go @@ -13,15 +13,6 @@ var ( edgeEventsLastProcessedBlockKey = []byte("EdgeEventsLastProcessedBlockKey") ) -type DBTransaction interface { - Commit() error - Rollback() error - Bucket(bucket []byte) *bolt.Bucket - DeleteBucket(bucket []byte) error - CreateBucket(bucket []byte) (*bolt.Bucket, error) - CreateBucketIfNotExists(bucket []byte) (*bolt.Bucket, error) -} - // MessageSignature encapsulates sender identifier and its signature type MessageSignature struct { // Signer of the vote @@ -126,8 +117,8 @@ func (s *State) initStorages() error { } // insertLastProcessedEventsBlock inserts the last processed block for events on Edge -func (s *State) insertLastProcessedEventsBlock(block uint64, dbTx DBTransaction) error { - insertFn := func(tx DBTransaction) error { +func (s *State) insertLastProcessedEventsBlock(block uint64, dbTx *bolt.Tx) error { + insertFn := func(tx *bolt.Tx) error { return tx.Bucket(edgeEventsLastProcessedBlockBucket).Put( edgeEventsLastProcessedBlockKey, common.EncodeUint64ToBytes(block)) } @@ -142,13 +133,13 @@ func (s *State) insertLastProcessedEventsBlock(block uint64, dbTx DBTransaction) } // getLastProcessedEventsBlock gets the last processed block for events on Edge -func (s *State) getLastProcessedEventsBlock(dbTx DBTransaction) (uint64, error) { +func (s *State) getLastProcessedEventsBlock(dbTx *bolt.Tx) (uint64, error) { var ( lastProcessed uint64 err error ) - getFn := func(tx DBTransaction) { + getFn := func(tx *bolt.Tx) { value := tx.Bucket(edgeEventsLastProcessedBlockBucket).Get(edgeEventsLastProcessedBlockKey) if value != nil { lastProcessed = common.EncodeBytesToUint64(value) @@ -170,7 +161,7 @@ func (s *State) getLastProcessedEventsBlock(dbTx DBTransaction) (uint64, error) // beginDBTransaction creates and begins a transaction on BoltDB // Note that transaction needs to be manually rollback or committed -func (s *State) beginDBTransaction(isWriteTx bool) (DBTransaction, error) { +func (s *State) beginDBTransaction(isWriteTx bool) (*bolt.Tx, error) { return s.db.Begin(isWriteTx) } diff --git a/consensus/polybft/state_event_getter.go b/consensus/polybft/state_event_getter.go index 1e72786742..c8ca5cd180 100644 --- a/consensus/polybft/state_event_getter.go +++ b/consensus/polybft/state_event_getter.go @@ -5,6 +5,7 @@ import ( "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" "github.com/0xPolygon/polygon-edge/types" "github.com/umbracle/ethgo" + bolt "go.etcd.io/bbolt" ) // EventSubscriber specifies functions needed for a component to subscribe to eventProvider @@ -15,7 +16,7 @@ type EventSubscriber interface { GetLogFilters() map[types.Address][]types.Hash // AddLog is used to handle a log defined in GetLogFilters, provided by event provider - AddLog(header *types.Header, log *ethgo.Log, dbTx DBTransaction) error + AddLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error } // EventProvider represents an event provider in a blockchain system @@ -74,7 +75,7 @@ func (e *EventProvider) Subscribe(subscriber EventSubscriber) { // - error - if a block or its receipts could not be retrieved from blockchain func (e *EventProvider) GetEventsFromBlocks(lastProcessedBlock uint64, latestBlock *types.FullBlock, - dbTx DBTransaction) error { + dbTx *bolt.Tx) error { if err := e.getEventsFromBlocksRange(lastProcessedBlock+1, latestBlock.Block.Number()-1, dbTx); err != nil { return err } @@ -92,7 +93,7 @@ func (e *EventProvider) GetEventsFromBlocks(lastProcessedBlock uint64, // Returns: // - nil - if getting events finished successfully // - error - if a block or its receipts could not be retrieved from blockchain -func (e *EventProvider) getEventsFromBlocksRange(from, to uint64, dbTx DBTransaction) error { +func (e *EventProvider) getEventsFromBlocksRange(from, to uint64, dbTx *bolt.Tx) error { for i := from; i <= to; i++ { blockHeader, found := e.blockchain.GetHeaderByNumber(i) if !found { @@ -124,7 +125,7 @@ func (e *EventProvider) getEventsFromBlocksRange(from, to uint64, dbTx DBTransac // - error - if a subscriber for a certain log (event) returns an error on log (event) handling func (e *EventProvider) getEventsFromReceipts(blockHeader *types.Header, receipts []*types.Receipt, - dbTx DBTransaction) error { + dbTx *bolt.Tx) error { for _, receipt := range receipts { if receipt.Status == nil || *receipt.Status != types.ReceiptSuccess { continue diff --git a/consensus/polybft/state_store_checkpoint.go b/consensus/polybft/state_store_checkpoint.go index 2c53280a8a..544a4b5fb9 100644 --- a/consensus/polybft/state_store_checkpoint.go +++ b/consensus/polybft/state_store_checkpoint.go @@ -73,8 +73,8 @@ func (s *CheckpointStore) initialize(tx *bolt.Tx) error { } // insertExitEventWithTx inserts an exit event to db -func (s *CheckpointStore) insertExitEvent(exitEvent *ExitEvent, dbTx DBTransaction) error { - insertFn := func(tx DBTransaction) error { +func (s *CheckpointStore) insertExitEvent(exitEvent *ExitEvent, dbTx *bolt.Tx) error { + insertFn := func(tx *bolt.Tx) error { raw, err := json.Marshal(exitEvent) if err != nil { return err @@ -179,13 +179,13 @@ func (s *CheckpointStore) getExitEvents(epoch uint64, filter func(exitEvent *Exi } // updateLastSaved saves the last block processed for exit events -func (s *CheckpointStore) getLastSaved(dbTx DBTransaction) (uint64, error) { +func (s *CheckpointStore) getLastSaved(dbTx *bolt.Tx) (uint64, error) { var ( lastSavedBlock uint64 err error ) - getFn := func(tx DBTransaction) error { + getFn := func(tx *bolt.Tx) error { v := tx.Bucket(exitEventLastProcessedBlockBucket).Get(lastProcessedBlockKey) if v == nil { return errNoLastSavedEntry diff --git a/consensus/polybft/state_store_epoch.go b/consensus/polybft/state_store_epoch.go index fa7778c565..b2750aeea4 100644 --- a/consensus/polybft/state_store_epoch.go +++ b/consensus/polybft/state_store_epoch.go @@ -58,8 +58,8 @@ func (s *EpochStore) initialize(tx *bolt.Tx) error { } // insertValidatorSnapshot inserts a validator snapshot for the given block to its bucket in db -func (s *EpochStore) insertValidatorSnapshot(validatorSnapshot *validatorSnapshot, dbTx DBTransaction) error { - insertFn := func(tx DBTransaction) error { +func (s *EpochStore) insertValidatorSnapshot(validatorSnapshot *validatorSnapshot, dbTx *bolt.Tx) error { + insertFn := func(tx *bolt.Tx) error { raw, err := json.Marshal(validatorSnapshot) if err != nil { return err @@ -96,13 +96,13 @@ func (s *EpochStore) getValidatorSnapshot(epoch uint64) (*validatorSnapshot, err // getLastSnapshot returns the last snapshot saved in db // since they are stored by epoch number (uint64), they are sequentially stored, // so the latest epoch will be the last snapshot in db -func (s *EpochStore) getLastSnapshot(dbTx DBTransaction) (*validatorSnapshot, error) { +func (s *EpochStore) getLastSnapshot(dbTx *bolt.Tx) (*validatorSnapshot, error) { var ( snapshot *validatorSnapshot err error ) - getFn := func(tx DBTransaction) error { + getFn := func(tx *bolt.Tx) error { c := tx.Bucket(validatorSnapshotsBucket).Cursor() k, v := c.Last() @@ -126,8 +126,8 @@ func (s *EpochStore) getLastSnapshot(dbTx DBTransaction) (*validatorSnapshot, er } // insertEpoch inserts a new epoch to db with its meta data -func (s *EpochStore) insertEpoch(epoch uint64, dbTx DBTransaction) error { - insertFn := func(tx DBTransaction) error { +func (s *EpochStore) insertEpoch(epoch uint64, dbTx *bolt.Tx) error { + insertFn := func(tx *bolt.Tx) error { epochBucket, err := tx.Bucket(epochsBucket).CreateBucketIfNotExists(common.EncodeUint64ToBytes(epoch)) if err != nil { return err @@ -160,7 +160,7 @@ func (s *EpochStore) isEpochInserted(epoch uint64) bool { } // getEpochBucket returns bucket from db associated with given epoch -func getEpochBucket(tx DBTransaction, epoch uint64) (*bolt.Bucket, error) { +func getEpochBucket(tx *bolt.Tx, epoch uint64) (*bolt.Bucket, error) { epochBucket := tx.Bucket(epochsBucket).Bucket(common.EncodeUint64ToBytes(epoch)) if epochBucket == nil { return nil, fmt.Errorf("could not find bucket for epoch: %v", epoch) @@ -170,8 +170,8 @@ func getEpochBucket(tx DBTransaction, epoch uint64) (*bolt.Bucket, error) { } // cleanEpochsFromDB cleans epoch buckets from db -func (s *EpochStore) cleanEpochsFromDB(dbTx DBTransaction) error { - cleanFn := func(tx DBTransaction) error { +func (s *EpochStore) cleanEpochsFromDB(dbTx *bolt.Tx) error { + cleanFn := func(tx *bolt.Tx) error { if err := tx.DeleteBucket(epochsBucket); err != nil { return err } @@ -192,8 +192,8 @@ func (s *EpochStore) cleanEpochsFromDB(dbTx DBTransaction) error { // cleanValidatorSnapshotsFromDB cleans the validator snapshots bucket if a limit is reached, // but it leaves the latest (n) number of snapshots -func (s *EpochStore) cleanValidatorSnapshotsFromDB(epoch uint64, dbTx DBTransaction) error { - cleanFn := func(tx DBTransaction) error { +func (s *EpochStore) cleanValidatorSnapshotsFromDB(epoch uint64, dbTx *bolt.Tx) error { + cleanFn := func(tx *bolt.Tx) error { bucket := tx.Bucket(validatorSnapshotsBucket) // paired list @@ -274,7 +274,7 @@ func (s *EpochStore) validatorSnapshotsDBStats() (*bolt.BucketStats, error) { } // getNestedBucketInEpoch returns a nested (child) bucket from db associated with given epoch -func getNestedBucketInEpoch(tx DBTransaction, epoch uint64, bucketKey []byte) (*bolt.Bucket, error) { +func getNestedBucketInEpoch(tx *bolt.Tx, epoch uint64, bucketKey []byte) (*bolt.Bucket, error) { epochBucket, err := getEpochBucket(tx, epoch) if err != nil { return nil, err diff --git a/consensus/polybft/state_store_proposer_snapshot.go b/consensus/polybft/state_store_proposer_snapshot.go index 1cf316be60..abcae8456c 100644 --- a/consensus/polybft/state_store_proposer_snapshot.go +++ b/consensus/polybft/state_store_proposer_snapshot.go @@ -35,13 +35,13 @@ func (s *ProposerSnapshotStore) initialize(tx *bolt.Tx) error { } // getProposerSnapshot gets latest proposer snapshot -func (s *ProposerSnapshotStore) getProposerSnapshot(dbTx DBTransaction) (*ProposerSnapshot, error) { +func (s *ProposerSnapshotStore) getProposerSnapshot(dbTx *bolt.Tx) (*ProposerSnapshot, error) { var ( snapshot *ProposerSnapshot err error ) - getFn := func(tx DBTransaction) error { + getFn := func(tx *bolt.Tx) error { value := tx.Bucket(proposerSnapshotBucket).Get(proposerSnapshotKey) if value == nil { return nil @@ -62,8 +62,8 @@ func (s *ProposerSnapshotStore) getProposerSnapshot(dbTx DBTransaction) (*Propos } // writeProposerSnapshot writes proposer snapshot -func (s *ProposerSnapshotStore) writeProposerSnapshot(snapshot *ProposerSnapshot, dbTx DBTransaction) error { - insertFn := func(tx DBTransaction) error { +func (s *ProposerSnapshotStore) writeProposerSnapshot(snapshot *ProposerSnapshot, dbTx *bolt.Tx) error { + insertFn := func(tx *bolt.Tx) error { raw, err := json.Marshal(snapshot) if err != nil { return err diff --git a/consensus/polybft/state_store_stake.go b/consensus/polybft/state_store_stake.go index 3b81d5920c..1734b63760 100644 --- a/consensus/polybft/state_store_stake.go +++ b/consensus/polybft/state_store_stake.go @@ -32,8 +32,8 @@ func (s *StakeStore) initialize(tx *bolt.Tx) error { // insertFullValidatorSet inserts full validator set to its bucket (or updates it if exists) // If the passed tx is already open (not nil), it will use it to insert full validator set // If the passed tx is not open (it is nil), it will open a new transaction on db and insert full validator set -func (s *StakeStore) insertFullValidatorSet(fullValidatorSet validatorSetState, dbTx DBTransaction) error { - insertFn := func(tx DBTransaction) error { +func (s *StakeStore) insertFullValidatorSet(fullValidatorSet validatorSetState, dbTx *bolt.Tx) error { + insertFn := func(tx *bolt.Tx) error { raw, err := fullValidatorSet.Marshal() if err != nil { return err @@ -54,13 +54,13 @@ func (s *StakeStore) insertFullValidatorSet(fullValidatorSet validatorSetState, // getFullValidatorSet returns full validator set from its bucket if exists // If the passed tx is already open (not nil), it will use it to get full validator set // If the passed tx is not open (it is nil), it will open a new transaction on db and get full validator set -func (s *StakeStore) getFullValidatorSet(dbTx DBTransaction) (validatorSetState, error) { +func (s *StakeStore) getFullValidatorSet(dbTx *bolt.Tx) (validatorSetState, error) { var ( fullValidatorSet validatorSetState err error ) - getFn := func(tx DBTransaction) error { + getFn := func(tx *bolt.Tx) error { raw := tx.Bucket(validatorSetBucket).Get(fullValidatorSetKey) if raw == nil { return errNoFullValidatorSet diff --git a/consensus/polybft/state_store_state_sync.go b/consensus/polybft/state_store_state_sync.go index e2d8f32bb6..c1437ee82f 100644 --- a/consensus/polybft/state_store_state_sync.go +++ b/consensus/polybft/state_store_state_sync.go @@ -100,13 +100,13 @@ func (s *StateSyncStore) list() ([]*contractsapi.StateSyncedEvent, error) { // getStateSyncEventsForCommitment returns state sync events for commitment func (s *StateSyncStore) getStateSyncEventsForCommitment( - fromIndex, toIndex uint64, dbTx DBTransaction) ([]*contractsapi.StateSyncedEvent, error) { + fromIndex, toIndex uint64, dbTx *bolt.Tx) ([]*contractsapi.StateSyncedEvent, error) { var ( events []*contractsapi.StateSyncedEvent err error ) - getFn := func(tx DBTransaction) error { + getFn := func(tx *bolt.Tx) error { bucket := tx.Bucket(stateSyncEventsBucket) for i := fromIndex; i <= toIndex; i++ { v := bucket.Get(common.EncodeUint64ToBytes(i)) @@ -164,8 +164,8 @@ func (s *StateSyncStore) getCommitmentForStateSync(stateSyncID uint64) (*Commitm // insertCommitmentMessage inserts signed commitment to db func (s *StateSyncStore) insertCommitmentMessage(commitment *CommitmentMessageSigned, - dbTx DBTransaction) error { - insertFn := func(tx DBTransaction) error { + dbTx *bolt.Tx) error { + insertFn := func(tx *bolt.Tx) error { raw, err := json.Marshal(commitment) if err != nil { return err @@ -206,13 +206,13 @@ func (s *StateSyncStore) getCommitmentMessage(toIndex uint64) (*CommitmentMessag // insertMessageVote inserts given vote to signatures bucket of given epoch func (s *StateSyncStore) insertMessageVote(epoch uint64, key []byte, - vote *MessageSignature, dbTx DBTransaction) (int, error) { + vote *MessageSignature, dbTx *bolt.Tx) (int, error) { var ( numOfSignatures int err error ) - insertFn := func(tx DBTransaction) error { + insertFn := func(tx *bolt.Tx) error { signatures, err := s.getMessageVotesLocked(tx, epoch, key) if err != nil { return err @@ -279,7 +279,7 @@ func (s *StateSyncStore) getMessageVotes(epoch uint64, hash []byte) ([]*MessageS } // getMessageVotesLocked gets all signatures from db associated with given epoch and hash -func (s *StateSyncStore) getMessageVotesLocked(tx DBTransaction, epoch uint64, +func (s *StateSyncStore) getMessageVotesLocked(tx *bolt.Tx, epoch uint64, hash []byte) ([]*MessageSignature, error) { bucket, err := getNestedBucketInEpoch(tx, epoch, messageVotesBucket) if err != nil { @@ -300,8 +300,8 @@ func (s *StateSyncStore) getMessageVotesLocked(tx DBTransaction, epoch uint64, } // insertStateSyncProofs inserts the provided state sync proofs to db -func (s *StateSyncStore) insertStateSyncProofs(stateSyncProof []*StateSyncProof, dbTx DBTransaction) error { - insertFn := func(tx DBTransaction) error { +func (s *StateSyncStore) insertStateSyncProofs(stateSyncProof []*StateSyncProof, dbTx *bolt.Tx) error { + insertFn := func(tx *bolt.Tx) error { bucket := tx.Bucket(stateSyncProofsBucket) for _, ssp := range stateSyncProof { diff --git a/consensus/polybft/state_sync_manager.go b/consensus/polybft/state_sync_manager.go index 4de814eb11..35f1c982b7 100644 --- a/consensus/polybft/state_sync_manager.go +++ b/consensus/polybft/state_sync_manager.go @@ -21,6 +21,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/libp2p/go-libp2p/core/peer" "github.com/umbracle/ethgo" + bolt "go.etcd.io/bbolt" "google.golang.org/protobuf/proto" ) @@ -465,7 +466,7 @@ func (s *stateSyncManager) GetStateSyncProof(stateSyncID uint64) (types.Proof, e // buildProofs builds state sync proofs for the submitted commitment and saves them in boltDb for later execution func (s *stateSyncManager) buildProofs(commitmentMsg *contractsapi.StateSyncCommitment, - dbTx DBTransaction) error { + dbTx *bolt.Tx) error { from := commitmentMsg.StartID.Uint64() to := commitmentMsg.EndID.Uint64() @@ -514,7 +515,7 @@ func (s *stateSyncManager) buildProofs(commitmentMsg *contractsapi.StateSyncComm } // buildCommitment builds a new commitment, signs it and gossips its vote for it -func (s *stateSyncManager) buildCommitment(dbTx DBTransaction) error { +func (s *stateSyncManager) buildCommitment(dbTx *bolt.Tx) error { if !s.runtime.IsActiveValidator() { // don't build commitment if not a validator return nil diff --git a/consensus/polybft/validators_snapshot.go b/consensus/polybft/validators_snapshot.go index 999db68d56..8d8fecf7fa 100644 --- a/consensus/polybft/validators_snapshot.go +++ b/consensus/polybft/validators_snapshot.go @@ -9,6 +9,7 @@ import ( "github.com/0xPolygon/polygon-edge/consensus/polybft/validator" "github.com/0xPolygon/polygon-edge/types" "github.com/hashicorp/go-hclog" + bolt "go.etcd.io/bbolt" ) type validatorSnapshot struct { @@ -51,7 +52,7 @@ func newValidatorsSnapshotCache( // applies pending validator set deltas to it. // Otherwise, it builds a snapshot from scratch and applies pending validator set deltas. func (v *validatorsSnapshotCache) GetSnapshot( - blockNumber uint64, parents []*types.Header, dbTx DBTransaction) (validator.AccountSet, error) { + blockNumber uint64, parents []*types.Header, dbTx *bolt.Tx) (validator.AccountSet, error) { tx := dbTx isPassedTxNil := dbTx == nil @@ -241,7 +242,7 @@ func (v *validatorsSnapshotCache) computeSnapshot( } // storeSnapshot stores given snapshot to the in-memory cache and database -func (v *validatorsSnapshotCache) storeSnapshot(snapshot *validatorSnapshot, dbTx DBTransaction) error { +func (v *validatorsSnapshotCache) storeSnapshot(snapshot *validatorSnapshot, dbTx *bolt.Tx) error { copySnap := snapshot.copy() v.snapshots[copySnap.Epoch] = copySnap @@ -255,7 +256,7 @@ func (v *validatorsSnapshotCache) storeSnapshot(snapshot *validatorSnapshot, dbT } // Cleanup cleans the validators cache in memory and db -func (v *validatorsSnapshotCache) cleanup(dbTx DBTransaction) error { +func (v *validatorsSnapshotCache) cleanup(dbTx *bolt.Tx) error { if len(v.snapshots) >= validatorSnapshotLimit { latestEpoch := uint64(0) @@ -287,7 +288,7 @@ func (v *validatorsSnapshotCache) cleanup(dbTx DBTransaction) error { // getLastCachedSnapshot gets the latest snapshot cached // If it doesn't have snapshot cached for desired epoch, it will return the latest one it has func (v *validatorsSnapshotCache) getLastCachedSnapshot(currentEpoch uint64, - dbTx DBTransaction) (*validatorSnapshot, error) { + dbTx *bolt.Tx) (*validatorSnapshot, error) { cachedSnapshot := v.snapshots[currentEpoch] if cachedSnapshot != nil { return cachedSnapshot, nil