From 201ea32a752f17edf2706a54bcbf10b1d9e967bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Mon, 9 Jan 2023 16:01:21 +0100 Subject: [PATCH] DBManager WIP (#1512) * WIP * WIP * WIP --- pool/pgpoolstorage/pgpoolstorage.go | 9 + pool/pool.go | 4 + pool/transaction.go | 4 +- sequencer/dbmanager.go | 276 ++++++++++++++++++++++------ sequencer/finalizer.go | 113 ++++++------ sequencer/interfaces.go | 40 +++- sequencer/sequencer.go | 37 +++- sequencer/worker.go | 12 +- state/batch.go | 1 + state/pgstatestorage.go | 21 +++ state/state.go | 84 +++++++-- 11 files changed, 452 insertions(+), 149 deletions(-) diff --git a/pool/pgpoolstorage/pgpoolstorage.go b/pool/pgpoolstorage/pgpoolstorage.go index c5dc8ef332..bd92a3683f 100644 --- a/pool/pgpoolstorage/pgpoolstorage.go +++ b/pool/pgpoolstorage/pgpoolstorage.go @@ -533,3 +533,12 @@ func scanTx(rows pgx.Rows) (*pool.Transaction, error) { return tx, nil } + +// DeleteTransactionByHash deletes tx by its hash +func (p *PostgresPoolStorage) DeleteTransactionByHash(ctx context.Context, hash common.Hash) error { + query := "DELETE FROM pool.txs WHERE hash = $1" + if _, err := p.db.Exec(ctx, query, hash); err != nil { + return err + } + return nil +} diff --git a/pool/pool.go b/pool/pool.go index 062e0ebc7c..6e658d5996 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -240,6 +240,8 @@ func (p *Pool) checkTxFieldCompatibilityWithExecutor(ctx context.Context, tx typ // MarkReorgedTxsAsPending updated reorged txs status from selected to pending func (p *Pool) MarkReorgedTxsAsPending(ctx context.Context) error { + // TODO: Change status to "reorged" + // get selected transactions from pool selectedTxs, err := p.GetSelectedTxs(ctx, 0) if err != nil { @@ -266,3 +268,5 @@ func (p *Pool) MarkReorgedTxsAsPending(ctx context.Context) error { return nil } + +// TODO: Create a method for the synchronizer to update Tx Statuses to "pending" or "reorged" diff --git a/pool/transaction.go b/pool/transaction.go index a4408596c5..8a76ac33b8 100644 --- a/pool/transaction.go +++ b/pool/transaction.go @@ -15,10 +15,12 @@ const ( TxStatusPending TxStatus = "pending" // TxStatusInvalid represents an invalid tx TxStatusInvalid TxStatus = "invalid" - // TxStatusSelected represents a tx that has been selected + // TxStatusSelected represents a tx that has been selected TxStatusSelected TxStatus = "selected" // TxStatusFailed represents a tx that has been failed after processing, but can be processed in the future TxStatusFailed TxStatus = "failed" + // TxStatusWIP represents a tx that is in a sequencer worker memory + TxStatusWIP TxStatus = "wip" ) // TxStatus represents the state of a tx diff --git a/sequencer/dbmanager.go b/sequencer/dbmanager.go index 66a323841d..7b69c3065b 100644 --- a/sequencer/dbmanager.go +++ b/sequencer/dbmanager.go @@ -2,33 +2,40 @@ package sequencer import ( "context" - "errors" + "sync" "time" "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/0xPolygonHermez/zkevm-node/pool" + "github.com/0xPolygonHermez/zkevm-node/pool/pgpoolstorage" "github.com/0xPolygonHermez/zkevm-node/state" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/jackc/pgx/v4" ) // Pool Loader and DB Updater type dbManager struct { - txPool txPool - state stateInterface - worker workerInterface + txPool txPool + state dbManagerStateInterface + worker workerInterface + txsToStoreCh chan *txToStore + wgTxsToStore *sync.WaitGroup + l2ReorgCh chan L2ReorgEvent + ctx 
context.Context } -func newDBManager(txPool txPool, state stateInterface, worker *Worker) *dbManager { - return &dbManager{txPool: txPool, state: state, worker: worker} +func newDBManager(ctx context.Context, txPool txPool, state dbManagerStateInterface, worker *Worker, closingSignalCh ClosingSignalCh, txsStore TxsStore) *dbManager { + return &dbManager{ctx: ctx, txPool: txPool, state: state, worker: worker, txsToStoreCh: txsStore.Ch, wgTxsToStore: txsStore.Wg, l2ReorgCh: closingSignalCh.L2ReorgCh} } func (d *dbManager) Start() { go d.loadFromPool() + go d.StoreProcessedTxAndDeleteFromPool() } func (d *dbManager) GetLastBatchNumber(ctx context.Context) (uint64, error) { - // TODO: Fetch last BatchNumber from database - return 0, errors.New("") + return d.state.GetLastBatchNumber(ctx, nil) } func (d *dbManager) OpenBatch(ctx context.Context, processingContext state.ProcessingContext, dbTx pgx.Tx) error { @@ -37,6 +44,7 @@ func (d *dbManager) OpenBatch(ctx context.Context, processingContext state.Proce } func (d *dbManager) CreateFirstBatch(ctx context.Context, sequencerAddress common.Address) state.ProcessingContext { + // TODO: Retry in case of error processingCtx := state.ProcessingContext{ BatchNumber: 1, Coinbase: sequencerAddress, @@ -45,96 +53,204 @@ func (d *dbManager) CreateFirstBatch(ctx context.Context, sequencerAddress commo } dbTx, err := d.state.BeginStateTransaction(ctx) if err != nil { - log.Fatalf("failed to begin state transaction for opening a batch, err: %v", err) + log.Errorf("failed to begin state transaction for opening a batch, err: %v", err) + return processingCtx } err = d.state.OpenBatch(ctx, processingCtx, dbTx) if err != nil { if rollbackErr := dbTx.Rollback(ctx); rollbackErr != nil { - log.Fatalf( + log.Errorf( "failed to rollback dbTx when opening batch that gave err: %v. Rollback err: %v", rollbackErr, err, ) } - log.Fatalf("failed to open a batch, err: %v", err) + log.Errorf("failed to open a batch, err: %v", err) + return processingCtx } if err := dbTx.Commit(ctx); err != nil { - log.Fatalf("failed to commit dbTx when opening batch, err: %v", err) + log.Errorf("failed to commit dbTx when opening batch, err: %v", err) + return processingCtx } return processingCtx } func (d *dbManager) loadFromPool() { - // TODO: Endless loop that keeps loading tx from the DB into the worker + + ctx := context.Background() + + for { + // TODO: Define how to do this + time.Sleep(5 * time.Second) + + poolTransactions, err := d.txPool.GetPendingTxs(ctx, false, 0) + + if err != nil && err != pgpoolstorage.ErrNotFound { + log.Errorf("loadFromPool: %v", err) + continue + } + + poolClaims, err := d.txPool.GetPendingTxs(ctx, true, 0) + + if err != nil && err != pgpoolstorage.ErrNotFound { + log.Errorf("loadFromPool: %v", err) + continue + } + + poolTransactions = append(poolTransactions, poolClaims...) 
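+		// Wrap each pending tx in a TxTracker, hand it to the worker and mark it
+		// as wip in the pool so the next polling cycle does not load it again.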
+ + for _, tx := range poolTransactions { + if err != nil { + log.Errorf("loadFromPool error getting tx sender: %v", err) + continue + } + + txTracker := TxTracker{ + Hash: tx.Hash(), + // TODO: Complete + } + d.worker.AddTx(txTracker) + d.txPool.UpdateTxStatus(ctx, tx.Hash(), pool.TxStatusWIP) + } + } } func (d *dbManager) BeginStateTransaction(ctx context.Context) (pgx.Tx, error) { - tx, err := d.state.Begin(ctx) - if err != nil { - return nil, err - } - return tx, nil + return d.BeginStateTransaction(ctx) } func (d *dbManager) StoreProcessedTransaction(ctx context.Context, batchNumber uint64, processedTx *state.ProcessTransactionResponse, dbTx pgx.Tx) error { - // TODO: Implement store of transaction and adding it to the batch - return errors.New("") + return d.state.StoreTransaction(ctx, batchNumber, processedTx, dbTx) } -func (d *dbManager) DeleteTxFromPool(ctx context.Context, txHash common.Hash, dbTx pgx.Tx) error { - // TODO: Delete transaction from Pool DB - return errors.New("") +func (d *dbManager) DeleteTransactionFromPool(ctx context.Context, txHash common.Hash) error { + return d.txPool.DeleteTransactionByHash(ctx, txHash) } -func (d *dbManager) StoreProcessedTxAndDeleteFromPool(ctx context.Context, batchNumber uint64, processedTx *state.ProcessTransactionResponse) { - for { // TODO: Finish the retry mechanism - dbTx, err := d.BeginStateTransaction(ctx) +func (d *dbManager) StoreProcessedTxAndDeleteFromPool() { + // TODO: Finish the retry mechanism and error handling + for { + txToStore := <-d.txsToStoreCh + + dbTx, err := d.BeginStateTransaction(d.ctx) if err != nil { - // TODO: handle + log.Errorf("StoreProcessedTxAndDeleteFromPool :%v", err) } - err = d.StoreProcessedTransaction(ctx, batchNumber, processedTx, dbTx) + err = d.StoreProcessedTransaction(d.ctx, txToStore.batchNumber, txToStore.txResponse, dbTx) if err != nil { - err = dbTx.Rollback(ctx) + err = dbTx.Rollback(d.ctx) if err != nil { - // TODO: handle + log.Errorf("StoreProcessedTxAndDeleteFromPool :%v", err) } } - err = d.DeleteTxFromPool(ctx, processedTx.TxHash, dbTx) + + // Check if the Tx is still valid in the state to detect reorgs + latestL2Block, err := d.state.GetLastL2Block(d.ctx, dbTx) + if latestL2Block.Root() != txToStore.previousL2BlockStateRoot { + log.Info("L2 reorg detected. 
Old state root: %v New state root: %v", latestL2Block.Root(), txToStore.previousL2BlockStateRoot) + d.l2ReorgCh <- L2ReorgEvent{} + continue + } + + // Change Tx status to selected + d.txPool.UpdateTxStatus(d.ctx, txToStore.txResponse.TxHash, pool.TxStatusSelected) + + err = dbTx.Commit(d.ctx) if err != nil { - err = dbTx.Rollback(ctx) - if err != nil { - // TODO: handle - } + log.Errorf("StoreProcessedTxAndDeleteFromPool error committing: %v", err) } + + d.wgTxsToStore.Done() } } -func (d *dbManager) GetWIPBatch(ctx context.Context) (WipBatch, error) { - // TODO: Make this method to return ready WIP batch it has following cases: - // if lastBatch IS OPEN - load data from it but set WipBatch.initialStateRoot to Last Closed Batch - // if lastBatch IS CLOSED - open new batch in the database and load all data from the closed one without the txs and increase batch number - return WipBatch{}, errors.New("") -} +// GetWIPBatch returns ready WIP batch +// if lastBatch IS OPEN - load data from it but set wipBatch.initialStateRoot to Last Closed Batch +// if lastBatch IS CLOSED - open new batch in the database and load all data from the closed one without the txs and increase batch number +func (d *dbManager) GetWIPBatch(ctx context.Context) (*WipBatch, error) { + lastBatch, err := d.GetLastBatch(ctx) + if err != nil { + return nil, err + } + + wipBatch := &WipBatch{ + batchNumber: lastBatch.BatchNumber, + coinbase: lastBatch.Coinbase, + accInputHash: lastBatch.AccInputHash, + // initialStateRoot: lastBatch.StateRoot, + stateRoot: lastBatch.StateRoot, + timestamp: uint64(lastBatch.Timestamp.Unix()), + globalExitRoot: lastBatch.GlobalExitRoot, + + // TODO: txs + // TODO: remainingResources + } + + isClosed, err := d.IsBatchClosed(ctx, lastBatch.BatchNumber) + if err != nil { + return nil, err + } + + if isClosed { + wipBatch.batchNumber = lastBatch.BatchNumber + 1 + + processingContext := &state.ProcessingContext{ + BatchNumber: wipBatch.batchNumber, + Coinbase: wipBatch.coinbase, + Timestamp: time.Now(), + GlobalExitRoot: wipBatch.globalExitRoot, + } -func (d *dbManager) GetLastClosedBatch(ctx context.Context) (state.Batch, error) { - // TODO: Returns last closed batch - return state.Batch{}, errors.New("") + dbTx, err := d.BeginStateTransaction(ctx) + if err != nil { + return nil, err + } + + err = d.state.OpenBatch(ctx, *processingContext, dbTx) + if err != nil { + if rollbackErr := dbTx.Rollback(ctx); rollbackErr != nil { + log.Errorf( + "failed to rollback dbTx when opening batch that gave err: %v. 
Rollback err: %v", + rollbackErr, err, + ) + } + log.Errorf("failed to open a batch, err: %v", err) + return nil, err + } + if err := dbTx.Commit(ctx); err != nil { + log.Errorf("failed to commit dbTx when opening batch, err: %v", err) + return nil, err + } + + } else { + lastClosedBatch, err := d.GetLastClosedBatch(ctx) + if err != nil { + return nil, err + } + + wipBatch.stateRoot = lastClosedBatch.StateRoot + } + + return wipBatch, nil } -func (d *dbManager) GetLastBatch(ctx context.Context) (state.Batch, error) { - // TODO: Returns last batch - return state.Batch{}, errors.New("") +func (d *dbManager) GetLastClosedBatch(ctx context.Context) (*state.Batch, error) { + return d.state.GetLastClosedBatch(ctx, nil) +} +func (d *dbManager) GetLastBatch(ctx context.Context) (*state.Batch, error) { + batch, err := d.state.GetLastBatch(ctx) + if err != nil { + return nil, err + } + return batch, nil } func (d *dbManager) IsBatchClosed(ctx context.Context, batchNum uint64) (bool, error) { - // TODO: Returns if the batch with passed batchNum is closed - return false, errors.New("") + return d.state.IsBatchClosed(ctx, batchNum, nil) } func (d *dbManager) GetLastNBatches(ctx context.Context, numBatches uint) ([]*state.Batch, error) { - // TODO: Returns last N batches - return []*state.Batch{}, errors.New("") - + return d.state.GetLastNBatches(ctx, numBatches, nil) } func (d *dbManager) GetLatestGer(ctx context.Context) (state.GlobalExitRoot, time.Time, error) { // TODO: Get implementation from old sequencer's batchbuilder @@ -150,11 +266,57 @@ type ClosingBatchParameters struct { Txs []TxTracker } -func (d *dbManager) CloseBatch(ctx context.Context, params ClosingBatchParameters, dbTx pgx.Tx) { - // TODO: Close current open batch +func (d *dbManager) CloseBatch(ctx context.Context, params ClosingBatchParameters) error { + + // TODO: Create new type txManagerArray and refactor CloseBatch method in state + + processingReceipt := state.ProcessingReceipt{ + BatchNumber: params.BatchNumber, + StateRoot: params.StateRoot, + LocalExitRoot: params.LocalExitRoot, + AccInputHash: params.AccInputHash, + } + + transactions := make([]types.Transaction, len(params.Txs)) + + for _, tx := range params.Txs { + transaction, err := state.DecodeTx(string(tx.RawTx)) + + if err != nil { + return err + } + + transactions = append(transactions, *transaction) + } + + processingReceipt.Txs = transactions + + dbTx, err := d.BeginStateTransaction(ctx) + if err != nil { + return err + } + + err = d.state.CloseBatch(ctx, processingReceipt, dbTx) + if err != nil { + err2 := dbTx.Rollback(ctx) + if err2 != nil { + log.Errorf("CloseBatch error rolling back: %v", err2) + } + return err + } else { + err := dbTx.Commit(ctx) + if err != nil { + log.Errorf("CloseBatch error committing: %v", err) + return err + } + } + + return nil } -func (d *dbManager) MarkReorgedTxsAsPending(ctx context.Context) error { - // TODO: call pool.MarkReorgedTxsAsPending and return result - return errors.New("") +func (d *dbManager) MarkReorgedTxsAsPending(ctx context.Context) { + err := d.txPool.MarkReorgedTxsAsPending(ctx) + if err != nil { + log.Errorf("error marking reorged txs as pending: %v", err) + } } diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index b9dc91c53d..f943b07de2 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -19,22 +19,18 @@ import ( // finalizer represents the finalizer component of the sequencer. 
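+// It no longer owns its closing-signal channels: they are created by the
+// sequencer and shared with the dbManager through ClosingSignalCh and TxsStore.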
type finalizer struct { - ForcedBatchCh chan state.Batch - GERCh chan common.Hash - L2ReorgCh chan L2ReorgEvent - SendingToL1TimeoutCh chan bool - TxsToStoreCh chan *txToStore - WgTxsToStore *sync.WaitGroup - cfg FinalizerCfg - maxTxsPerBatch uint64 - isSynced func(ctx context.Context) bool - sequencerAddress common.Address - worker workerInterface - dbManager dbManagerInterface - executor stateInterface - batch WipBatch - processRequest state.ProcessSingleTxRequest - sharedResourcesMux *sync.RWMutex + closingSignalCh ClosingSignalCh + txsStore TxsStore + cfg FinalizerCfg + maxTxsPerBatch uint64 + isSynced func(ctx context.Context) bool + sequencerAddress common.Address + worker workerInterface + dbManager dbManagerInterface + executor stateInterface + batch *WipBatch + processRequest state.ProcessSingleTxRequest + sharedResourcesMux *sync.RWMutex // closing signals nextGER common.Hash nextGERDeadline int64 @@ -65,23 +61,19 @@ type WipBatch struct { } // newFinalizer returns a new instance of Finalizer. -func newFinalizer(cfg FinalizerCfg, worker workerInterface, dbManager dbManagerInterface, executor stateInterface, sequencerAddr common.Address, isSynced func(ctx context.Context) bool, maxTxsPerBatch uint64) *finalizer { +func newFinalizer(cfg FinalizerCfg, worker workerInterface, dbManager dbManagerInterface, executor stateInterface, sequencerAddr common.Address, isSynced func(ctx context.Context) bool, maxTxsPerBatch uint64, closingSignalCh ClosingSignalCh, txsStore TxsStore) *finalizer { return &finalizer{ - ForcedBatchCh: make(chan state.Batch), - GERCh: make(chan common.Hash), - L2ReorgCh: make(chan L2ReorgEvent), - SendingToL1TimeoutCh: make(chan bool), - TxsToStoreCh: make(chan *txToStore), - WgTxsToStore: &sync.WaitGroup{}, - cfg: cfg, - isSynced: isSynced, - sequencerAddress: sequencerAddr, - worker: worker, - dbManager: dbManager, - executor: executor, - maxTxsPerBatch: maxTxsPerBatch, - batch: WipBatch{}, - processRequest: state.ProcessSingleTxRequest{}, + closingSignalCh: closingSignalCh, + txsStore: txsStore, + cfg: cfg, + isSynced: isSynced, + sequencerAddress: sequencerAddr, + worker: worker, + dbManager: dbManager, + executor: executor, + maxTxsPerBatch: maxTxsPerBatch, + batch: &WipBatch{}, + processRequest: state.ProcessSingleTxRequest{}, // closing signals nextGER: common.Hash{}, nextGERDeadline: getNextGERDeadline(cfg), @@ -99,7 +91,7 @@ func (f *finalizer) Start(ctx context.Context, batch *WipBatch, OldStateRoot, Ol ) if batch != nil { - f.batch = *batch + f.batch = batch } f.processRequest = state.ProcessSingleTxRequest{ BatchNumber: f.batch.batchNumber, @@ -125,7 +117,7 @@ func (f *finalizer) Start(ctx context.Context, batch *WipBatch, OldStateRoot, Ol } } else { if f.isCurrBatchAboveLimitWindow() { - f.WgTxsToStore.Wait() + f.txsStore.Wg.Wait() f.reopenBatch(ctx) // // go (decide if we need to execute the full batch as a sanity check, DO IT IN PARALLEL) ==> if error: log this txs somewhere and remove them from the pipeline if len(f.nextForcedBatches) > 0 { @@ -225,8 +217,8 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) (succ f.processRequest.OldAccInputHash = result.NewAccInputHash // Store the processed transaction, add it to the batch and update status in the pool atomically - f.WgTxsToStore.Add(1) - f.TxsToStoreCh <- &txToStore{ + f.txsStore.Wg.Add(1) + f.txsStore.Ch <- &txToStore{ batchNumber: f.batch.batchNumber, txResponse: txResponse, previousL2BlockStateRoot: previousL2BlockStateRoot, @@ -263,7 +255,7 @@ func (f *finalizer) 
checkDeadlines(ctx context.Context) { } // React only if the GER has changed if ger.GlobalExitRoot != f.batch.globalExitRoot { - f.GERCh <- ger.GlobalExitRoot + f.closingSignalCh.GERCh <- ger.GlobalExitRoot } f.nextGERMux.Unlock() } @@ -273,7 +265,7 @@ func (f *finalizer) handleClosingSignals(ctx context.Context, err error) { for { select { // Forced batch ch - case fb := <-f.ForcedBatchCh: + case fb := <-f.closingSignalCh.ForcedBatchCh: f.sharedResourcesMux.Lock() f.nextForcedBatchMux.Lock() f.nextForcedBatches = append(f.nextForcedBatches, fb) @@ -282,7 +274,7 @@ func (f *finalizer) handleClosingSignals(ctx context.Context, err error) { f.nextForcedBatchMux.Unlock() f.sharedResourcesMux.Unlock() // globalExitRoot ch - case ger := <-f.GERCh: + case ger := <-f.closingSignalCh.GERCh: f.sharedResourcesMux.Lock() f.nextGERMux.Lock() f.nextGER = ger @@ -291,7 +283,7 @@ func (f *finalizer) handleClosingSignals(ctx context.Context, err error) { f.nextGERMux.Unlock() f.sharedResourcesMux.Unlock() // L2Reorg ch - case l2ReorgEvent := <-f.L2ReorgCh: + case l2ReorgEvent := <-f.closingSignalCh.L2ReorgCh: f.sharedResourcesMux.Lock() go f.worker.HandleL2Reorg(l2ReorgEvent.TxHashes) // Get current wip batch @@ -373,16 +365,24 @@ func (f *finalizer) isCurrBatchAboveLimitWindow() bool { return false } -func (f *finalizer) backupWIPBatch() WipBatch { - backup := new(WipBatch) - *backup = f.batch +func (f *finalizer) backupWIPBatch() *WipBatch { + backup := &WipBatch{ + batchNumber: f.batch.batchNumber, + coinbase: f.batch.coinbase, + accInputHash: f.batch.accInputHash, + stateRoot: f.batch.stateRoot, + localExitRoot: f.batch.localExitRoot, + timestamp: f.batch.timestamp, + globalExitRoot: f.batch.localExitRoot, + remainingResources: f.batch.remainingResources, + } backup.txs = make([]TxTracker, 0, len(f.batch.txs)) backup.txs = append(backup.txs, f.batch.txs...) - return *backup + return backup } -func (f *finalizer) newWIPBatch(ctx context.Context) (WipBatch, error) { +func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) { var ( dbTx pgx.Tx err error @@ -399,36 +399,39 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (WipBatch, error) { } if f.batch.stateRoot.String() == "" || f.batch.localExitRoot.String() == "" { - return WipBatch{}, errors.New("state root and local exit root must have value to close batch") + return nil, errors.New("state root and local exit root must have value to close batch") } dbTx, err = f.dbManager.BeginStateTransaction(ctx) if err != nil { - return WipBatch{}, fmt.Errorf("failed to begin state transaction to close batch, err: %w", err) + return nil, fmt.Errorf("failed to begin state transaction to close batch, err: %w", err) + } + err = f.closeBatch(ctx, dbTx) + if err != nil { + return nil, fmt.Errorf("failed to close batch, err: %w", err) } - f.closeBatch(ctx, dbTx) return f.openWIPBatch(ctx, dbTx) } -func (f *finalizer) openWIPBatch(ctx context.Context, dbTx pgx.Tx) (WipBatch, error) { +func (f *finalizer) openWIPBatch(ctx context.Context, dbTx pgx.Tx) (*WipBatch, error) { // open next batch gerHash, err := f.getGERHash(ctx, dbTx) if err != nil { - return WipBatch{}, err + return nil, err } _, err = f.openBatch(ctx, gerHash, dbTx) if err != nil { if rollbackErr := dbTx.Rollback(ctx); rollbackErr != nil { - return WipBatch{}, fmt.Errorf( + return nil, fmt.Errorf( "failed to rollback dbTx when getting last batch num that gave err: %s. 
Rollback err: %s", rollbackErr.Error(), err.Error(), ) } - return WipBatch{}, err + return nil, err } if err := dbTx.Commit(ctx); err != nil { - return WipBatch{}, err + return nil, err } // Check if synchronizer is up-to-date @@ -467,7 +470,7 @@ func (f *finalizer) reopenBatch(ctx context.Context) { } } -func (f *finalizer) closeBatch(ctx context.Context, dbTx pgx.Tx) { +func (f *finalizer) closeBatch(ctx context.Context, dbTx pgx.Tx) error { receipt := ClosingBatchParameters{ BatchNumber: f.batch.batchNumber, AccInputHash: f.processRequest.OldAccInputHash, @@ -475,7 +478,7 @@ func (f *finalizer) closeBatch(ctx context.Context, dbTx pgx.Tx) { LocalExitRoot: f.processRequest.GlobalExitRoot, Txs: f.batch.txs, } - f.dbManager.CloseBatch(ctx, receipt, dbTx) + return f.dbManager.CloseBatch(ctx, receipt) } func (f *finalizer) openBatch(ctx context.Context, gerHash common.Hash, dbTx pgx.Tx) (state.ProcessingContext, error) { diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index 3f98317269..bb3cf8442a 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -6,6 +6,7 @@ import ( "time" ethmanTypes "github.com/0xPolygonHermez/zkevm-node/etherman/types" + "github.com/0xPolygonHermez/zkevm-node/pool" "github.com/0xPolygonHermez/zkevm-node/state" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -17,6 +18,10 @@ import ( // txPool contains the methods required to interact with the tx pool. type txPool interface { DeleteTxsByHashes(ctx context.Context, hashes []common.Hash) error + DeleteTransactionByHash(ctx context.Context, hash common.Hash) error + MarkReorgedTxsAsPending(ctx context.Context) error + GetPendingTxs(ctx context.Context, isClaims bool, limit uint64) ([]pool.Transaction, error) + UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus) } // etherman contains the methods required to interact with ethereum. @@ -30,15 +35,16 @@ type etherman interface { // stateInterface gathers the methods required to interact with the state. 
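+// It is used by the finalizer as its executor; the storage methods needed by
+// the dbManager have been split out into dbManagerStateInterface further below.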
type stateInterface interface { - OpenBatch(ctx context.Context, processingContext state.ProcessingContext, dbTx pgx.Tx) error - GetLastVirtualBatchNum(ctx context.Context, dbTx pgx.Tx) (uint64, error) GetTimeForLatestBatchVirtualization(ctx context.Context, dbTx pgx.Tx) (time.Time, error) GetTxsOlderThanNL1Blocks(ctx context.Context, nL1Blocks uint64, dbTx pgx.Tx) ([]common.Hash, error) GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error) GetTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (txs []types.Transaction, err error) + ProcessSingleTx(ctx context.Context, request state.ProcessSingleTxRequest) (state.ProcessBatchResponse, error) + BeginStateTransaction(ctx context.Context) (pgx.Tx, error) + GetLastVirtualBatchNum(ctx context.Context, dbTx pgx.Tx) (uint64, error) IsBatchClosed(ctx context.Context, batchNum uint64, dbTx pgx.Tx) (bool, error) + GetSender(tx types.Transaction) (common.Address, error) Begin(ctx context.Context) (pgx.Tx, error) - BeginStateTransaction(ctx context.Context) (pgx.Tx, error) ProcessSingleTransaction(ctx context.Context, request state.ProcessSingleTxRequest, dbTx pgx.Tx) (*state.ProcessBatchResponse, error) } @@ -50,6 +56,7 @@ type workerInterface interface { GetBestFittingTx(resources BatchResources) *TxTracker UpdateAfterSingleSuccessfulTxExecution(from common.Address, touchedAddresses map[common.Address]*state.TouchedAddress) UpdateTx(txHash common.Hash, from common.Address, ZKCounters state.ZKCounters) + AddTx(tx TxTracker) MoveTxToNotReady(txHash common.Hash, from common.Address, actualNonce *uint64, actualBalance *big.Int) DeleteTx(txHash common.Hash, from common.Address, actualFromNonce *uint64, actualFromBalance *big.Int) HandleL2Reorg(txHashes []common.Hash) @@ -63,14 +70,29 @@ type dbManagerInterface interface { CreateFirstBatch(ctx context.Context, sequencerAddress common.Address) state.ProcessingContext GetLastBatchNumber(ctx context.Context) (uint64, error) StoreProcessedTransaction(ctx context.Context, batchNumber uint64, processedTx *state.ProcessTransactionResponse, dbTx pgx.Tx) error - DeleteTxFromPool(ctx context.Context, txHash common.Hash, dbTx pgx.Tx) error - StoreProcessedTxAndDeleteFromPool(ctx context.Context, batchNumber uint64, response *state.ProcessTransactionResponse) - CloseBatch(ctx context.Context, params ClosingBatchParameters, dbTx pgx.Tx) - GetWIPBatch(ctx context.Context) (WipBatch, error) - GetLastBatch(ctx context.Context) (state.Batch, error) + DeleteTransactionFromPool(ctx context.Context, txHash common.Hash) error + CloseBatch(ctx context.Context, params ClosingBatchParameters) error + GetWIPBatch(ctx context.Context) (*WipBatch, error) + GetLastBatch(ctx context.Context) (*state.Batch, error) GetLastNBatches(ctx context.Context, numBatches uint) ([]*state.Batch, error) - GetLastClosedBatch(ctx context.Context) (state.Batch, error) + GetLastClosedBatch(ctx context.Context) (*state.Batch, error) IsBatchClosed(ctx context.Context, batchNum uint64) (bool, error) + MarkReorgedTxsAsPending(ctx context.Context) + GetLatestGer(ctx context.Context) (state.GlobalExitRoot, time.Time, error) +} + +type dbManagerStateInterface interface { + BeginStateTransaction(ctx context.Context) (pgx.Tx, error) + OpenBatch(ctx context.Context, processingContext state.ProcessingContext, dbTx pgx.Tx) error + GetLastVirtualBatchNum(ctx context.Context, dbTx pgx.Tx) (uint64, error) + GetLastNBatches(ctx context.Context, numBatches uint, dbTx pgx.Tx) ([]*state.Batch, error) + 
StoreTransaction(ctx context.Context, batchNumber uint64, processedTx *state.ProcessTransactionResponse, dbTx pgx.Tx) error + CloseBatch(ctx context.Context, receipt state.ProcessingReceipt, dbTx pgx.Tx) error + IsBatchClosed(ctx context.Context, batchNum uint64, dbTx pgx.Tx) (bool, error) + GetLastClosedBatch(ctx context.Context, dbTx pgx.Tx) (*state.Batch, error) + GetLastBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error) + GetLastBatch(ctx context.Context) (*state.Batch, error) + GetLastL2Block(ctx context.Context, dbTx pgx.Tx) (*types.Block, error) MarkReorgedTxsAsPending(ctx context.Context) error GetLatestGer(ctx context.Context) (state.GlobalExitRoot, time.Time, error) } diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go index 53e03be475..4589de3be1 100644 --- a/sequencer/sequencer.go +++ b/sequencer/sequencer.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/0xPolygonHermez/zkevm-node/log" @@ -12,12 +13,25 @@ import ( "github.com/ethereum/go-ethereum/common" ) +type ClosingSignalCh struct { + ForcedBatchCh chan state.Batch + GERCh chan common.Hash + L2ReorgCh chan L2ReorgEvent + SendingToL1TimeoutCh chan bool +} + +type TxsStore struct { + Ch chan *txToStore + Wg *sync.WaitGroup +} + // Sequencer represents a sequencer type Sequencer struct { cfg Config pool txPool state stateInterface + dbManager dbManagerStateInterface txManager txManager etherman etherman @@ -25,7 +39,7 @@ type Sequencer struct { } // New init sequencer -func New(cfg Config, txPool txPool, state stateInterface, etherman etherman, manager txManager) (*Sequencer, error) { +func New(cfg Config, txPool txPool, state stateInterface, dbManager dbManagerStateInterface, etherman etherman, manager txManager) (*Sequencer, error) { addr, err := etherman.TrustedSequencer() if err != nil { return nil, fmt.Errorf("failed to get trusted sequencer address, err: %v", err) @@ -35,6 +49,7 @@ func New(cfg Config, txPool txPool, state stateInterface, etherman etherman, man cfg: cfg, pool: txPool, state: state, + dbManager: dbManager, etherman: etherman, txManager: manager, address: addr, @@ -49,11 +64,23 @@ func (s *Sequencer) Start(ctx context.Context) { } metrics.Register() + closingSignalCh := ClosingSignalCh{ + ForcedBatchCh: make(chan state.Batch), + GERCh: make(chan common.Hash), + L2ReorgCh: make(chan L2ReorgEvent), + SendingToL1TimeoutCh: make(chan bool), + } + + txsStore := TxsStore{ + Ch: make(chan *txToStore), + Wg: new(sync.WaitGroup), + } + worker := newWorker() - dbManager := newDBManager(s.pool, s.state, worker) + dbManager := newDBManager(ctx, s.pool, s.dbManager, worker, closingSignalCh, txsStore) go dbManager.Start() - finalizer := newFinalizer(s.cfg.Finalizer, worker, dbManager, s.state, s.address, s.isSynced, s.cfg.MaxTxsPerBatch) + finalizer := newFinalizer(s.cfg.Finalizer, worker, dbManager, s.state, s.address, s.isSynced, s.cfg.MaxTxsPerBatch, closingSignalCh, txsStore) currBatch, OldAccInputHash, OldStateRoot := s.bootstrap(ctx, dbManager, finalizer) go finalizer.Start(ctx, currBatch, OldStateRoot, OldAccInputHash) @@ -102,6 +129,10 @@ func (s *Sequencer) bootstrap(ctx context.Context, dbManager *dbManager, finaliz timestamp: uint64(processingCtx.Timestamp.Unix()), txs: make([]TxTracker, 0, s.cfg.MaxTxsPerBatch), } + + if err != nil { + return nil, common.Hash{}, common.Hash{} + } } else { // Check if synchronizer is up-to-date for !s.isSynced(ctx) { diff --git a/sequencer/worker.go b/sequencer/worker.go index cdfb6d25a9..99bff2f128 100644 --- 
a/sequencer/worker.go +++ b/sequencer/worker.go @@ -29,6 +29,12 @@ func (w *Worker) AddTx(tx TxTracker) { // // B) There was a tx ready (and it's worst than the new one) => delete from pool and efficiency list, add new one } +func (w *Worker) HandleL2Reorg(txHashes []common.Hash) { + // 1. Delete related txs from w.efficiencyList + // 2. Mark the affected addresses as "reorged" in w.Pool + // 3. Update these addresses (go to MT, update nonce and balance into w.Pool) +} + func (w *Worker) UpdateAfterSingleSuccessfulTxExecution(from common.Address, touchedAddresses map[common.Address]*state.TouchedAddress) { fromNonce, fromBalance := touchedAddresses[from].Nonce, touchedAddresses[from].Balance w.ApplyAddressUpdate(from, fromNonce, fromBalance) @@ -205,9 +211,3 @@ func (w *Worker) GetBestFittingTx(resources BatchResources) *TxTracker { wg.Wait() return tx } - -func (w *Worker) HandleL2Reorg(txHashes []common.Hash) { - // 1. Delete related txs from w.efficiencyList - // 2. Mark the affected addresses as "reorged" in w.Pool - // 3. Update these addresses (go to MT, update nonce and balance into w.Pool) -} diff --git a/state/batch.go b/state/batch.go index 5b9eb8db32..f1cb779a92 100644 --- a/state/batch.go +++ b/state/batch.go @@ -36,6 +36,7 @@ type ProcessingReceipt struct { LocalExitRoot common.Hash AccInputHash common.Hash Txs []types.Transaction + BatchL2Data []byte } // VerifiedBatch represents a VerifiedBatch diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index a28100dd61..a5f2ff9147 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -1966,3 +1966,24 @@ func (p *PostgresStorage) DeleteUngeneratedProofs(ctx context.Context, dbTx pgx. _, err := e.Exec(ctx, deleteUngeneratedProofsSQL) return err } + +// GetLastClosedBatch returns the latest closed batch +func (p *PostgresStorage) GetLastClosedBatch(ctx context.Context, dbTx pgx.Tx) (*Batch, error) { + const getLastClosedBatchSQL = ` + SELECT bt.batch_num, bt.global_exit_root, bt.local_exit_root, bt.acc_input_hash, bt.state_root, bt.timestamp, bt.coinbase, bt.raw_txs_data + FROM state.batch bt + WHERE global_exit_root IS NOT NULL AND state_root IS NOT NULL + ORDER BY bt.batch_num DESC + LIMIT 1;` + + e := p.getExecQuerier(dbTx) + row := e.QueryRow(ctx, getLastClosedBatchSQL) + batch, err := scanBatch(row) + + if errors.Is(err, pgx.ErrNoRows) { + return nil, ErrStateNotSynchronized + } else if err != nil { + return nil, err + } + return &batch, nil +} diff --git a/state/state.go b/state/state.go index ed4188cda9..b48639548f 100644 --- a/state/state.go +++ b/state/state.go @@ -481,14 +481,15 @@ func (s *State) ProcessSingleTransaction(ctx context.Context, request ProcessSin } var result *ProcessBatchResponse if len(request.TxData) > 0 { - tx, err := DecodeTx(string(request.TxData)) + txs, _, err := DecodeTxs(request.TxData) + if err != nil { + return nil, err + } + result, err = convertToProcessBatchResponse(txs, res) + if err != nil { return nil, err } - result, err = convertToProcessBatchResponse([]types.Transaction{*tx}, res) - } - if err != nil { - return nil, err } log.Debugf("ProcessSingleTransaction end") log.Debugf("*******************************************") @@ -712,19 +713,6 @@ func (s *State) closeSynchronizedBatch(ctx context.Context, receipt ProcessingRe return err } - // TODO: Modification done to bypass situation detected during testnet testing - // Further analysis is needed - /* - if len(txs) == 0 { - return ErrClosingBatchWithoutTxs - } - */ - - // batchL2Data, err := EncodeTransactions(txs) 
- // if err != nil { - // return err - // } - return s.PostgresStorage.closeBatch(ctx, receipt, batchL2Data, dbTx) } @@ -1551,3 +1539,63 @@ type NewL2BlockEvent struct { func (s *State) RegisterNewL2BlockEventHandler(h NewL2BlockEventHandler) { s.newL2BlockEventHandlers = append(s.newL2BlockEventHandlers, h) } + +// StoreTransactions is used by the sequencer to add processed transactions into +// an open batch. +func (s *State) StoreTransaction(ctx context.Context, batchNumber uint64, processedTx *ProcessTransactionResponse, dbTx pgx.Tx) error { + if dbTx == nil { + return ErrDBTxNil + } + + // Check if last batch is closed. Note that it's assumed that only the latest batch can be open + isBatchClosed, err := s.PostgresStorage.IsBatchClosed(ctx, batchNumber, dbTx) + if err != nil { + return err + } + if isBatchClosed { + return ErrBatchAlreadyClosed + } + + processingContext, err := s.GetProcessingContext(ctx, batchNumber, dbTx) + if err != nil { + return err + } + + // if the transaction has an intrinsic invalid tx error it means + // the transaction has not changed the state, so we don't store it + if executor.IsIntrinsicError(executor.ErrorCode(processedTx.Error)) { + return nil + } + + lastL2Block, err := s.GetLastL2Block(ctx, dbTx) + if err != nil { + return err + } + + header := &types.Header{ + Number: new(big.Int).SetUint64(lastL2Block.Number().Uint64() + 1), + ParentHash: lastL2Block.Hash(), + Coinbase: processingContext.Coinbase, + Root: processedTx.StateRoot, + GasUsed: processedTx.GasUsed, + GasLimit: s.cfg.MaxCumulativeGasUsed, + Time: uint64(processingContext.Timestamp.Unix()), + } + transactions := []*types.Transaction{&processedTx.Tx} + + receipt := generateReceipt(header.Number, processedTx) + receipts := []*types.Receipt{receipt} + + // Create block to be able to calculate its hash + block := types.NewBlock(header, transactions, []*types.Header{}, receipts, &trie.StackTrie{}) + block.ReceivedAt = processingContext.Timestamp + + receipt.BlockHash = block.Hash() + + // Store L2 block and its transaction + if err := s.AddL2Block(ctx, batchNumber, block, receipts, dbTx); err != nil { + return err + } + + return nil +}
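The new sequencer.New signature means callers have to be updated outside of this diff. Below is a minimal, hypothetical wiring sketch; the helper name, concrete types and import set are assumptions rather than part of this patch, and it presumes the same *state.State instance ends up implementing both stateInterface and dbManagerStateInterface once the remaining WIP methods are filled in.

package main

import (
	"github.com/0xPolygonHermez/zkevm-node/etherman"
	"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
	"github.com/0xPolygonHermez/zkevm-node/pool"
	"github.com/0xPolygonHermez/zkevm-node/sequencer"
	"github.com/0xPolygonHermez/zkevm-node/state"
)

// createSequencer is illustrative only and would live next to the existing
// bootstrap code in cmd (package main). It shows the new sequencer.New
// signature, where the same state object is passed twice: once as the
// finalizer's executor and once as the dbManager's storage backend.
// It assumes *pool.Pool, *state.State, *etherman.Client and *ethtxmanager.Client
// satisfy the corresponding unexported sequencer interfaces once the WIP
// methods in this patch are completed.
func createSequencer(cfg sequencer.Config, p *pool.Pool, st *state.State,
	etm *etherman.Client, txMgr *ethtxmanager.Client) (*sequencer.Sequencer, error) {
	return sequencer.New(cfg, p, st, st, etm, txMgr)
}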