Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Halt finalizer in case of full execution error #1879

Merged
merged 9 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 46 additions & 30 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ func (f *finalizer) listenForClosingSignals(ctx context.Context) {
}
f.nextGERMux.Unlock()
// L2Reorg ch
case l2ReorgEvent := <-f.closingSignalCh.L2ReorgCh:
case <-f.closingSignalCh.L2ReorgCh:
log.Debug("finalizer received L2 reorg event")
f.handlingL2Reorg = true
f.worker.HandleL2Reorg(l2ReorgEvent.TxHashes)
f.halt(ctx, fmt.Errorf("L2 reorg event received"))
return
// Too much time without batches in L1 ch
case <-f.closingSignalCh.SendingToL1TimeoutCh:
Expand All @@ -203,26 +203,23 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
log.Debugf("processing tx: %s", tx.Hash.Hex())
err := f.processTransaction(ctx, tx)
if err != nil {
log.Errorf("failed to process transaction in finalizeBatches, Err: %s", err)
log.Errorf("failed to process transaction in finalizeBatches, Err: %v", err)
}

f.sharedResourcesMux.Unlock()
} else {
if f.isBatchAlmostFull() {
// Wait for all transactions to be stored in the DB
log.Infof("Closing batch: %d, because it's almost full.", f.batch.batchNumber)
// The perfect moment to finalize the batch
f.finalizeBatch(ctx)
} else {
// wait for new txs
log.Debugf("no transactions to be processed. Sleeping for %v", f.cfg.SleepDurationInMs.Duration)
if f.cfg.SleepDurationInMs.Duration > 0 {
time.Sleep(f.cfg.SleepDurationInMs.Duration)
}
// wait for new txs
log.Debugf("no transactions to be processed. Sleeping for %v", f.cfg.SleepDurationInMs.Duration)
if f.cfg.SleepDurationInMs.Duration > 0 {
time.Sleep(f.cfg.SleepDurationInMs.Duration)
}
}

if f.isDeadlineEncountered() || f.isBatchFull() {
if f.isDeadlineEncountered() {
log.Infof("Closing batch: %d, because deadline was encountered.", f.batch.batchNumber)
f.finalizeBatch(ctx)
} else if f.isBatchFull() || f.isBatchAlmostFull() {
log.Infof("Closing batch: %d, because it's almost full.", f.batch.batchNumber)
f.finalizeBatch(ctx)
}

Expand Down Expand Up @@ -256,6 +253,24 @@ func (f *finalizer) finalizeBatch(ctx context.Context) {
}
}

func (f *finalizer) halt(ctx context.Context, err error) {
debugInfo := &state.DebugInfo{
ErrorType: state.DebugInfoErrorType_FINALIZER_HALT,
Timestamp: time.Now(),
Payload: err.Error(),
}
debugInfoErr := f.dbManager.AddDebugInfo(ctx, debugInfo, nil)
if debugInfoErr != nil {
log.Errorf("error storing finalizer halt debug info: %v", debugInfoErr)
}

for {
log.Errorf("fatal error: %s", err)
log.Error("halting the finalizer")
time.Sleep(5 * time.Second) //nolint:gomnd
}
}

// newWIPBatch closes the current batch and opens a new one, potentially processing forced batches between the batch is closed and the resulting new empty batch
func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
f.sharedResourcesMux.Lock()
Expand All @@ -266,24 +281,23 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
return nil, errors.New("state root and local exit root must have value to close batch")
}

// Reprocess full batch as sanity check
processBatchResponse, err := f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.stateRoot)
if err != nil || !processBatchResponse.IsBatchProcessed {
log.Info("halting the finalizer because of a reprocessing error")
if err != nil {
f.halt(ctx, fmt.Errorf("failed to reprocess batch, err: %v", err))
} else {
f.halt(ctx, fmt.Errorf("out of counters during reprocessFullBath"))
}
}

// Close the current batch
err = f.closeBatch(ctx)
if err != nil {
return nil, fmt.Errorf("failed to close batch, err: %w", err)
}

