From 89784748839d80f129f5df58f5e38caaa94f69fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Thu, 23 Mar 2023 13:55:31 +0100 Subject: [PATCH] Log events (#1905) * wip * WIP * log OOG on pool pre execution * log high zkcounters use * fix test * fix test * fix test * fix if * fix test * fix test --- pool/pool.go | 47 ++++++++++++++++++++++++++---- sequencer/dbmanager.go | 7 ++++- sequencer/finalizer.go | 50 ++++++++++++++++++++++++++++++-- sequencer/finalizer_test.go | 4 ++- sequencer/interfaces.go | 5 +++- sequencer/mock_db_manager.go | 14 +++++++++ sequencer/mock_state.go | 14 +++++++++ sequencer/mock_worker.go | 18 ++++++------ sequencer/txtracker.go | 4 ++- sequencer/worker.go | 4 +-- state/runtime/executor/errors.go | 5 ++++ state/types.go | 2 ++ 12 files changed, 151 insertions(+), 23 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index 5cbf490a1e..7aabdd7f6e 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -9,6 +9,7 @@ import ( "github.com/0xPolygonHermez/zkevm-node/log" "github.com/0xPolygonHermez/zkevm-node/state" + "github.com/0xPolygonHermez/zkevm-node/state/runtime/executor" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core/types" @@ -40,6 +41,12 @@ type Pool struct { minGasPrice *big.Int } +type preexecutionResponse struct { + usedZkCounters state.ZKCounters + isOOC bool + isOOG bool +} + // NewPool creates and initializes an instance of Pool func NewPool(cfg Config, s storage, st stateInterface, l2BridgeAddr common.Address, chainID uint64) *Pool { return &Pool{ @@ -75,11 +82,11 @@ func (p *Pool) StoreTx(ctx context.Context, tx types.Transaction, ip string, isW poolTx.IsClaims = poolTx.IsClaimTx(p.l2BridgeAddr, p.cfg.FreeClaimGasLimit) // Execute transaction to calculate its zkCounters - zkCounters, err, isOOC := p.PreExecuteTx(ctx, tx) + preexecutionResponse, err := p.PreExecuteTx(ctx, tx) if err != nil { log.Debugf("PreExecuteTx error (this can be ignored): %v", err) - if isOOC { + if preexecutionResponse.isOOC { event := &state.Event{ EventType: state.EventType_Prexecution_OOC, Timestamp: time.Now(), @@ -87,24 +94,52 @@ func (p *Pool) StoreTx(ctx context.Context, tx types.Transaction, ip string, isW TxHash: tx.Hash(), } + err := p.state.AddEvent(ctx, event, nil) + if err != nil { + log.Errorf("Error adding event: %v", err) + } + // Do not add tx to the pool + return fmt.Errorf("out of counters") + } else if preexecutionResponse.isOOG { + event := &state.Event{ + EventType: state.EventType_Prexecution_OOG, + Timestamp: time.Now(), + IP: ip, + TxHash: tx.Hash(), + } + err := p.state.AddEvent(ctx, event, nil) if err != nil { log.Errorf("Error adding event: %v", err) } } } - poolTx.ZKCounters = zkCounters + poolTx.ZKCounters = preexecutionResponse.usedZkCounters return p.storage.AddTx(ctx, poolTx) } // PreExecuteTx executes a transaction to calculate its zkCounters -func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (state.ZKCounters, error, bool) { +func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (preexecutionResponse, error) { + response := preexecutionResponse{usedZkCounters: state.ZKCounters{}, isOOC: false, isOOG: false} + processBatchResponse, err := p.state.PreProcessTransaction(ctx, &tx, nil) if err != nil { - return state.ZKCounters{}, err, false + return response, err } - return processBatchResponse.UsedZkCounters, processBatchResponse.ExecutorError, !processBatchResponse.IsBatchProcessed + + response.usedZkCounters = processBatchResponse.UsedZkCounters + + if processBatchResponse.IsBatchProcessed { + if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 && + executor.IsROMOutOfGasError(executor.RomErrorCode(processBatchResponse.Responses[0].RomError)) { + response.isOOC = true + } + } else { + response.isOOG = !processBatchResponse.IsBatchProcessed + } + + return response, nil } // GetPendingTxs from the pool diff --git a/sequencer/dbmanager.go b/sequencer/dbmanager.go index 5bf78688d4..a0feff1591 100644 --- a/sequencer/dbmanager.go +++ b/sequencer/dbmanager.go @@ -152,7 +152,7 @@ func (d *dbManager) loadFromPool() { } func (d *dbManager) addTxToWorker(tx pool.Transaction, isClaim bool) error { - txTracker, err := d.worker.NewTxTracker(tx.Transaction, isClaim, tx.ZKCounters) + txTracker, err := d.worker.NewTxTracker(tx.Transaction, isClaim, tx.ZKCounters, tx.IP) if err != nil { return err } @@ -556,3 +556,8 @@ func (d *dbManager) FlushMerkleTree(ctx context.Context) error { func (d *dbManager) AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error { return d.state.AddDebugInfo(ctx, info, dbTx) } + +// AddEvent is used to store and event in the database +func (d *dbManager) AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error { + return d.state.AddEvent(ctx, event, dbTx) +} diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 9696a4c0b2..caee667de3 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -398,7 +398,7 @@ func (f *finalizer) handleTxProcessResp(ctx context.Context, tx *TxTracker, resu } // Check remaining resources - err := f.checkRemainingResources(result, tx) + err := f.checkRemainingResources(ctx, result, tx) if err != nil { return err } @@ -810,12 +810,15 @@ func (f *finalizer) isDeadlineEncountered() bool { } // checkRemainingResources checks if the transaction uses less resources than the remaining ones in the batch. -func (f *finalizer) checkRemainingResources(result *state.ProcessBatchResponse, tx *TxTracker) error { +func (f *finalizer) checkRemainingResources(ctx context.Context, result *state.ProcessBatchResponse, tx *TxTracker) error { usedResources := batchResources{ zKCounters: result.UsedZkCounters, bytes: uint64(len(tx.RawTx)), } + // Log an event in case the TX consumed more than the double of the expected for a zkCounter + f.checkZKCounterConsumption(ctx, result.UsedZkCounters, tx) + err := f.batch.remainingResources.sub(usedResources) if err != nil { log.Infof("current transaction exceeds the batch limit, updating metadata for tx in worker and continuing") @@ -828,6 +831,49 @@ func (f *finalizer) checkRemainingResources(result *state.ProcessBatchResponse, return nil } +func (f *finalizer) checkZKCounterConsumption(ctx context.Context, zkCounters state.ZKCounters, tx *TxTracker) { + events := "" + + if zkCounters.CumulativeGasUsed > tx.BatchResources.zKCounters.CumulativeGasUsed*2 { + events += "CumulativeGasUsed " + } + if zkCounters.UsedKeccakHashes > tx.BatchResources.zKCounters.UsedKeccakHashes*2 { + events += "UsedKeccakHashes " + } + if zkCounters.UsedPoseidonHashes > tx.BatchResources.zKCounters.UsedPoseidonHashes*2 { + events += "UsedPoseidonHashes " + } + if zkCounters.UsedPoseidonPaddings > tx.BatchResources.zKCounters.UsedPoseidonPaddings*2 { + events += "UsedPoseidonPaddings " + } + if zkCounters.UsedMemAligns > tx.BatchResources.zKCounters.UsedMemAligns*2 { + events += "UsedMemAligns " + } + if zkCounters.UsedArithmetics > tx.BatchResources.zKCounters.UsedArithmetics*2 { + events += "UsedArithmetics " + } + if zkCounters.UsedBinaries > tx.BatchResources.zKCounters.UsedBinaries*2 { + events += "UsedBinaries " + } + if zkCounters.UsedSteps > tx.BatchResources.zKCounters.UsedSteps*2 { + events += "UsedSteps " + } + + if events != "" { + event := &state.Event{ + EventType: state.EventType_ZKCounters_Diff + " " + events, + Timestamp: time.Now(), + IP: tx.IP, + TxHash: tx.Hash, + } + + err := f.dbManager.AddEvent(ctx, event, nil) + if err != nil { + log.Errorf("Error adding event: %v", err) + } + } +} + // isBatchAlmostFull checks if the current batch remaining resources are under the constraints threshold for most efficient moment to close a batch func (f *finalizer) isBatchAlmostFull() bool { resources := f.batch.remainingResources diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go index b7864b015a..1a6c9d6c59 100644 --- a/sequencer/finalizer_test.go +++ b/sequencer/finalizer_test.go @@ -896,6 +896,7 @@ func TestFinalizer_isDeadlineEncountered(t *testing.T) { func TestFinalizer_checkRemainingResources(t *testing.T) { // arrange f = setupFinalizer(true) + ctx := context.Background() txResponse := &state.ProcessTransactionResponse{TxHash: oldHash} result := &state.ProcessBatchResponse{ UsedZkCounters: state.ZKCounters{CumulativeGasUsed: 1000}, @@ -944,12 +945,13 @@ func TestFinalizer_checkRemainingResources(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // arrange f.batch.remainingResources = tc.remaining + dbManagerMock.On("AddEvent", ctx, mock.Anything, nil).Return(nil) if tc.expectedWorkerUpdate { workerMock.On("UpdateTx", txResponse.TxHash, tc.expectedTxTracker.From, result.UsedZkCounters).Return().Once() } // act - err := f.checkRemainingResources(result, tc.expectedTxTracker) + err := f.checkRemainingResources(ctx, result, tc.expectedTxTracker) // assert if tc.expectedErr != nil { diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index 64851f329c..793ffaa7a8 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -76,6 +76,7 @@ type stateInterface interface { GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error) FlushMerkleTree(ctx context.Context) error AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error + AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error } type workerInterface interface { @@ -86,7 +87,7 @@ type workerInterface interface { MoveTxToNotReady(txHash common.Hash, from common.Address, actualNonce *uint64, actualBalance *big.Int) []*TxTracker DeleteTx(txHash common.Hash, from common.Address) HandleL2Reorg(txHashes []common.Hash) - NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters) (*TxTracker, error) + NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, ip string) (*TxTracker, error) } // The dbManager will need to handle the errors inside the functions which don't return error as they will be used async in the other abstractions. @@ -118,6 +119,7 @@ type dbManagerInterface interface { CountReorgs(ctx context.Context, dbTx pgx.Tx) (uint64, error) FlushMerkleTree(ctx context.Context) error AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error + AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error } type dbManagerStateInterface interface { @@ -149,6 +151,7 @@ type dbManagerStateInterface interface { GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error) FlushMerkleTree(ctx context.Context) error AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error + AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error } type ethTxManager interface { diff --git a/sequencer/mock_db_manager.go b/sequencer/mock_db_manager.go index dbaaa42137..19581e6157 100644 --- a/sequencer/mock_db_manager.go +++ b/sequencer/mock_db_manager.go @@ -40,6 +40,20 @@ func (_m *DbManagerMock) AddDebugInfo(ctx context.Context, info *state.DebugInfo return r0 } +// AddEvent provides a mock function with given fields: ctx, event, dbTx +func (_m *DbManagerMock) AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error { + ret := _m.Called(ctx, event, dbTx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *state.Event, pgx.Tx) error); ok { + r0 = rf(ctx, event, dbTx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // BeginStateTransaction provides a mock function with given fields: ctx func (_m *DbManagerMock) BeginStateTransaction(ctx context.Context) (pgx.Tx, error) { ret := _m.Called(ctx) diff --git a/sequencer/mock_state.go b/sequencer/mock_state.go index 5b739d88cf..98a166c1e8 100644 --- a/sequencer/mock_state.go +++ b/sequencer/mock_state.go @@ -40,6 +40,20 @@ func (_m *StateMock) AddDebugInfo(ctx context.Context, info *state.DebugInfo, db return r0 } +// AddEvent provides a mock function with given fields: ctx, event, dbTx +func (_m *StateMock) AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error { + ret := _m.Called(ctx, event, dbTx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *state.Event, pgx.Tx) error); ok { + r0 = rf(ctx, event, dbTx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Begin provides a mock function with given fields: ctx func (_m *StateMock) Begin(ctx context.Context) (pgx.Tx, error) { ret := _m.Called(ctx) diff --git a/sequencer/mock_worker.go b/sequencer/mock_worker.go index 8877a404d7..d80abe38c1 100644 --- a/sequencer/mock_worker.go +++ b/sequencer/mock_worker.go @@ -67,25 +67,25 @@ func (_m *WorkerMock) MoveTxToNotReady(txHash common.Hash, from common.Address, return r0 } -// NewTxTracker provides a mock function with given fields: tx, isClaim, counters -func (_m *WorkerMock) NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters) (*TxTracker, error) { - ret := _m.Called(tx, isClaim, counters) +// NewTxTracker provides a mock function with given fields: tx, isClaim, counters, ip +func (_m *WorkerMock) NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, ip string) (*TxTracker, error) { + ret := _m.Called(tx, isClaim, counters, ip) var r0 *TxTracker var r1 error - if rf, ok := ret.Get(0).(func(types.Transaction, bool, state.ZKCounters) (*TxTracker, error)); ok { - return rf(tx, isClaim, counters) + if rf, ok := ret.Get(0).(func(types.Transaction, bool, state.ZKCounters, string) (*TxTracker, error)); ok { + return rf(tx, isClaim, counters, ip) } - if rf, ok := ret.Get(0).(func(types.Transaction, bool, state.ZKCounters) *TxTracker); ok { - r0 = rf(tx, isClaim, counters) + if rf, ok := ret.Get(0).(func(types.Transaction, bool, state.ZKCounters, string) *TxTracker); ok { + r0 = rf(tx, isClaim, counters, ip) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*TxTracker) } } - if rf, ok := ret.Get(1).(func(types.Transaction, bool, state.ZKCounters) error); ok { - r1 = rf(tx, isClaim, counters) + if rf, ok := ret.Get(1).(func(types.Transaction, bool, state.ZKCounters, string) error); ok { + r1 = rf(tx, isClaim, counters, ip) } else { r1 = ret.Error(1) } diff --git a/sequencer/txtracker.go b/sequencer/txtracker.go index 58341503aa..1d67e2c2ef 100644 --- a/sequencer/txtracker.go +++ b/sequencer/txtracker.go @@ -27,10 +27,11 @@ type TxTracker struct { Efficiency float64 RawTx []byte ReceivedAt time.Time // To check if it has been in the efficiency list for too long + IP string // IP of the tx sender } // newTxTracker creates and inits a TxTracker -func newTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, constraints batchConstraints, weights batchResourceWeights) (*TxTracker, error) { +func newTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, constraints batchConstraints, weights batchResourceWeights, ip string) (*TxTracker, error) { addr, err := state.GetSender(tx) if err != nil { return nil, err @@ -44,6 +45,7 @@ func newTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, GasPrice: tx.GasPrice(), Cost: tx.Cost(), ReceivedAt: time.Now(), + IP: ip, } txTracker.IsClaim = isClaim diff --git a/sequencer/worker.go b/sequencer/worker.go index bc50de0709..0201b7020f 100644 --- a/sequencer/worker.go +++ b/sequencer/worker.go @@ -37,8 +37,8 @@ func NewWorker(state stateInterface, constraints batchConstraints, weights batch } // NewTxTracker creates and inits a TxTracker -func (w *Worker) NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters) (*TxTracker, error) { - return newTxTracker(tx, isClaim, counters, w.batchConstraints, w.batchResourceWeights) +func (w *Worker) NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, ip string) (*TxTracker, error) { + return newTxTracker(tx, isClaim, counters, w.batchConstraints, w.batchResourceWeights, ip) } // AddTxTracker adds a new Tx to the Worker diff --git a/state/runtime/executor/errors.go b/state/runtime/executor/errors.go index 902e5ebf9f..ceee9f8195 100644 --- a/state/runtime/executor/errors.go +++ b/state/runtime/executor/errors.go @@ -221,6 +221,11 @@ func IsROMOutOfCountersError(error pb.RomError) bool { return int32(error) >= ROM_ERROR_OUT_OF_COUNTERS_STEP && int32(error) <= ROM_ERROR_OUT_OF_COUNTERS_POSEIDON } +// IsROMOutOfGasError indicates if the error is an ROM OOG +func IsROMOutOfGasError(error pb.RomError) bool { + return int32(error) == ROM_ERROR_OUT_OF_GAS +} + // IsExecutorOutOfCountersError indicates if the error is an ROM OOC func IsExecutorOutOfCountersError(error pb.ExecutorError) bool { return int32(error) >= EXECUTOR_ERROR_COUNTERS_OVERFLOW_KECCAK && int32(error) <= ROM_ERROR_OUT_OF_COUNTERS_POSEIDON diff --git a/state/types.go b/state/types.go index 8d305d9933..e34d5f6ca6 100644 --- a/state/types.go +++ b/state/types.go @@ -175,6 +175,8 @@ type TrustedReorg struct { const ( // EventType_Prexecution_OOC indicates a preexecution out of couters error EventType_Prexecution_OOC = "PREEXECUTION OOC" + // EventType_Prexecution_OOG indicates a preexecution out of gas error + EventType_Prexecution_OOG = "PREEXECUTION OOG" // EventType_ZKCounters_Diff indicates big different in preexecution and execution regarding ZKCounters EventType_ZKCounters_Diff = "ZK COUNTERS DIFF" )