Skip to content

Commit

Permalink
Rebase fix
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Oct 13, 2023
1 parent c113806 commit 81e231f
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 116 deletions.
6 changes: 3 additions & 3 deletions consensus/polybft/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (d *dummyCheckpointManager) GenerateExitProof(exitID uint64) (types.Proof,
func (d *dummyCheckpointManager) GetLogFilters() map[types.Address][]types.Hash {
return make(map[types.Address][]types.Hash)
}
func (d *dummyCheckpointManager) AddLog(header *types.Header,
func (d *dummyCheckpointManager) ProcessLog(header *types.Header,
log *ethgo.Log, dbTx *bolt.Tx) error {
return nil
}
Expand Down Expand Up @@ -433,9 +433,9 @@ func (c *checkpointManager) GetLogFilters() map[types.Address][]types.Hash {
}
}

// AddLog is the implementation of EventSubscriber interface,
// ProcessLog is the implementation of EventSubscriber interface,
// used to handle a log defined in GetLogFilters, provided by event provider
func (c *checkpointManager) AddLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error {
func (c *checkpointManager) ProcessLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error {
exitEvent, doesMatch, err := parseExitEvent(header, log)
if err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion consensus/polybft/consensus_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func (c *consensusRuntime) initStateSyncManager(logger hcf.Logger) error {
blockTrackerPollInterval: c.config.PolyBFTConfig.BlockTrackerPollInterval.Duration,
},
c,
c.config.blockchain,
)

c.stateSyncManager = stateSyncManager
Expand Down
6 changes: 3 additions & 3 deletions consensus/polybft/stake_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (d *dummyStakeManager) UpdateValidatorSet(epoch uint64,
func (d *dummyStakeManager) GetLogFilters() map[types.Address][]types.Hash {
return make(map[types.Address][]types.Hash)
}
func (d *dummyStakeManager) AddLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error {
func (d *dummyStakeManager) ProcessLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error {
return nil
}

Expand Down Expand Up @@ -393,9 +393,9 @@ func (s *stakeManager) GetLogFilters() map[types.Address][]types.Hash {
}
}

// AddLog is the implementation of EventSubscriber interface,
// ProcessLog is the implementation of EventSubscriber interface,
// used to handle a log defined in GetLogFilters, provided by event provider
func (s *stakeManager) AddLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error {
func (s *stakeManager) ProcessLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error {
var transferEvent contractsapi.TransferEvent

doesMatch, err := transferEvent.ParseLog(log)
Expand Down
2 changes: 1 addition & 1 deletion consensus/polybft/stake_manager_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func FuzzTestStakeManagerPostBlock(f *testing.F) {
require.NoError(t, err)

header := &types.Header{Number: data.BlockID}
require.NoError(t, stakeManager.AddLog(header, convertLog(createTestLogForTransferEvent(
require.NoError(t, stakeManager.ProcessLog(header, convertLog(createTestLogForTransferEvent(
t,
validatorSetAddr,
validators.GetValidator(initialSetAliases[data.ValidatorID]).Address(),
Expand Down
6 changes: 3 additions & 3 deletions consensus/polybft/stake_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestStakeManager_PostBlock(t *testing.T) {

header := &types.Header{Number: block}

require.NoError(t, stakeManager.AddLog(header, convertLog(createTestLogForTransferEvent(
require.NoError(t, stakeManager.ProcessLog(header, convertLog(createTestLogForTransferEvent(
t,
validatorSetAddr,
validators.GetValidator(initialSetAliases[firstValidator]).Address(),
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestStakeManager_PostBlock(t *testing.T) {
require.NoError(t, err)

header := &types.Header{Number: block}
require.NoError(t, stakeManager.AddLog(header, convertLog(createTestLogForTransferEvent(
require.NoError(t, stakeManager.ProcessLog(header, convertLog(createTestLogForTransferEvent(
t,
validatorSetAddr,
types.ZeroAddress,
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestStakeManager_PostBlock(t *testing.T) {
header := &types.Header{Number: block}

for i := 0; i < len(allAliases); i++ {
require.NoError(t, stakeManager.AddLog(header, convertLog(createTestLogForTransferEvent(
require.NoError(t, stakeManager.ProcessLog(header, convertLog(createTestLogForTransferEvent(
t,
validatorSetAddr,
types.ZeroAddress,
Expand Down
6 changes: 3 additions & 3 deletions consensus/polybft/state_event_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type EventSubscriber interface {
// and the value is a slice of signatures of events we want to get.
GetLogFilters() map[types.Address][]types.Hash

// AddLog is used to handle a log defined in GetLogFilters, provided by event provider
AddLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error
// ProcessLog is used to handle a log defined in GetLogFilters, provided by event provider
ProcessLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error
}

// EventProvider represents an event provider in a blockchain system
Expand Down Expand Up @@ -133,7 +133,7 @@ func (e *EventProvider) getEventsFromReceipts(blockHeader *types.Header,
if log.Topics[0] == logFilter {
convertedLog := convertLog(log)
for _, subscriber := range subscribers {
if err := e.subscribers[subscriber].AddLog(blockHeader, convertedLog, dbTx); err != nil {
if err := e.subscribers[subscriber].ProcessLog(blockHeader, convertedLog, dbTx); err != nil {
return err
}
}
Expand Down
100 changes: 53 additions & 47 deletions consensus/polybft/state_sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type StateSyncProof struct {

// StateSyncManager is an interface that defines functions for state sync workflow
type StateSyncManager interface {
EventSubscriber
Init() error
Close()
Commitment(blockNumber uint64) (*CommitmentMessageSigned, error)
Expand All @@ -50,17 +51,26 @@ var _ StateSyncManager = (*dummyStateSyncManager)(nil)
// dummyStateSyncManager is used when bridge is not enabled
type dummyStateSyncManager struct{}

func (n *dummyStateSyncManager) Init() error { return nil }
func (n *dummyStateSyncManager) Close() {}
func (n *dummyStateSyncManager) Commitment(blockNumber uint64) (*CommitmentMessageSigned, error) {
func (d *dummyStateSyncManager) Init() error { return nil }
func (data *dummyStateSyncManager) Close() {}

Check failure on line 55 in consensus/polybft/state_sync_manager.go

View workflow job for this annotation

GitHub Actions / golangci_lint

ST1016: methods on the same type should have the same receiver name (seen 1x "data", 1x "database", 6x "d") (stylecheck)

Check failure on line 55 in consensus/polybft/state_sync_manager.go

View workflow job for this annotation

GitHub Actions / golangci_lint

ST1016: methods on the same type should have the same receiver name (seen 1x "data", 1x "database", 6x "d") (stylecheck)
func (database *dummyStateSyncManager) Commitment(blockNumber uint64) (*CommitmentMessageSigned, error) {
return nil, nil
}
func (n *dummyStateSyncManager) PostBlock(req *PostBlockRequest) error { return nil }
func (n *dummyStateSyncManager) PostEpoch(req *PostEpochRequest) error { return nil }
func (n *dummyStateSyncManager) GetStateSyncProof(stateSyncID uint64) (types.Proof, error) {
func (d *dummyStateSyncManager) PostBlock(req *PostBlockRequest) error { return nil }
func (d *dummyStateSyncManager) PostEpoch(req *PostEpochRequest) error { return nil }
func (d *dummyStateSyncManager) GetStateSyncProof(stateSyncID uint64) (types.Proof, error) {
return types.Proof{}, nil
}

// EventSubscriber implementation
func (d *dummyStateSyncManager) GetLogFilters() map[types.Address][]types.Hash {
return make(map[types.Address][]types.Hash)
}
func (d *dummyStateSyncManager) ProcessLog(header *types.Header,
log *ethgo.Log, dbTx *bolt.Tx) error {
return nil
}

// stateSyncConfig holds the configuration data of state sync manager
type stateSyncConfig struct {
stateSenderAddr types.Address
Expand Down Expand Up @@ -93,9 +103,6 @@ type stateSyncManager struct {
nextCommittedIndex uint64

runtime Runtime

// eventsGetter gets StateSyncResult events (missed or current) from blocks
eventsGetter *eventsGetter[*contractsapi.StateSyncResultEvent]
}

// topic is an interface for p2p message gossiping
Expand All @@ -106,27 +113,13 @@ type topic interface {

// newStateSyncManager creates a new instance of state sync manager
func newStateSyncManager(logger hclog.Logger, state *State, config *stateSyncConfig,
runtime Runtime, blockchain blockchainBackend) *stateSyncManager {
eventsGetter := &eventsGetter[*contractsapi.StateSyncResultEvent]{
blockchain: blockchain,
isValidLogFn: func(l *types.Log) bool {
return l.Address == contracts.StateReceiverContract
},
parseEventFn: func(h *types.Header, l *ethgo.Log) (*contractsapi.StateSyncResultEvent, bool, error) {
var stateSyncResultEvent contractsapi.StateSyncResultEvent
matches, err := stateSyncResultEvent.ParseLog(l)

return &stateSyncResultEvent, matches, err
},
}

runtime Runtime) *stateSyncManager {
return &stateSyncManager{
logger: logger,
state: state,
config: config,
closeCh: make(chan struct{}),
runtime: runtime,
eventsGetter: eventsGetter,
logger: logger,
state: state,
config: config,
closeCh: make(chan struct{}),
runtime: runtime,
}
}

Expand Down Expand Up @@ -421,24 +414,6 @@ func (s *stateSyncManager) PostEpoch(req *PostEpochRequest) error {
// so that it can build state sync proofs if a block has a commitment submission transaction.
// Additionally, it will remove any processed state sync events and their proofs from the store.
func (s *stateSyncManager) PostBlock(req *PostBlockRequest) error {
events, err := s.eventsGetter.getEventsFromReceipts(req.FullBlock.Block.Header, req.FullBlock.Receipts)
if err != nil {
s.logger.Info("failed to retrieve processed state sync result events from block", "error", err)
} else {
processedStateSyncEventIDs := make([]uint64, 0, len(events))
for _, event := range events {
if event.Status {
processedStateSyncEventIDs = append(processedStateSyncEventIDs, event.Counter.Uint64())
}
}

if len(processedStateSyncEventIDs) > 0 {
if err = s.state.StateSyncStore.removeStateSyncEventsAndProofs(processedStateSyncEventIDs); err != nil {
s.logger.Info("failed to remove processed state sync events data from store", "error", err)
}
}
}

commitment, err := getCommitmentMessageSignedTx(req.FullBlock.Block.Transactions)
if err != nil {
return err
Expand Down Expand Up @@ -641,3 +616,34 @@ func (s *stateSyncManager) multicast(msg interface{}) {
s.logger.Warn("failed to gossip bridge message", "err", err)
}
}

// EventSubscriber implementation

// GetLogFilters returns a map of log filters for getting desired events,
// where the key is the address of contract that emits desired events,
// and the value is a slice of signatures of events we want to get.
// This function is the implementation of EventSubscriber interface
func (s *stateSyncManager) GetLogFilters() map[types.Address][]types.Hash {
var stateSyncResultEvent contractsapi.StateSyncResultEvent

return map[types.Address][]types.Hash{
contracts.StateReceiverContract: {types.Hash(stateSyncResultEvent.Sig())},
}
}

// ProcessLog is the implementation of EventSubscriber interface,
// used to handle a log defined in GetLogFilters, provided by event provider
func (s *stateSyncManager) ProcessLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error {
var stateSyncResultEvent contractsapi.StateSyncResultEvent

doesMatch, err := stateSyncResultEvent.ParseLog(log)
if err != nil {
return err
}

if !doesMatch {
return nil
}

return s.state.StateSyncStore.removeStateSyncEventsAndProofs([]uint64{stateSyncResultEvent.Counter.Uint64()})
}
69 changes: 14 additions & 55 deletions consensus/polybft/state_sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func newTestStateSyncManager(t *testing.T, key *validator.TestValidator, runtime

topic := &mockTopic{}

blockchainBackend := new(blockchainMock)
s := newStateSyncManager(hclog.NewNullLogger(), state,
&stateSyncConfig{
stateSenderAddr: types.Address{},
Expand All @@ -44,7 +43,7 @@ func newTestStateSyncManager(t *testing.T, key *validator.TestValidator, runtime
topic: topic,
key: key.Key(),
maxCommitmentSize: maxCommitmentSize,
}, runtime, blockchainBackend)
}, runtime)

t.Cleanup(func() {
os.RemoveAll(tmpDir)
Expand Down Expand Up @@ -333,79 +332,39 @@ func TestStateSyncerManager_RemoveProcessedEventsAndProofs(t *testing.T) {
vals := validator.NewTestValidators(t, 5)

s := newTestStateSyncManager(t, vals.GetValidator("0"), &mockRuntime{isActiveValidator: true})
stateSyncEvents := generateStateSyncEvents(t, stateSyncEventsCount, 0)

for _, event := range generateStateSyncEvents(t, stateSyncEventsCount, 0) {
for _, event := range stateSyncEvents {
require.NoError(t, s.state.StateSyncStore.insertStateSyncEvent(event))
}

require.NoError(t, s.buildCommitment())
require.Len(t, s.pendingCommitments, 1)

mockMsg := &CommitmentMessageSigned{
Message: &contractsapi.StateSyncCommitment{
StartID: s.pendingCommitments[0].StartID,
EndID: s.pendingCommitments[0].EndID,
},
}

txData, err := mockMsg.EncodeAbi()
require.NoError(t, err)

tx := createStateTransactionWithData(1, types.Address{}, txData)

req := &PostBlockRequest{
FullBlock: &types.FullBlock{
Block: &types.Block{
Header: &types.Header{Number: 1},
Transactions: []*types.Transaction{tx},
},
},
}

// PostBlock() inserts commitment and proofs into the store
require.NoError(t, s.PostBlock(req))

// check the state after executing first PostBlock()
require.Equal(t, mockMsg.Message.EndID.Uint64()+1, s.nextCommittedIndex)
require.NoError(t, s.buildProofs(&contractsapi.StateSyncCommitment{
StartID: stateSyncEvents[0].ID,
EndID: stateSyncEvents[len(stateSyncEvents)-1].ID,
}, nil))

stateSyncEventsBefore, err := s.state.StateSyncStore.list()
require.NoError(t, err)
require.Equal(t, stateSyncEventsCount, len(stateSyncEventsBefore))

for i := 0; i < stateSyncEventsCount; i++ {
proof, err := s.state.StateSyncStore.getStateSyncProof(uint64(i))
for _, event := range stateSyncEvents {
proof, err := s.state.StateSyncStore.getStateSyncProof(event.ID.Uint64())
require.NoError(t, err)
require.NotNil(t, proof)
}

// create second PostBlockRequest to remove processed events and proofs from the store
req = &PostBlockRequest{
FullBlock: &types.FullBlock{
Block: &types.Block{
Header: &types.Header{Number: 2},
},
},
}

// add receipts with executed StateSyncResult logs
receiptSuccess := types.ReceiptSuccess

req.FullBlock.Receipts = make([]*types.Receipt, stateSyncEventsCount)
for i := uint64(0); i < stateSyncEventsCount; i++ {
req.FullBlock.Receipts[i] = &types.Receipt{
Status: &receiptSuccess,
Logs: []*types.Log{createTestLogForStateSyncResultEvent(t, i)}}
for _, event := range stateSyncEvents {
eventLog := createTestLogForStateSyncResultEvent(t, event.ID.Uint64())
require.NoError(t, s.ProcessLog(&types.Header{Number: 10}, convertLog(eventLog), nil))
}

require.NoError(t, s.PostBlock(req))

// all state sync events and their proofs should be removed from the store
stateSyncEventsAfter, err := s.state.StateSyncStore.list()
require.NoError(t, err)
require.Equal(t, 0, len(stateSyncEventsAfter))

for i := uint64(0); i < stateSyncEventsCount; i++ {
proof, err := s.state.StateSyncStore.getStateSyncProof(i)
for _, event := range stateSyncEvents {
proof, err := s.state.StateSyncStore.getStateSyncProof(event.ID.Uint64())
require.NoError(t, err)
require.Nil(t, proof)
}
Expand Down

0 comments on commit 81e231f

Please sign in to comment.