Skip to content

Commit

Permalink
Log events (#1905)
Browse files Browse the repository at this point in the history
* wip

* WIP

* log OOG on pool pre execution

* log high zkcounters use

* fix test

* fix test

* fix test

* fix if

* fix test

* fix test
  • Loading branch information
ToniRamirezM committed Mar 23, 2023
1 parent 88b5012 commit 8978474
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 23 deletions.
47 changes: 41 additions & 6 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -40,6 +41,12 @@ type Pool struct {
minGasPrice *big.Int
}

type preexecutionResponse struct {
usedZkCounters state.ZKCounters
isOOC bool
isOOG bool
}

// NewPool creates and initializes an instance of Pool
func NewPool(cfg Config, s storage, st stateInterface, l2BridgeAddr common.Address, chainID uint64) *Pool {
return &Pool{
Expand Down Expand Up @@ -75,36 +82,64 @@ func (p *Pool) StoreTx(ctx context.Context, tx types.Transaction, ip string, isW
poolTx.IsClaims = poolTx.IsClaimTx(p.l2BridgeAddr, p.cfg.FreeClaimGasLimit)

// Execute transaction to calculate its zkCounters
zkCounters, err, isOOC := p.PreExecuteTx(ctx, tx)
preexecutionResponse, err := p.PreExecuteTx(ctx, tx)
if err != nil {
log.Debugf("PreExecuteTx error (this can be ignored): %v", err)

if isOOC {
if preexecutionResponse.isOOC {
event := &state.Event{
EventType: state.EventType_Prexecution_OOC,
Timestamp: time.Now(),
IP: ip,
TxHash: tx.Hash(),
}

err := p.state.AddEvent(ctx, event, nil)
if err != nil {
log.Errorf("Error adding event: %v", err)
}
// Do not add tx to the pool
return fmt.Errorf("out of counters")
} else if preexecutionResponse.isOOG {
event := &state.Event{
EventType: state.EventType_Prexecution_OOG,
Timestamp: time.Now(),
IP: ip,
TxHash: tx.Hash(),
}

err := p.state.AddEvent(ctx, event, nil)
if err != nil {
log.Errorf("Error adding event: %v", err)
}
}
}
poolTx.ZKCounters = zkCounters
poolTx.ZKCounters = preexecutionResponse.usedZkCounters

return p.storage.AddTx(ctx, poolTx)
}

// PreExecuteTx executes a transaction to calculate its zkCounters
func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (state.ZKCounters, error, bool) {
func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (preexecutionResponse, error) {
response := preexecutionResponse{usedZkCounters: state.ZKCounters{}, isOOC: false, isOOG: false}

processBatchResponse, err := p.state.PreProcessTransaction(ctx, &tx, nil)
if err != nil {
return state.ZKCounters{}, err, false
return response, err
}
return processBatchResponse.UsedZkCounters, processBatchResponse.ExecutorError, !processBatchResponse.IsBatchProcessed

response.usedZkCounters = processBatchResponse.UsedZkCounters

if processBatchResponse.IsBatchProcessed {
if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 &&
executor.IsROMOutOfGasError(executor.RomErrorCode(processBatchResponse.Responses[0].RomError)) {
response.isOOC = true
}
} else {
response.isOOG = !processBatchResponse.IsBatchProcessed
}

return response, nil
}

// GetPendingTxs from the pool
Expand Down
7 changes: 6 additions & 1 deletion sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (d *dbManager) loadFromPool() {
}

func (d *dbManager) addTxToWorker(tx pool.Transaction, isClaim bool) error {
txTracker, err := d.worker.NewTxTracker(tx.Transaction, isClaim, tx.ZKCounters)
txTracker, err := d.worker.NewTxTracker(tx.Transaction, isClaim, tx.ZKCounters, tx.IP)
if err != nil {
return err
}
Expand Down Expand Up @@ -556,3 +556,8 @@ func (d *dbManager) FlushMerkleTree(ctx context.Context) error {
func (d *dbManager) AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error {
return d.state.AddDebugInfo(ctx, info, dbTx)
}

// AddEvent is used to store and event in the database
func (d *dbManager) AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error {
return d.state.AddEvent(ctx, event, dbTx)
}
50 changes: 48 additions & 2 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (f *finalizer) handleTxProcessResp(ctx context.Context, tx *TxTracker, resu
}

// Check remaining resources
err := f.checkRemainingResources(result, tx)
err := f.checkRemainingResources(ctx, result, tx)
if err != nil {
return err
}
Expand Down Expand Up @@ -810,12 +810,15 @@ func (f *finalizer) isDeadlineEncountered() bool {
}

// checkRemainingResources checks if the transaction uses less resources than the remaining ones in the batch.
func (f *finalizer) checkRemainingResources(result *state.ProcessBatchResponse, tx *TxTracker) error {
func (f *finalizer) checkRemainingResources(ctx context.Context, result *state.ProcessBatchResponse, tx *TxTracker) error {
usedResources := batchResources{
zKCounters: result.UsedZkCounters,
bytes: uint64(len(tx.RawTx)),
}

// Log an event in case the TX consumed more than the double of the expected for a zkCounter
f.checkZKCounterConsumption(ctx, result.UsedZkCounters, tx)

err := f.batch.remainingResources.sub(usedResources)
if err != nil {
log.Infof("current transaction exceeds the batch limit, updating metadata for tx in worker and continuing")
Expand All @@ -828,6 +831,49 @@ func (f *finalizer) checkRemainingResources(result *state.ProcessBatchResponse,
return nil
}

func (f *finalizer) checkZKCounterConsumption(ctx context.Context, zkCounters state.ZKCounters, tx *TxTracker) {
events := ""

if zkCounters.CumulativeGasUsed > tx.BatchResources.zKCounters.CumulativeGasUsed*2 {
events += "CumulativeGasUsed "
}
if zkCounters.UsedKeccakHashes > tx.BatchResources.zKCounters.UsedKeccakHashes*2 {
events += "UsedKeccakHashes "
}
if zkCounters.UsedPoseidonHashes > tx.BatchResources.zKCounters.UsedPoseidonHashes*2 {
events += "UsedPoseidonHashes "
}
if zkCounters.UsedPoseidonPaddings > tx.BatchResources.zKCounters.UsedPoseidonPaddings*2 {
events += "UsedPoseidonPaddings "
}
if zkCounters.UsedMemAligns > tx.BatchResources.zKCounters.UsedMemAligns*2 {
events += "UsedMemAligns "
}
if zkCounters.UsedArithmetics > tx.BatchResources.zKCounters.UsedArithmetics*2 {
events += "UsedArithmetics "
}
if zkCounters.UsedBinaries > tx.BatchResources.zKCounters.UsedBinaries*2 {
events += "UsedBinaries "
}
if zkCounters.UsedSteps > tx.BatchResources.zKCounters.UsedSteps*2 {
events += "UsedSteps "
}

if events != "" {
event := &state.Event{
EventType: state.EventType_ZKCounters_Diff + " " + events,
Timestamp: time.Now(),
IP: tx.IP,
TxHash: tx.Hash,
}

err := f.dbManager.AddEvent(ctx, event, nil)
if err != nil {
log.Errorf("Error adding event: %v", err)
}
}
}

// isBatchAlmostFull checks if the current batch remaining resources are under the constraints threshold for most efficient moment to close a batch
func (f *finalizer) isBatchAlmostFull() bool {
resources := f.batch.remainingResources
Expand Down
4 changes: 3 additions & 1 deletion sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,7 @@ func TestFinalizer_isDeadlineEncountered(t *testing.T) {
func TestFinalizer_checkRemainingResources(t *testing.T) {
// arrange
f = setupFinalizer(true)
ctx := context.Background()
txResponse := &state.ProcessTransactionResponse{TxHash: oldHash}
result := &state.ProcessBatchResponse{
UsedZkCounters: state.ZKCounters{CumulativeGasUsed: 1000},
Expand Down Expand Up @@ -944,12 +945,13 @@ func TestFinalizer_checkRemainingResources(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// arrange
f.batch.remainingResources = tc.remaining
dbManagerMock.On("AddEvent", ctx, mock.Anything, nil).Return(nil)
if tc.expectedWorkerUpdate {
workerMock.On("UpdateTx", txResponse.TxHash, tc.expectedTxTracker.From, result.UsedZkCounters).Return().Once()
}

// act
err := f.checkRemainingResources(result, tc.expectedTxTracker)
err := f.checkRemainingResources(ctx, result, tc.expectedTxTracker)

// assert
if tc.expectedErr != nil {
Expand Down
5 changes: 4 additions & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type stateInterface interface {
GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error)
FlushMerkleTree(ctx context.Context) error
AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error
AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error
}

type workerInterface interface {
Expand All @@ -86,7 +87,7 @@ type workerInterface interface {
MoveTxToNotReady(txHash common.Hash, from common.Address, actualNonce *uint64, actualBalance *big.Int) []*TxTracker
DeleteTx(txHash common.Hash, from common.Address)
HandleL2Reorg(txHashes []common.Hash)
NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters) (*TxTracker, error)
NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, ip string) (*TxTracker, error)
}

// The dbManager will need to handle the errors inside the functions which don't return error as they will be used async in the other abstractions.
Expand Down Expand Up @@ -118,6 +119,7 @@ type dbManagerInterface interface {
CountReorgs(ctx context.Context, dbTx pgx.Tx) (uint64, error)
FlushMerkleTree(ctx context.Context) error
AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error
AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error
}

type dbManagerStateInterface interface {
Expand Down Expand Up @@ -149,6 +151,7 @@ type dbManagerStateInterface interface {
GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error)
FlushMerkleTree(ctx context.Context) error
AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error
AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error
}

type ethTxManager interface {
Expand Down
14 changes: 14 additions & 0 deletions sequencer/mock_db_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions sequencer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions sequencer/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion sequencer/txtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ type TxTracker struct {
Efficiency float64
RawTx []byte
ReceivedAt time.Time // To check if it has been in the efficiency list for too long
IP string // IP of the tx sender
}

// newTxTracker creates and inits a TxTracker
func newTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, constraints batchConstraints, weights batchResourceWeights) (*TxTracker, error) {
func newTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, constraints batchConstraints, weights batchResourceWeights, ip string) (*TxTracker, error) {
addr, err := state.GetSender(tx)
if err != nil {
return nil, err
Expand All @@ -44,6 +45,7 @@ func newTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters,
GasPrice: tx.GasPrice(),
Cost: tx.Cost(),
ReceivedAt: time.Now(),
IP: ip,
}

txTracker.IsClaim = isClaim
Expand Down
4 changes: 2 additions & 2 deletions sequencer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewWorker(state stateInterface, constraints batchConstraints, weights batch
}

// NewTxTracker creates and inits a TxTracker
func (w *Worker) NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters) (*TxTracker, error) {
return newTxTracker(tx, isClaim, counters, w.batchConstraints, w.batchResourceWeights)
func (w *Worker) NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, ip string) (*TxTracker, error) {
return newTxTracker(tx, isClaim, counters, w.batchConstraints, w.batchResourceWeights, ip)
}

// AddTxTracker adds a new Tx to the Worker
Expand Down
5 changes: 5 additions & 0 deletions state/runtime/executor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ func IsROMOutOfCountersError(error pb.RomError) bool {
return int32(error) >= ROM_ERROR_OUT_OF_COUNTERS_STEP && int32(error) <= ROM_ERROR_OUT_OF_COUNTERS_POSEIDON
}

// IsROMOutOfGasError indicates if the error is an ROM OOG
func IsROMOutOfGasError(error pb.RomError) bool {
return int32(error) == ROM_ERROR_OUT_OF_GAS
}

// IsExecutorOutOfCountersError indicates if the error is an ROM OOC
func IsExecutorOutOfCountersError(error pb.ExecutorError) bool {
return int32(error) >= EXECUTOR_ERROR_COUNTERS_OVERFLOW_KECCAK && int32(error) <= ROM_ERROR_OUT_OF_COUNTERS_POSEIDON
Expand Down
2 changes: 2 additions & 0 deletions state/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ type TrustedReorg struct {
const (
// EventType_Prexecution_OOC indicates a preexecution out of couters error
EventType_Prexecution_OOC = "PREEXECUTION OOC"
// EventType_Prexecution_OOG indicates a preexecution out of gas error
EventType_Prexecution_OOG = "PREEXECUTION OOG"
// EventType_ZKCounters_Diff indicates big different in preexecution and execution regarding ZKCounters
EventType_ZKCounters_Diff = "ZK COUNTERS DIFF"
)
Expand Down

0 comments on commit 8978474

Please sign in to comment.