// Reprocess full batch to persist the merkle tree
go func() {
processBatchResponse, err := f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.stateRoot)
if err != nil || !processBatchResponse.IsBatchProcessed {
log.Info("restarting the sequencer node because of a reprocessing error")
if err != nil {
log.Fatalf("failed to reprocess batch, err: %v", err)
} else {
log.Fatal("Out of counters during reprocessFullBath")
}
}
}()

// Metadata for the next batch
stateRoot := f.batch.stateRoot
lastBatchNumber := f.batch.batchNumber
Expand Down Expand Up @@ -691,7 +705,7 @@ func (f *finalizer) openBatch(ctx context.Context, num uint64, ger common.Hash,
func (f *finalizer) reprocessFullBatch(ctx context.Context, batchNum uint64, expectedStateRoot common.Hash) (*state.ProcessBatchResponse, error) {
batch, err := f.dbManager.GetBatchByNumber(ctx, batchNum, nil)
if err != nil {
return nil, fmt.Errorf("failed to get batch by number, err: %w", err)
return nil, fmt.Errorf("failed to get batch by number, err: %v", err)
}
processRequest := state.ProcessRequest{
BatchNumber: batch.BatchNumber,
Expand All @@ -705,7 +719,8 @@ func (f *finalizer) reprocessFullBatch(ctx context.Context, batchNum uint64, exp
log.Infof("reprocessFullBatch: BatchNumber: %d, OldStateRoot: %s, Ger: %s", batch.BatchNumber, f.batch.initialStateRoot.String(), batch.GlobalExitRoot.String())
txs, _, err := state.DecodeTxs(batch.BatchL2Data)
if err != nil {
log.Error("reprocessFullBatch: error decoding BatchL2Data before reprocessing full batch: %d. Error: %v", batch.BatchNumber, err)
log.Errorf("reprocessFullBatch: error decoding BatchL2Data before reprocessing full batch: %d. Error: %v", batch.BatchNumber, err)
return nil, fmt.Errorf("reprocessFullBatch: error decoding BatchL2Data before reprocessing full batch: %d. Error: %v", batch.BatchNumber, err)
}
for i, tx := range txs {
log.Infof("reprocessFullBatch: Tx position %d. TxHash: %s", i, tx.Hash())
Expand Down Expand Up @@ -739,6 +754,7 @@ func (f *finalizer) reprocessFullBatch(ctx context.Context, batchNum uint64, exp

if result.NewStateRoot != expectedStateRoot {
log.Errorf("batchNumber: %d, reprocessed batch has different state root, expected: %s, got: %s", batch.BatchNumber, expectedStateRoot.Hex(), result.NewStateRoot.Hex())
return nil, fmt.Errorf("batchNumber: %d, reprocessed batch has different state root, expected: %s, got: %s", batch.BatchNumber, expectedStateRoot.Hex(), result.NewStateRoot.Hex())
}

return result, nil
Expand Down
16 changes: 8 additions & 8 deletions sequencer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,14 @@ func (w *Worker) UpdateTx(txHash common.Hash, addr common.Address, counters stat
w.workerMutex.Lock()
defer w.workerMutex.Unlock()
log.Infof("UpdateTx tx(%s) addr(%s)", txHash.String(), addr.String())
log.Debugf("UpdateTx counters.CumulativeGasUsed: %s", counters.CumulativeGasUsed)
log.Debugf("UpdateTx counters.UsedKeccakHashes: %s", counters.UsedKeccakHashes)
log.Debugf("UpdateTx counters.UsedPoseidonHashes: %s", counters.UsedPoseidonHashes)
log.Debugf("UpdateTx counters.UsedPoseidonPaddings: %s", counters.UsedPoseidonPaddings)
log.Debugf("UpdateTx counters.UsedMemAligns: %s", counters.UsedMemAligns)
log.Debugf("UpdateTx counters.UsedArithmetics: %s", counters.UsedArithmetics)
log.Debugf("UpdateTx counters.UsedBinaries: %s", counters.UsedBinaries)
log.Debugf("UpdateTx counters.UsedSteps: %s", counters.UsedSteps)
log.Debugf("UpdateTx counters.CumulativeGasUsed: %d", counters.CumulativeGasUsed)
log.Debugf("UpdateTx counters.UsedKeccakHashes: %d", counters.UsedKeccakHashes)
log.Debugf("UpdateTx counters.UsedPoseidonHashes: %d", counters.UsedPoseidonHashes)
log.Debugf("UpdateTx counters.UsedPoseidonPaddings: %d", counters.UsedPoseidonPaddings)
log.Debugf("UpdateTx counters.UsedMemAligns: %d", counters.UsedMemAligns)
log.Debugf("UpdateTx counters.UsedArithmetics: %d", counters.UsedArithmetics)
log.Debugf("UpdateTx counters.UsedBinaries: %d", counters.UsedBinaries)
log.Debugf("UpdateTx counters.UsedSteps: %d", counters.UsedSteps)

addrQueue, found := w.pool[addr.String()]

Expand Down
32 changes: 2 additions & 30 deletions state/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,36 +65,8 @@ func EncodeTransactions(txs []types.Transaction) ([]byte, error) {

// EncodeTransaction RLP encodes the given transaction
func EncodeTransaction(tx types.Transaction) ([]byte, error) {
v, r, s := tx.RawSignatureValues()
sign := 1 - (v.Uint64() & 1)

nonce, gasPrice, gas, to, value, data, chainID := tx.Nonce(), tx.GasPrice(), tx.Gas(), tx.To(), tx.Value(), tx.Data(), tx.ChainId()
log.Debug(nonce, " ", gasPrice, " ", gas, " ", to, " ", value, " ", len(data), " ", chainID)

txCodedRlp, err := rlp.EncodeToBytes([]interface{}{
nonce,
gasPrice,
gas,
to,
value,
data,
chainID, uint(0), uint(0),
})

if err != nil {
return nil, err
}

newV := new(big.Int).Add(big.NewInt(ether155V), big.NewInt(int64(sign)))
newRPadded := fmt.Sprintf("%064s", r.Text(hex.Base))
newSPadded := fmt.Sprintf("%064s", s.Text(hex.Base))
newVPadded := fmt.Sprintf("%02s", newV.Text(hex.Base))
txData, err := hex.DecodeString(hex.EncodeToString(txCodedRlp) + newRPadded + newSPadded + newVPadded)
if err != nil {
return nil, err
}

return txData, nil
transactions := []types.Transaction{tx}
return EncodeTransactions(transactions)
}

// EncodeUnsignedTransaction RLP encodes the given unsigned transaction
Expand Down
2 changes: 2 additions & 0 deletions state/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ const (
DebugInfoErrorType_OOC_ERROR_ON_REPROCESS_FULL_BATCH = "OOC ON REPROCESS FULL BATCH"
// DebugInfoErrorType_EXECUTOR_RLP_ERROR indicates a error happened decoding the RLP returned by the executor
DebugInfoErrorType_EXECUTOR_RLP_ERROR = "EXECUTOR RLP ERROR"
// DebugInfoErrorType_FINALIZER_HALT indicates a fatal error happened in the finalizer when trying to close a batch
DebugInfoErrorType_FINALIZER_HALT = "FINALIZER HALT"
)

// DebugInfo allows handling runtime debug info
Expand Down
1 change: 1 addition & 0 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ func (s *ClientSynchronizer) checkTrustedState(batch state.Batch, tBatch *state.
if reorgReasons.Len() > 0 {
reason := reorgReasons.String()
log.Warnf("Trusted Reorg detected for Batch Number: %d. Reasons: %s", tBatch.BatchNumber, reason)
log.Fatal("TRUSTED REORG DETECTED! Batch: ", batch.BatchNumber)
// Store trusted reorg register
tr := state.TrustedReorg{
BatchNumber: tBatch.BatchNumber,
Expand Down
Loading