Skip to content

Commit

Permalink
PostBlock optimization (#1972)
Browse files Browse the repository at this point in the history
* PostBlock optimization

* Remove unnecessary logs

* Update last processed block in db

* Lint fix

* Smooth-out code

* Concurrent read/write fix

* Code cleanup

* Comments fix

* Comments fix - remove DBTransaction interface

* Comments fix - introduce common code for eventGetter and eventProvider

* Rebase fix
  • Loading branch information
goran-ethernal committed Oct 16, 2023
1 parent 1395ddb commit 820e21a
Show file tree
Hide file tree
Showing 30 changed files with 1,029 additions and 697 deletions.
79 changes: 41 additions & 38 deletions consensus/polybft/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"math/big"
"sort"
"strconv"

"github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi"
Expand All @@ -17,6 +16,7 @@ import (
"github.com/0xPolygon/polygon-edge/types"
hclog "github.com/hashicorp/go-hclog"
"github.com/umbracle/ethgo"
bolt "go.etcd.io/bbolt"
)

var (
Expand All @@ -28,6 +28,7 @@ var (
)

type CheckpointManager interface {
EventSubscriber
PostBlock(req *PostBlockRequest) error
BuildEventRoot(epoch uint64) (types.Hash, error)
GenerateExitProof(exitID uint64) (types.Proof, error)
Expand All @@ -45,6 +46,15 @@ func (d *dummyCheckpointManager) GenerateExitProof(exitID uint64) (types.Proof,
return types.Proof{}, nil
}

// EventSubscriber implementation
func (d *dummyCheckpointManager) GetLogFilters() map[types.Address][]types.Hash {
return make(map[types.Address][]types.Hash)
}
func (d *dummyCheckpointManager) ProcessLog(header *types.Header,
log *ethgo.Log, dbTx *bolt.Tx) error {
return nil
}

var _ CheckpointManager = (*checkpointManager)(nil)

// checkpointManager encapsulates logic for checkpoint data submission
Expand All @@ -67,23 +77,13 @@ type checkpointManager struct {
logger hclog.Logger
// state boltDb instance
state *State
// eventGetter gets exit events (missed or current) from blocks
eventGetter *eventsGetter[*ExitEvent]
}

// newCheckpointManager creates a new instance of checkpointManager
func newCheckpointManager(key ethgo.Key, checkpointOffset uint64,
checkpointManagerSC types.Address, txRelayer txrelayer.TxRelayer,
blockchain blockchainBackend, backend polybftBackend, logger hclog.Logger,
state *State) *checkpointManager {
retry := &eventsGetter[*ExitEvent]{
blockchain: blockchain,
isValidLogFn: func(l *types.Log) bool {
return l.Address == contracts.L2StateSenderContract
},
parseEventFn: parseExitEvent,
}

return &checkpointManager{
key: key,
blockchain: blockchain,
Expand All @@ -93,7 +93,6 @@ func newCheckpointManager(key ethgo.Key, checkpointOffset uint64,
checkpointManagerAddr: checkpointManagerSC,
logger: logger,
state: state,
eventGetter: retry,
}
}

Expand Down Expand Up @@ -282,33 +281,8 @@ func (c *checkpointManager) isCheckpointBlock(blockNumber uint64, isEpochEndingB
}

// PostBlock is called on every insert of finalized block (either from consensus or syncer)
// It will read any exit event that happened in block and insert it to state boltDb
// It sends a checkpoint if given block is checkpoint block and block proposer is given validator
func (c *checkpointManager) PostBlock(req *PostBlockRequest) error {
block := req.FullBlock.Block.Number()

lastBlock, err := c.state.CheckpointStore.getLastSaved()
if err != nil {
return fmt.Errorf("could not get last processed block for exit events. Error: %w", err)
}

exitEvents, err := c.eventGetter.getFromBlocks(lastBlock, req.FullBlock)
if err != nil {
return err
}

sort.Slice(exitEvents, func(i, j int) bool {
// keep events in sequential order
return exitEvents[i].ID.Cmp(exitEvents[j].ID) < 0
})

if err := c.state.CheckpointStore.insertExitEvents(exitEvents); err != nil {
return err
}

if err := c.state.CheckpointStore.updateLastSaved(block); err != nil {
return err
}

if c.isCheckpointBlock(req.FullBlock.Block.Header.Number, req.IsEpochEndingBlock) &&
bytes.Equal(c.key.Address().Bytes(), req.FullBlock.Block.Header.Miner) {
go func(header *types.Header, epochNumber uint64) {
Expand Down Expand Up @@ -445,6 +419,35 @@ func (c *checkpointManager) GenerateExitProof(exitID uint64) (types.Proof, error
}, nil
}

// EventSubscriber implementation

// GetLogFilters returns a map of log filters for getting desired events,
// where the key is the address of contract that emits desired events,
// and the value is a slice of signatures of events we want to get.
// This function is the implementation of EventSubscriber interface
func (c *checkpointManager) GetLogFilters() map[types.Address][]types.Hash {
var l2StateSyncedEvent contractsapi.L2StateSyncedEvent

return map[types.Address][]types.Hash{
contracts.L2StateSenderContract: {types.Hash(l2StateSyncedEvent.Sig())},
}
}

// ProcessLog is the implementation of EventSubscriber interface,
// used to handle a log defined in GetLogFilters, provided by event provider
func (c *checkpointManager) ProcessLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error {
exitEvent, doesMatch, err := parseExitEvent(header, log)
if err != nil {
return err
}

if !doesMatch {
return nil
}

return c.state.CheckpointStore.insertExitEvent(exitEvent, dbTx)
}

// createExitTree creates an exit event merkle tree from provided exit events
func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) {
numOfEvents := len(exitEvents)
Expand Down
122 changes: 0 additions & 122 deletions consensus/polybft/checkpoint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,128 +318,6 @@ func TestCheckpointManager_IsCheckpointBlock(t *testing.T) {
}
}

func TestCheckpointManager_PostBlock(t *testing.T) {
const (
numOfReceipts = 5
block = 5
epoch = 1
)

state := newTestState(t)

createReceipts := func(startID, endID uint64) []*types.Receipt {
receipts := make([]*types.Receipt, endID-startID)
for i := startID; i < endID; i++ {
receipts[i-startID] = &types.Receipt{Logs: []*types.Log{
createTestLogForExitEvent(t, i),
}}
receipts[i-startID].SetStatus(types.ReceiptSuccess)
}

return receipts
}

extra := &Extra{
Checkpoint: &CheckpointData{
EpochNumber: epoch,
},
}

req := &PostBlockRequest{FullBlock: &types.FullBlock{Block: &types.Block{Header: &types.Header{Number: block}}},
Epoch: epoch}

req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil)

blockchain := new(blockchainMock)
checkpointManager := newCheckpointManager(wallet.NewEcdsaSigner(createTestKey(t)), 5, types.ZeroAddress,
nil, blockchain, nil, hclog.NewNullLogger(), state)

t.Run("PostBlock - not epoch ending block", func(t *testing.T) {
require.NoError(t, state.CheckpointStore.updateLastSaved(block-1)) // we got everything till the current block
req.IsEpochEndingBlock = false
req.FullBlock.Receipts = createReceipts(0, 5)
require.NoError(t, checkpointManager.PostBlock(req))

exitEvents, err := state.CheckpointStore.getExitEvents(epoch, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block
})

require.NoError(t, err)
require.Len(t, exitEvents, 5)
require.Equal(t, uint64(epoch), exitEvents[0].EpochNumber)
})

t.Run("PostBlock - epoch ending block (exit events are saved to the next epoch)", func(t *testing.T) {
require.NoError(t, state.CheckpointStore.updateLastSaved(block)) // we got everything till the current block
req.IsEpochEndingBlock = true
req.FullBlock.Receipts = createReceipts(5, 10)
extra.Validators = &validator.ValidatorSetDelta{}
req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil)
req.FullBlock.Block.Header.Number = block + 1

require.NoError(t, checkpointManager.PostBlock(req))

exitEvents, err := state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block+2 // they should be saved in the next epoch and its first block
})

require.NoError(t, err)
require.Len(t, exitEvents, 5)
require.Equal(t, uint64(block+2), exitEvents[0].BlockNumber)
require.Equal(t, uint64(epoch+1), exitEvents[0].EpochNumber)
})

t.Run("PostBlock - there are missing events", func(t *testing.T) {
require.NoError(t, state.CheckpointStore.updateLastSaved(block)) // we are missing one block

missedReceipts := createReceipts(10, 13)
newReceipts := createReceipts(13, 15)

extra := &Extra{
Checkpoint: &CheckpointData{
EpochNumber: epoch + 1,
},
}

blockchain.On("GetHeaderByNumber", uint64(block+1)).Return(&types.Header{
Number: block + 1,
ExtraData: extra.MarshalRLPTo(nil),
Hash: types.BytesToHash([]byte{0, 1, 2, 3}),
}, true)
blockchain.On("GetReceiptsByHash", types.BytesToHash([]byte{0, 1, 2, 3})).Return([]*types.Receipt{}, nil)
blockchain.On("GetHeaderByNumber", uint64(block+2)).Return(&types.Header{
Number: block + 2,
ExtraData: extra.MarshalRLPTo(nil),
Hash: types.BytesToHash([]byte{4, 5, 6, 7}),
}, true)
blockchain.On("GetReceiptsByHash", types.BytesToHash([]byte{4, 5, 6, 7})).Return(missedReceipts, nil)

req.IsEpochEndingBlock = false
req.FullBlock.Block.Header.Number = block + 3 // new block
req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil) // same epoch
req.FullBlock.Receipts = newReceipts
require.NoError(t, checkpointManager.PostBlock(req))

exitEvents, err := state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block+2
})

require.NoError(t, err)
// receipts from missed block + events from previous test case that were saved in the next epoch
// since they were in epoch ending block
require.Len(t, exitEvents, len(missedReceipts)+5)
require.Equal(t, extra.Checkpoint.EpochNumber, exitEvents[0].EpochNumber)

exitEvents, err = state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool {
return exitEvent.BlockNumber == block+3
})

require.NoError(t, err)
require.Len(t, exitEvents, len(newReceipts))
require.Equal(t, extra.Checkpoint.EpochNumber, exitEvents[0].EpochNumber)
})
}

func TestCheckpointManager_BuildEventRoot(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 820e21a

Please sign in to comment.