Profile only SequenceTransactions [NIT-2578] #2366

Merged: 11 commits, Jun 5, 2024
44 changes: 44 additions & 0 deletions execution/gethexec/executionengine.go
@@ -13,10 +13,15 @@ package gethexec
*/
import "C"
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"path"
"runtime/pprof"
"runtime/trace"
"sync"
"testing"
"time"
@@ -27,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/arbos"
"github.com/offchainlabs/nitro/arbos/arbosState"
"github.com/offchainlabs/nitro/arbos/arbostypes"
@@ -334,6 +340,44 @@ func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMess
})
}

// SequenceTransactionsWithProfiling runs SequenceTransactions with tracing and
// CPU profiling enabled. If the block creation takes longer than 2 seconds, it
// keeps both the pprof and trace files and prints their names in an info log line.
func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
if err := pprof.StartCPUProfile(pprofBuf); err != nil {
log.Error("Starting CPU profiling", "error", err)
}
if err := trace.Start(traceBuf); err != nil {
log.Error("Starting tracing", "error", err)
}
start := time.Now()
res, err := s.SequenceTransactions(header, txes, hooks)
elapsed := time.Since(start)
pprof.StopCPUProfile()
trace.Stop()
if elapsed > 2*time.Second {
writeAndLog(pprofBuf, traceBuf)
return res, err
}
return res, err
}

func writeAndLog(pprof, trace *bytes.Buffer) {
id := uuid.NewString()
pprofFile := path.Join(os.TempDir(), id+".pprof")
if err := os.WriteFile(pprofFile, pprof.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for pprof", "fileName", pprofFile, "error", err)
return
}
traceFile := path.Join(os.TempDir(), id+".trace")
if err := os.WriteFile(traceFile, trace.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for trace", "fileName", traceFile, "error", err)
return
}
log.Info("Transactions sequencing took longer than 2 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile)
}
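(The pprof and trace files written here can be inspected offline with the standard go tool pprof and go tool trace commands, pointed at the file paths from the log line above.)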

func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
lastBlockHeader, err := s.getCurrentHeader()
if err != nil {
86 changes: 22 additions & 64 deletions execution/gethexec/sequencer.go
@@ -4,24 +4,18 @@
package gethexec

import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/big"
"os"
"path"
"runtime/debug"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/util/arbmath"
@@ -82,7 +76,7 @@ type SequencerConfig struct {
NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"`
ExpectedSurplusSoftThreshold string `koanf:"expected-surplus-soft-threshold" reload:"hot"`
ExpectedSurplusHardThreshold string `koanf:"expected-surplus-hard-threshold" reload:"hot"`
EnableProfiling bool `koanf:"enable-profiling"`
EnableProfiling bool `koanf:"enable-profiling" reload:"hot"`
expectedSurplusSoftThreshold int
expectedSurplusHardThreshold int
}
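(Given the koanf tags above, this option would presumably be surfaced as a sequencer flag along the lines of --execution.sequencer.enable-profiling once the parent config registers SequencerConfig; the exact flag prefix is an assumption here, not something shown in this diff.)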
@@ -341,7 +335,6 @@ type Sequencer struct {
expectedSurplusMutex sync.RWMutex
expectedSurplus int64
expectedSurplusUpdated bool
enableProfiling bool
}

func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) {
@@ -368,7 +361,6 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
l1Timestamp: 0,
pauseChan: nil,
onForwarderSet: make(chan struct{}, 1),
enableProfiling: config.EnableProfiling,
}
s.nonceFailures = &nonceFailureCache{
containers.NewLruCacheWithOnEvict(config.NonceCacheSize, s.onNonceFailureEvict),
@@ -416,11 +408,12 @@ func ctxWithTimeout(ctx context.Context, timeout time.Duration) (context.Context
}

func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
config := s.config()
// Only try to acquire the RLock and check the hard threshold if l1Reader is not nil
// and the hard threshold is enabled; this prevents spamming read locks when not needed.
if s.l1Reader != nil && s.config().ExpectedSurplusHardThreshold != "default" {
if s.l1Reader != nil && config.ExpectedSurplusHardThreshold != "default" {
s.expectedSurplusMutex.RLock()
if s.expectedSurplusUpdated && s.expectedSurplus < int64(s.config().expectedSurplusHardThreshold) {
if s.expectedSurplusUpdated && s.expectedSurplus < int64(config.expectedSurplusHardThreshold) {
return errors.New("currently not accepting transactions due to expected surplus being below threshold")
}
s.expectedSurplusMutex.RUnlock()
@@ -459,7 +452,7 @@ func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Tran
return err
}

queueTimeout := s.config().QueueTimeout
queueTimeout := config.QueueTimeout
queueCtx, cancelFunc := ctxWithTimeout(parentCtx, queueTimeout)
defer cancelFunc()

@@ -787,44 +780,6 @@ func (s *Sequencer) precheckNonces(queueItems []txQueueItem, totalBlockSize int)
return outputQueueItems
}

// createBlockWithProfiling runs create block with tracing and CPU profiling
// enabled. If the block creation takes longer than 5 seconds, it keeps both
// and prints out filenames in an error log line.
func (s *Sequencer) createBlockWithProfiling(ctx context.Context) bool {
pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
if err := pprof.StartCPUProfile(pprofBuf); err != nil {
log.Error("Starting CPU profiling", "error", err)
}
if err := trace.Start(traceBuf); err != nil {
log.Error("Starting tracing", "error", err)
}
start := time.Now()
res := s.createBlock(ctx)
elapsed := time.Since(start)
pprof.StopCPUProfile()
trace.Stop()
if elapsed > 2*time.Second {
writeAndLog(pprofBuf, traceBuf)
return res
}
return res
}

func writeAndLog(pprof, trace *bytes.Buffer) {
id := uuid.NewString()
pprofFile := path.Join(os.TempDir(), id+".pprof")
if err := os.WriteFile(pprofFile, pprof.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for pprof", "fileName", pprofFile, "error", err)
return
}
traceFile := path.Join(os.TempDir(), id+".trace")
if err := os.WriteFile(traceFile, trace.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for trace", "fileName", traceFile, "error", err)
return
}
log.Debug("Block creation took longer than 5 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile)
}

func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
var queueItems []txQueueItem
var totalBlockSize int
@@ -972,7 +927,15 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
}

start := time.Now()
block, err := s.execEngine.SequenceTransactions(header, txes, hooks)
var (
block *types.Block
err error
)
if config.EnableProfiling {
block, err = s.execEngine.SequenceTransactionsWithProfiling(header, txes, hooks)
} else {
block, err = s.execEngine.SequenceTransactions(header, txes, hooks)
}
elapsed := time.Since(start)
blockCreationTimer.Update(elapsed)
if elapsed >= time.Second*5 {
@@ -1104,16 +1067,17 @@ func (s *Sequencer) updateExpectedSurplus(ctx context.Context) (int64, error) {
unusedL1GasChargeGauge.Update(backlogL1GasCharged)
currentSurplusGauge.Update(surplus)
expectedSurplusGauge.Update(expectedSurplus)
if s.config().ExpectedSurplusSoftThreshold != "default" && expectedSurplus < int64(s.config().expectedSurplusSoftThreshold) {
log.Warn("expected surplus is below soft threshold", "value", expectedSurplus, "threshold", s.config().expectedSurplusSoftThreshold)
config := s.config()
if config.ExpectedSurplusSoftThreshold != "default" && expectedSurplus < int64(config.expectedSurplusSoftThreshold) {
Contributor: this looks like an example where we need ExpectedSurplusSoftThreshold and expectedSurplusSoftThreshold to be read from the same config.

log.Warn("expected surplus is below soft threshold", "value", expectedSurplus, "threshold", config.expectedSurplusSoftThreshold)
}
return expectedSurplus, nil
}

func (s *Sequencer) Start(ctxIn context.Context) error {
s.StopWaiter.Start(ctxIn, s)

if (s.config().ExpectedSurplusHardThreshold != "default" || s.config().ExpectedSurplusSoftThreshold != "default") && s.l1Reader == nil {
config := s.config()
if (config.ExpectedSurplusHardThreshold != "default" || config.ExpectedSurplusSoftThreshold != "default") && s.l1Reader == nil {
return errors.New("expected surplus soft/hard thresholds are enabled but l1Reader is nil")
}

@@ -1125,7 +1089,7 @@ func (s *Sequencer) Start(ctxIn context.Context) error {

expectedSurplus, err := s.updateExpectedSurplus(ctxIn)
if err != nil {
if s.config().ExpectedSurplusHardThreshold != "default" {
if config.ExpectedSurplusHardThreshold != "default" {
return fmt.Errorf("expected-surplus-hard-threshold is enabled but error fetching initial expected surplus value: %w", err)
}
log.Error("expected-surplus-soft-threshold is enabled but error fetching initial expected surplus value", "err", err)
@@ -1167,14 +1131,8 @@ func (s *Sequencer) Start(ctxIn context.Context) error {
}

s.CallIteratively(func(ctx context.Context) time.Duration {
nextBlock := time.Now().Add(s.config().MaxBlockSpeed)
var madeBlock bool
if s.enableProfiling {
madeBlock = s.createBlockWithProfiling(ctx)
} else {
madeBlock = s.createBlock(ctx)
}
if madeBlock {
nextBlock := time.Now().Add(config.MaxBlockSpeed)
Contributor: here we need to get the config from the fetcher; it's called iteratively in a separate goroutine and we still want to hot-reload it.

Contributor (Author): Nice catch! Thanks!

(At this point I'm wondering how much overhead fetching the config would have if we never cached it into a variable, since I can see how easy it would be to introduce a bug like this.)

Contributor: Hm, maybe the overhead is not that bad, but I'd still use the same config pointer for options that are, e.g., thresholds, and it might be good to keep them in sync (so two or more of them are guaranteed to be taken from the same config object, without risk of a reload race). Not sure if there are such sensitive options (?) What do you think?
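A minimal, self-contained sketch of the pattern discussed in this thread, using simplified and purely illustrative types (sequencerConfig, configFetcher, and runLoop stand in for the real SequencerConfig, config fetcher, and CallIteratively callback): the fetcher is called once per iteration, so a hot reload takes effect on the next block, while every field used within one iteration, including paired options like the string threshold and its parsed int form, comes from the same snapshot and cannot be split across a reload.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Simplified stand-in for SequencerConfig; fields mirror the ones discussed above.
type sequencerConfig struct {
	MaxBlockSpeed                time.Duration
	ExpectedSurplusSoftThreshold string
	expectedSurplusSoftThreshold int
}

type configFetcher func() *sequencerConfig

// runLoop mimics the iteratively-called block-creation callback: fetch one
// snapshot per iteration, then read every option from that snapshot.
func runLoop(fetch configFetcher, iterations int) {
	for i := 0; i < iterations; i++ {
		config := fetch() // one consistent view for this whole iteration
		nextBlock := time.Now().Add(config.MaxBlockSpeed)
		if config.ExpectedSurplusSoftThreshold != "default" {
			// Both threshold forms come from the same config object, so they
			// cannot disagree even if a hot reload lands mid-iteration.
			fmt.Println("soft threshold:", config.expectedSurplusSoftThreshold)
		}
		time.Sleep(time.Until(nextBlock))
	}
}

func main() {
	var current atomic.Pointer[sequencerConfig]
	current.Store(&sequencerConfig{
		MaxBlockSpeed:                10 * time.Millisecond,
		ExpectedSurplusSoftThreshold: "default",
	})

	// Simulate a hot reload happening while the loop is running.
	go func() {
		time.Sleep(25 * time.Millisecond)
		current.Store(&sequencerConfig{
			MaxBlockSpeed:                5 * time.Millisecond,
			ExpectedSurplusSoftThreshold: "1000",
			expectedSurplusSoftThreshold: 1000,
		})
	}()

	runLoop(func() *sequencerConfig { return current.Load() }, 10)
}

In the sequencer itself, the equivalent of this sketch (as the reviewer suggests) would be calling the config fetcher at the top of the CallIteratively callback rather than capturing a config value from Start's scope.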

if s.createBlock(ctx) {
// Note: this may return a negative duration, but timers are fine with that (they treat negative durations as 0).
return time.Until(nextBlock)
}