Skip to content

Commit

Permalink
avoid execute batch twice
Browse files Browse the repository at this point in the history
  • Loading branch information
ARR552 committed Mar 15, 2023
1 parent 4290228 commit 281409d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 31 deletions.
16 changes: 8 additions & 8 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,24 +775,24 @@ func (s *State) ProcessAndStoreClosedBatch(
encodedTxs []byte,
dbTx pgx.Tx,
caller CallerLabel,
) error {
) (*pb.ProcessBatchResponse, error) {
// Decode transactions
decodedTransactions, _, err := DecodeTxs(encodedTxs)
if err != nil && !errors.Is(err, InvalidData) {
log.Debugf("error decoding transactions: %v", err)
return err
return nil, err
}

// Open the batch and process the txs
if dbTx == nil {
return ErrDBTxNil
return nil, ErrDBTxNil
}
if err := s.OpenBatch(ctx, processingCtx, dbTx); err != nil {
return err
return nil, err
}
processed, err := s.processBatch(ctx, processingCtx.BatchNumber, encodedTxs, caller, dbTx)
if err != nil {
return err
return nil, err
}

// Sanity check
Expand Down Expand Up @@ -823,19 +823,19 @@ func (s *State) ProcessAndStoreClosedBatch(

processedBatch, err := s.convertToProcessBatchResponse(decodedTransactions, processed)
if err != nil {
return err
return nil, err
}

if len(processedBatch.Responses) > 0 {
// Store processed txs into the batch
err = s.StoreTransactions(ctx, processingCtx.BatchNumber, processedBatch.Responses, dbTx)
if err != nil {
return err
return nil, err
}
}

// Close batch
return s.closeBatch(ctx, ProcessingReceipt{
return processed, s.closeBatch(ctx, ProcessingReceipt{
BatchNumber: processingCtx.BatchNumber,
StateRoot: processedBatch.NewStateRoot,
LocalExitRoot: processedBatch.NewLocalExitRoot,
Expand Down
2 changes: 1 addition & 1 deletion synchronizer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type stateInterface interface {
// GetNextForcedBatches returns the next forcedBatches in FIFO order
GetNextForcedBatches(ctx context.Context, nextForcedBatches int, dbTx pgx.Tx) ([]state.ForcedBatch, error)
AddVerifiedBatch(ctx context.Context, verifiedBatch *state.VerifiedBatch, dbTx pgx.Tx) error
ProcessAndStoreClosedBatch(ctx context.Context, processingCtx state.ProcessingContext, encodedTxs []byte, dbTx pgx.Tx, caller state.CallerLabel) error
ProcessAndStoreClosedBatch(ctx context.Context, processingCtx state.ProcessingContext, encodedTxs []byte, dbTx pgx.Tx, caller state.CallerLabel) (*pb.ProcessBatchResponse, error)
SetGenesis(ctx context.Context, block state.Block, genesis state.Genesis, dbTx pgx.Tx) ([]byte, error)
OpenBatch(ctx context.Context, processingContext state.ProcessingContext, dbTx pgx.Tx) error
CloseBatch(ctx context.Context, receipt state.ProcessingReceipt, dbTx pgx.Tx) error
Expand Down
19 changes: 14 additions & 5 deletions synchronizer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 19 additions & 15 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,26 +660,15 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman.
ForcedBatchNum: batch.ForcedBatchNum,
}

// Reprocess batch to compare the stateRoot with tBatch.StateRoot and get accInputHash
p, err := s.state.ExecuteBatch(s.ctx, batch, false, dbTx)
if err != nil {
log.Errorf("error executing L1 batch: %+v, error: %v", batch, err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Fatalf("error rolling back state. BatchNumber: %d, BlockNumber: %d, rollbackErr: %s, error : %v", batch.BatchNumber, blockNumber, rollbackErr.Error(), err)
}
log.Fatalf("error executing L1 batch: %+v, error: %v", batch, err)
}
newRoot := common.BytesToHash(p.NewStateRoot)
accumulatedInputHash := common.BytesToHash(p.NewAccInputHash)
var newRoot common.Hash

// First get trusted batch from db
tBatch, err := s.state.GetBatchByNumber(s.ctx, batch.BatchNumber, dbTx)
if err != nil {
if errors.Is(err, state.ErrNotFound) || errors.Is(err, state.ErrStateNotSynchronized) {
log.Debugf("BatchNumber: %d, not found in trusted state. Storing it...", batch.BatchNumber)
// If it is not found, store batch
err = s.state.ProcessAndStoreClosedBatch(s.ctx, processCtx, batch.BatchL2Data, dbTx, state.SynchronizerCallerLabel)
p, err := s.state.ProcessAndStoreClosedBatch(s.ctx, processCtx, batch.BatchL2Data, dbTx, state.SynchronizerCallerLabel)
if err != nil {
log.Errorf("error storing trustedBatch. BatchNumber: %d, BlockNumber: %d, error: %v", batch.BatchNumber, blockNumber, err)
rollbackErr := dbTx.Rollback(s.ctx)
Expand All @@ -690,6 +679,7 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman.
log.Errorf("error storing batch. BatchNumber: %d, BlockNumber: %d, error: %v", batch.BatchNumber, blockNumber, err)
return err
}
newRoot = common.BytesToHash(p.NewStateRoot)
tBatch = &batch
tBatch.StateRoot = newRoot
} else {
Expand All @@ -702,6 +692,20 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman.
return err
}
} else {
// Reprocess batch to compare the stateRoot with tBatch.StateRoot and get accInputHash
p, err := s.state.ExecuteBatch(s.ctx, batch, false, dbTx)
if err != nil {
log.Errorf("error executing L1 batch: %+v, error: %v", batch, err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state. BatchNumber: %d, BlockNumber: %d, rollbackErr: %s, error : %v", batch.BatchNumber, blockNumber, rollbackErr.Error(), err)
return rollbackErr
}
return err
}
newRoot = common.BytesToHash(p.NewStateRoot)
accumulatedInputHash := common.BytesToHash(p.NewAccInputHash)

//AddAccumulatedInputHash
err = s.state.AddAccumulatedInputHash(s.ctx, batch.BatchNumber, accumulatedInputHash, dbTx)
if err != nil {
Expand Down Expand Up @@ -744,7 +748,7 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman.
log.Errorf("error resetting trusted state. BatchNumber: %d, BlockNumber: %d, error: %v", batch.BatchNumber, blockNumber, err)
return err
}
err = s.state.ProcessAndStoreClosedBatch(s.ctx, processCtx, batch.BatchL2Data, dbTx, state.SynchronizerCallerLabel)
_, err = s.state.ProcessAndStoreClosedBatch(s.ctx, processCtx, batch.BatchL2Data, dbTx, state.SynchronizerCallerLabel)
if err != nil {
log.Errorf("error storing trustedBatch. BatchNumber: %d, BlockNumber: %d, error: %v", batch.BatchNumber, blockNumber, err)
rollbackErr := dbTx.Rollback(s.ctx)
Expand Down Expand Up @@ -868,7 +872,7 @@ func (s *ClientSynchronizer) processSequenceForceBatch(sequenceForceBatch []ethe
ForcedBatchNum: &forcedBatches[i].ForcedBatchNumber,
}
// Process batch
err := s.state.ProcessAndStoreClosedBatch(s.ctx, batch, forcedBatches[i].RawTxsData, dbTx, state.SynchronizerCallerLabel)
_, err := s.state.ProcessAndStoreClosedBatch(s.ctx, batch, forcedBatches[i].RawTxsData, dbTx, state.SynchronizerCallerLabel)
if err != nil {
log.Errorf("error processing batch in processSequenceForceBatch. BatchNumber: %d, BlockNumber: %d, error: %v", batch.BatchNumber, block.BlockNumber, err)
rollbackErr := dbTx.Rollback(s.ctx)
Expand Down
4 changes: 2 additions & 2 deletions synchronizer/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func TestTrustedStateReorg(t *testing.T) {

m.State.
On("ProcessAndStoreClosedBatch", ctx, processingContext, sequencedBatch.Transactions, m.DbTx, state.SynchronizerCallerLabel).
Return(nil).
Return(&pb.ProcessBatchResponse{NewStateRoot: trustedBatch.StateRoot.Bytes()},nil).
Once()

virtualBatch := &state.VirtualBatch{
Expand Down Expand Up @@ -727,7 +727,7 @@ func TestSequenceForcedBatch(t *testing.T) {

m.State.
On("ProcessAndStoreClosedBatch", ctx, processingContext, sequencedForceBatch.Transactions, m.DbTx, state.SynchronizerCallerLabel).
Return(nil).
Return(&pb.ProcessBatchResponse{},nil).
Once()

virtualBatch := &state.VirtualBatch{
Expand Down

0 comments on commit 281409d

Please sign in to comment.