Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Profile only SequenceTransactions [NIT-2578] #2366

Merged
merged 11 commits into from
Jun 5, 2024
44 changes: 44 additions & 0 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ package gethexec
*/
import "C"
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"path"
"runtime/pprof"
"runtime/trace"
"sync"
"testing"
"time"
Expand All @@ -27,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/arbos"
"github.com/offchainlabs/nitro/arbos/arbosState"
"github.com/offchainlabs/nitro/arbos/arbostypes"
Expand Down Expand Up @@ -334,6 +340,44 @@ func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMess
})
}

// SequenceTransactionsWithProfiling runs SequenceTransactions with tracing and
// CPU profiling enabled. If the block creation takes longer than 2 seconds, it
// keeps both and prints out filenames in an error log line.
func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
if err := pprof.StartCPUProfile(pprofBuf); err != nil {
log.Error("Starting CPU profiling", "error", err)
}
if err := trace.Start(traceBuf); err != nil {
log.Error("Starting tracing", "error", err)
}
start := time.Now()
res, err := s.SequenceTransactions(header, txes, hooks)
elapsed := time.Since(start)
pprof.StopCPUProfile()
trace.Stop()
if elapsed > 2*time.Second {
writeAndLog(pprofBuf, traceBuf)
return res, err
}
return res, err
}

func writeAndLog(pprof, trace *bytes.Buffer) {
id := uuid.NewString()
pprofFile := path.Join(os.TempDir(), id+".pprof")
if err := os.WriteFile(pprofFile, pprof.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for pprof", "fileName", pprofFile, "error", err)
return
}
traceFile := path.Join(os.TempDir(), id+".trace")
if err := os.WriteFile(traceFile, trace.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for trace", "fileName", traceFile, "error", err)
return
}
log.Info("Transactions sequencing took longer than 2 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile)
}

func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
lastBlockHeader, err := s.getCurrentHeader()
if err != nil {
Expand Down
66 changes: 11 additions & 55 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,18 @@
package gethexec

import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/big"
"os"
"path"
"runtime/debug"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/util/arbmath"
Expand Down Expand Up @@ -82,7 +76,7 @@ type SequencerConfig struct {
NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"`
ExpectedSurplusSoftThreshold string `koanf:"expected-surplus-soft-threshold" reload:"hot"`
ExpectedSurplusHardThreshold string `koanf:"expected-surplus-hard-threshold" reload:"hot"`
EnableProfiling bool `koanf:"enable-profiling"`
EnableProfiling bool `koanf:"enable-profiling" reload:"hot"`
expectedSurplusSoftThreshold int
expectedSurplusHardThreshold int
}
Expand Down Expand Up @@ -341,7 +335,6 @@ type Sequencer struct {
expectedSurplusMutex sync.RWMutex
expectedSurplus int64
expectedSurplusUpdated bool
enableProfiling bool
}

func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) {
Expand All @@ -368,7 +361,6 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
l1Timestamp: 0,
pauseChan: nil,
onForwarderSet: make(chan struct{}, 1),
enableProfiling: config.EnableProfiling,
}
s.nonceFailures = &nonceFailureCache{
containers.NewLruCacheWithOnEvict(config.NonceCacheSize, s.onNonceFailureEvict),
Expand Down Expand Up @@ -787,44 +779,6 @@ func (s *Sequencer) precheckNonces(queueItems []txQueueItem, totalBlockSize int)
return outputQueueItems
}

// createBlockWithProfiling runs create block with tracing and CPU profiling
// enabled. If the block creation takes longer than 5 seconds, it keeps both
// and prints out filenames in an error log line.
func (s *Sequencer) createBlockWithProfiling(ctx context.Context) bool {
pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
if err := pprof.StartCPUProfile(pprofBuf); err != nil {
log.Error("Starting CPU profiling", "error", err)
}
if err := trace.Start(traceBuf); err != nil {
log.Error("Starting tracing", "error", err)
}
start := time.Now()
res := s.createBlock(ctx)
elapsed := time.Since(start)
pprof.StopCPUProfile()
trace.Stop()
if elapsed > 2*time.Second {
writeAndLog(pprofBuf, traceBuf)
return res
}
return res
}

func writeAndLog(pprof, trace *bytes.Buffer) {
id := uuid.NewString()
pprofFile := path.Join(os.TempDir(), id+".pprof")
if err := os.WriteFile(pprofFile, pprof.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for pprof", "fileName", pprofFile, "error", err)
return
}
traceFile := path.Join(os.TempDir(), id+".trace")
if err := os.WriteFile(traceFile, trace.Bytes(), 0o600); err != nil {
log.Error("Creating temporary file for trace", "fileName", traceFile, "error", err)
return
}
log.Debug("Block creation took longer than 5 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile)
}

func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
var queueItems []txQueueItem
var totalBlockSize int
Expand Down Expand Up @@ -972,7 +926,15 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
}

start := time.Now()
block, err := s.execEngine.SequenceTransactions(header, txes, hooks)
var (
block *types.Block
err error
)
if s.config().EnableProfiling {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could use a config copy that we already got in this scope to avoid calling the fetcher again

config := s.config()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Changed other methods as well where it's called more than once.

block, err = s.execEngine.SequenceTransactionsWithProfiling(header, txes, hooks)
} else {
block, err = s.execEngine.SequenceTransactions(header, txes, hooks)
}
elapsed := time.Since(start)
blockCreationTimer.Update(elapsed)
if elapsed >= time.Second*5 {
Expand Down Expand Up @@ -1168,13 +1130,7 @@ func (s *Sequencer) Start(ctxIn context.Context) error {

s.CallIteratively(func(ctx context.Context) time.Duration {
nextBlock := time.Now().Add(s.config().MaxBlockSpeed)
var madeBlock bool
if s.enableProfiling {
madeBlock = s.createBlockWithProfiling(ctx)
} else {
madeBlock = s.createBlock(ctx)
}
if madeBlock {
if s.createBlock(ctx) {
// Note: this may return a negative duration, but timers are fine with that (they treat negative durations as 0).
return time.Until(nextBlock)
}
Expand Down
Loading