Skip to content

Commit

Permalink
add deposit_counter to pool (#1958)
Browse files Browse the repository at this point in the history
  • Loading branch information
tclemos committed Mar 30, 2023
1 parent 04fbdc7 commit c04d513
Show file tree
Hide file tree
Showing 11 changed files with 239 additions and 58 deletions.
7 changes: 7 additions & 0 deletions db/migrations/pool/0006.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +migrate Up
ALTER TABLE pool.transaction
ADD COLUMN deposit_count BIGINT;

-- +migrate Down
ALTER TABLE pool.transaction
DROP COLUMN deposit_count;
4 changes: 2 additions & 2 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/types"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/pool/pgpoolstorage"
"github.com/0xPolygonHermez/zkevm-node/pool"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -519,7 +519,7 @@ func (e *EthEndpoints) GetTransactionByHash(hash types.ArgHash) (interface{}, ty
return e.getTransactionByHashFromSequencerNode(hash.Hash())
}
poolTx, err := e.pool.GetTxByHash(ctx, hash.Hash())
if errors.Is(err, pgpoolstorage.ErrNotFound) {
if errors.Is(err, pool.ErrNotFound) {
return nil, nil
} else if err != nil {
return rpcErrorResponse(types.DefaultErrorCode, "failed to load transaction by hash from pool", err)
Expand Down
3 changes: 1 addition & 2 deletions jsonrpc/endpoints_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/types"
"github.com/0xPolygonHermez/zkevm-node/pool"
"github.com/0xPolygonHermez/zkevm-node/pool/pgpoolstorage"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/runtime"
"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -2118,7 +2117,7 @@ func TestGetTransactionByHash(t *testing.T) {

m.Pool.
On("GetTxByHash", context.Background(), tc.Hash).
Return(nil, pgpoolstorage.ErrNotFound).
Return(nil, pool.ErrNotFound).
Once()
},
},
Expand Down
1 change: 1 addition & 0 deletions pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type storage interface {
DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
MarkWIPTxsAsPending(ctx context.Context) error
MinGasPriceSince(ctx context.Context, timestamp time.Time) (uint64, error)
DepositCountExists(ctx context.Context, depositCount uint64) (bool, error)
}

type stateInterface interface {
Expand Down
49 changes: 31 additions & 18 deletions pool/pgpoolstorage/pgpoolstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ import (
"github.com/jackc/pgx/v4/pgxpool"
)

var (
// ErrNotFound indicates an object has not been found for the search criteria used
ErrNotFound = errors.New("object not found")
)

// PostgresPoolStorage is an implementation of the Pool interface
// that uses a postgres database to store the data
type PostgresPoolStorage struct {
Expand Down Expand Up @@ -79,10 +74,11 @@ func (p *PostgresPoolStorage) AddTx(ctx context.Context, tx pool.Transaction) er
received_at,
from_address,
is_wip,
ip
ip,
deposit_count
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
ON CONFLICT (hash) DO UPDATE SET
encoded = $2,
decoded = $3,
Expand All @@ -101,7 +97,8 @@ func (p *PostgresPoolStorage) AddTx(ctx context.Context, tx pool.Transaction) er
received_at = $16,
from_address = $17,
is_wip = $18,
ip = $19
ip = $19,
deposit_count = $20
`

// Get FromAddress from the JSON data
Expand Down Expand Up @@ -130,7 +127,8 @@ func (p *PostgresPoolStorage) AddTx(ctx context.Context, tx pool.Transaction) er
tx.ReceivedAt,
fromAddress,
tx.IsWIP,
tx.IP); err != nil {
tx.IP,
tx.DepositCount); err != nil {
return err
}
return nil
Expand All @@ -146,10 +144,10 @@ func (p *PostgresPoolStorage) GetTxsByStatus(ctx context.Context, status pool.Tx
sql string
)
if limit == 0 {
sql = "SELECT encoded, status, received_at, is_wip, ip FROM pool.transaction WHERE status = $1 ORDER BY gas_price DESC"
sql = "SELECT encoded, status, received_at, is_wip, ip, deposit_count FROM pool.transaction WHERE status = $1 ORDER BY gas_price DESC"
rows, err = p.db.Query(ctx, sql, status.String())
} else {
sql = "SELECT encoded, status, received_at, is_wip, ip FROM pool.transaction WHERE status = $1 AND is_claims = $2 ORDER BY gas_price DESC LIMIT $3"
sql = "SELECT encoded, status, received_at, is_wip, ip, deposit_count FROM pool.transaction WHERE status = $1 AND is_claims = $2 ORDER BY gas_price DESC LIMIT $3"
rows, err = p.db.Query(ctx, sql, status.String(), isClaims, limit)
}
if err != nil {
Expand Down Expand Up @@ -179,10 +177,10 @@ func (p *PostgresPoolStorage) GetNonWIPTxsByStatus(ctx context.Context, status p
sql string
)
if limit == 0 {
sql = "SELECT encoded, status, received_at, is_wip, ip FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC"
sql = "SELECT encoded, status, received_at, is_wip, ip, deposit_count FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC"
rows, err = p.db.Query(ctx, sql, status.String())
} else {
sql = "SELECT encoded, status, received_at, is_wip, ip FROM pool.transaction WHERE is_wip IS FALSE and status = $1 AND is_claims = $2 ORDER BY gas_price DESC LIMIT $3"
sql = "SELECT encoded, status, received_at, is_wip, ip, deposit_count FROM pool.transaction WHERE is_wip IS FALSE and status = $1 AND is_claims = $2 ORDER BY gas_price DESC LIMIT $3"
rows, err = p.db.Query(ctx, sql, status.String(), isClaims, limit)
}
if err != nil {
Expand Down Expand Up @@ -299,7 +297,7 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt

rows, err := p.db.Query(ctx, query, args...)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
return nil, pool.ErrNotFound
} else if err != nil {
return nil, err
}
Expand Down Expand Up @@ -463,7 +461,7 @@ func (p *PostgresPoolStorage) IsTxPending(ctx context.Context, hash common.Hash)

// GetTxsByFromAndNonce get all the transactions from the pool with the same from and nonce
func (p *PostgresPoolStorage) GetTxsByFromAndNonce(ctx context.Context, from common.Address, nonce uint64) ([]pool.Transaction, error) {
sql := `SELECT encoded, status, received_at, is_wip, ip
sql := `SELECT encoded, status, received_at, is_wip, ip, deposit_count
FROM pool.transaction
WHERE from_address = $1
AND nonce = $2`
Expand Down Expand Up @@ -554,7 +552,7 @@ func (p *PostgresPoolStorage) GetTxByHash(ctx context.Context, hash common.Hash)
WHERE hash = $1`
err := p.db.QueryRow(ctx, sql, hash.String()).Scan(&encoded, &status, &receivedAt, &isWIP, &ip)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
return nil, pool.ErrNotFound
} else if err != nil {
return nil, err
}
Expand Down Expand Up @@ -585,9 +583,10 @@ func scanTx(rows pgx.Rows) (*pool.Transaction, error) {
encoded, status, ip string
receivedAt time.Time
isWIP bool
depositCount *uint64
)

if err := rows.Scan(&encoded, &status, &receivedAt, &isWIP, &ip); err != nil {
if err := rows.Scan(&encoded, &status, &receivedAt, &isWIP, &ip, &depositCount); err != nil {
return nil, err
}

Expand All @@ -606,6 +605,7 @@ func scanTx(rows pgx.Rows) (*pool.Transaction, error) {
tx.ReceivedAt = receivedAt
tx.IsWIP = isWIP
tx.IP = ip
tx.DepositCount = depositCount

return tx, nil
}
Expand All @@ -629,7 +629,7 @@ func (p *PostgresPoolStorage) GetTxZkCountersByHash(ctx context.Context, hash co
&zkCounters.UsedPoseidonHashes, &zkCounters.UsedPoseidonPaddings,
&zkCounters.UsedMemAligns, &zkCounters.UsedArithmetics, &zkCounters.UsedBinaries, &zkCounters.UsedSteps)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
return nil, pool.ErrNotFound
} else if err != nil {
return nil, err
}
Expand All @@ -655,3 +655,16 @@ func (p *PostgresPoolStorage) UpdateTxWIPStatus(ctx context.Context, hash common
}
return nil
}

// DepositCountExists checks if already exists a transaction in the pool with the
// provided deposit count
func (p *PostgresPoolStorage) DepositCountExists(ctx context.Context, depositCount uint64) (bool, error) {
var exists bool
req := "SELECT exists (SELECT 1 FROM pool.transaction WHERE deposit_count = $1)"
err := p.db.QueryRow(ctx, req, depositCount).Scan(&exists)
if err != nil && err != sql.ErrNoRows {
return false, err
}

return exists, nil
}
106 changes: 77 additions & 29 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/runtime"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
Expand All @@ -22,6 +23,9 @@ const (
)

var (
// ErrNotFound indicates an object has not been found for the search criteria used
ErrNotFound = errors.New("object not found")

// ErrAlreadyKnown is returned if the transactions is already contained
// within the pool.
ErrAlreadyKnown = errors.New("already known")
Expand All @@ -43,10 +47,11 @@ type Pool struct {
minSuggestedGasPriceMux *sync.RWMutex
}

type preexecutionResponse struct {
type preExecutionResponse struct {
usedZkCounters state.ZKCounters
isOOC bool
isOOG bool
isReverted bool
}

// NewPool creates and initializes an instance of Pool
Expand Down Expand Up @@ -90,46 +95,88 @@ func (p *Pool) AddTx(ctx context.Context, tx types.Transaction, ip string) error
func (p *Pool) StoreTx(ctx context.Context, tx types.Transaction, ip string, isWIP bool) error {
poolTx := NewTransaction(tx, ip, isWIP, p)
// Execute transaction to calculate its zkCounters
preexecutionResponse, err := p.PreExecuteTx(ctx, tx)
preExecutionResponse, err := p.PreExecuteTx(ctx, tx)
if err != nil {
log.Debugf("PreExecuteTx error (this can be ignored): %v", err)
}
if preExecutionResponse.isOOC {
event := &state.Event{
EventType: state.EventType_Prexecution_OOC,
Timestamp: time.Now(),
IP: ip,
TxHash: tx.Hash(),
}

if preexecutionResponse.isOOC {
event := &state.Event{
EventType: state.EventType_Prexecution_OOC,
Timestamp: time.Now(),
IP: ip,
TxHash: tx.Hash(),
}
err := p.state.AddEvent(ctx, event, nil)
if err != nil {
log.Errorf("Error adding event: %v", err)
}
// Do not add tx to the pool
return fmt.Errorf("out of counters")
} else if preExecutionResponse.isOOG {
event := &state.Event{
EventType: state.EventType_Prexecution_OOG,
Timestamp: time.Now(),
IP: ip,
TxHash: tx.Hash(),
}

err := p.state.AddEvent(ctx, event, nil)
if err != nil {
log.Errorf("Error adding event: %v", err)
}
}

err := p.state.AddEvent(ctx, event, nil)
if poolTx.IsClaims {
isFreeTx := poolTx.GasPrice().Cmp(big.NewInt(0)) <= 0
if isFreeTx && preExecutionResponse.isReverted {
return fmt.Errorf("free claim reverted")
} else {
depositCount, err := p.extractDepositCountFromClaimTx(poolTx)
if err != nil {
log.Errorf("Error adding event: %v", err)
return err
}
// Do not add tx to the pool
return fmt.Errorf("out of counters")
} else if preexecutionResponse.isOOG {
event := &state.Event{
EventType: state.EventType_Prexecution_OOG,
Timestamp: time.Now(),
IP: ip,
TxHash: tx.Hash(),
exists, err := p.storage.DepositCountExists(ctx, *depositCount)
if err != nil && !errors.Is(err, ErrNotFound) {
return err
}

err := p.state.AddEvent(ctx, event, nil)
if err != nil {
log.Errorf("Error adding event: %v", err)
if exists {
return fmt.Errorf("deposit count already exists")
}

poolTx.DepositCount = depositCount
}
}
poolTx.ZKCounters = preexecutionResponse.usedZkCounters

poolTx.ZKCounters = preExecutionResponse.usedZkCounters

return p.storage.AddTx(ctx, *poolTx)
}

// extractDepositCountFromClaimTx reads the transaction data if this is a
// proper defined claim transaction, extracts the deposit count parameter
// from its data
func (p *Pool) extractDepositCountFromClaimTx(poolTx *Transaction) (*uint64, error) {
data := make([]byte, len(poolTx.Data()))
copy(data, poolTx.Data())

const methodLength = 4
const skipParamsLength = 32 * 32
const depositCountLength = 32
const minimumDataLength = methodLength + skipParamsLength + depositCountLength
if len(data) < minimumDataLength {
return nil, fmt.Errorf("invalid data length")
}

depositCountBytes := data[methodLength+skipParamsLength : methodLength+skipParamsLength+depositCountLength]
depositCountBig := big.NewInt(0).SetBytes(depositCountBytes)
depositCount := depositCountBig.Uint64()
return &depositCount, nil
}

// PreExecuteTx executes a transaction to calculate its zkCounters
func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (preexecutionResponse, error) {
response := preexecutionResponse{usedZkCounters: state.ZKCounters{}, isOOC: false, isOOG: false}
func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (preExecutionResponse, error) {
response := preExecutionResponse{usedZkCounters: state.ZKCounters{}, isOOC: false, isOOG: false, isReverted: false}

processBatchResponse, err := p.state.PreProcessTransaction(ctx, &tx, nil)
if err != nil {
Expand All @@ -139,9 +186,10 @@ func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (preexecu
response.usedZkCounters = processBatchResponse.UsedZkCounters

if processBatchResponse.IsBatchProcessed {
if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 &&
executor.IsROMOutOfGasError(executor.RomErrorCode(processBatchResponse.Responses[0].RomError)) {
response.isOOC = true
if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 {
r := processBatchResponse.Responses[0]
response.isOOC = executor.IsROMOutOfGasError(executor.RomErrorCode(r.RomError))
response.isReverted = errors.Is(r.RomError, runtime.ErrExecutionReverted)
}
} else {
response.isOOG = !processBatchResponse.IsBatchProcessed
Expand Down
Loading

0 comments on commit c04d513

Please sign in to comment.