Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-node: Derivation metrics #3156

Merged
merged 2 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 64 additions & 2 deletions op-node/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"net/http"
"strconv"
"time"

"github.com/ethereum/go-ethereum"
"github.com/prometheus/client_golang/prometheus/collectors"
Expand All @@ -27,14 +28,22 @@ const (
)

type Metrics struct {
Info *prometheus.GaugeVec
Up prometheus.Gauge
Info *prometheus.GaugeVec
Up prometheus.Gauge

RPCServerRequestsTotal *prometheus.CounterVec
RPCServerRequestDurationSeconds *prometheus.HistogramVec
RPCClientRequestsTotal *prometheus.CounterVec
RPCClientRequestDurationSeconds *prometheus.HistogramVec
RPCClientResponsesTotal *prometheus.CounterVec

DerivationIdle prometheus.Gauge
PipelineResetsTotal prometheus.Counter
LastPipelineResetUnix prometheus.Gauge
UnsafePayloadsTotal prometheus.Counter
DerivationErrorsTotal prometheus.Counter
Heads *prometheus.GaugeVec

registry *prometheus.Registry
}

Expand All @@ -60,6 +69,7 @@ func NewMetrics(procName string) *Metrics {
Name: "up",
Help: "1 if the op node has finished starting up",
}),

RPCServerRequestsTotal: promauto.With(registry).NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: RPCServerSubsystem,
Expand Down Expand Up @@ -103,6 +113,40 @@ func NewMetrics(procName string) *Metrics {
"method",
"error",
}),

DerivationIdle: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "derivation_idle",
Help: "1 if the derivation pipeline is idle",
}),
PipelineResetsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "pipeline_resets_total",
Help: "Count of derivation pipeline resets",
}),
LastPipelineResetUnix: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "last_pipeline_reset_unix",
Help: "Timestamp of last pipeline reset",
}),
UnsafePayloadsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "unsafe_payloads_total",
Help: "Count of unsafe payloads received via p2p",
}),
DerivationErrorsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "derivation_errors_total",
Help: "Count of total derivation errors",
}),
Heads: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "heads",
Help: "Gauge representing the different L1/L2 heads",
}, []string{
"type",
}),

registry: registry,
}
}
Expand Down Expand Up @@ -166,6 +210,24 @@ func (m *Metrics) RecordRPCClientResponse(method string, err error) {
m.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc()
}

func (m *Metrics) SetDerivationIdle(status bool) {
var val float64
if status {
val = 1
}
m.DerivationIdle.Set(val)
}

func (m *Metrics) SetHead(kind string, num uint64) {
m.Heads.WithLabelValues(kind).Set(float64(num))
}

func (m *Metrics) RecordPipelineReset() {
m.PipelineResetsTotal.Inc()
m.DerivationErrorsTotal.Inc()
m.LastPipelineResetUnix.Set(float64(time.Now().Unix()))
}

// Serve starts the metrics server on the given hostname and port.
// The server will be closed when the passed-in context is cancelled.
func (m *Metrics) Serve(ctx context.Context, hostname string, port int) error {
Expand Down
2 changes: 1 addition & 1 deletion op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err
}

n.l2Engine = driver.NewDriver(&cfg.Driver, &cfg.Rollup, source, n.l1Source, n, n.log, snapshotLog)
n.l2Engine = driver.NewDriver(&cfg.Driver, &cfg.Rollup, source, n.l1Source, n, n.log, snapshotLog, n.metrics)

return nil
}
Expand Down
6 changes: 4 additions & 2 deletions op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"math/big"

"github.com/ethereum-optimism/optimism/op-node/metrics"

"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/l1"
"github.com/ethereum-optimism/optimism/op-node/l2"
Expand Down Expand Up @@ -56,7 +58,7 @@ type Network interface {
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error
}

func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 *l2.Source, l1 *l1.Source, network Network, log log.Logger, snapshotLog log.Logger) *Driver {
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 *l2.Source, l1 *l1.Source, network Network, log log.Logger, snapshotLog log.Logger, metrics *metrics.Metrics) *Driver {
output := &outputImpl{
Config: cfg,
dl: l1,
Expand All @@ -67,7 +69,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 *l2.Source, l1 *l1.Sour
var state *state
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, func() eth.L1BlockRef { return state.l1Head }, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2)
state = NewState(driverCfg, log, snapshotLog, cfg, l1, l2, output, derivationPipeline, network)
state = NewState(driverCfg, log, snapshotLog, cfg, l1, l2, output, derivationPipeline, network, metrics)
return &Driver{s: state}
}

Expand Down
16 changes: 15 additions & 1 deletion op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
)
Expand Down Expand Up @@ -67,6 +68,7 @@ type state struct {
output outputInterface
network Network // may be nil, network for is optional

metrics *metrics.Metrics
log log.Logger
snapshotLog log.Logger
done chan struct{}
Expand All @@ -77,7 +79,7 @@ type state struct {
// NewState creates a new driver state. State changes take effect though
// the given output, derivation pipeline and network interfaces.
func NewState(driverCfg *Config, log log.Logger, snapshotLog log.Logger, config *rollup.Config, l1Chain L1Chain, l2Chain L2Chain,
output outputInterface, derivationPipeline DerivationPipeline, network Network) *state {
output outputInterface, derivationPipeline DerivationPipeline, network Network, metrics *metrics.Metrics) *state {
return &state{
derivation: derivationPipeline,
idleDerivation: false,
Expand All @@ -91,6 +93,7 @@ func NewState(driverCfg *Config, log log.Logger, snapshotLog log.Logger, config
l2: l2Chain,
output: output,
network: network,
metrics: metrics,
l1Heads: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10),
}
Expand All @@ -105,6 +108,8 @@ func (s *state) Start(ctx context.Context) error {
}
s.l1Head = l1Head
s.l2Head, _ = s.l2.L2BlockRefByNumber(ctx, nil)
s.metrics.SetHead("l1", s.l1Head.Number)
s.metrics.SetHead("l2_unsafe", s.l2Head.Number)

s.derivation.Reset()

Expand Down Expand Up @@ -151,6 +156,7 @@ func (s *state) handleNewL1Block(newL1Head eth.L1BlockRef) {
// This could either be a long L1 extension, or a reorg. Both can be handled the same way.
s.log.Warn("L1 Head signal indicates an L1 re-org", "old_l1_head", s.l1Head, "new_l1_head_parent", newL1Head.ParentHash, "new_l1_head", newL1Head)
}
s.metrics.SetHead("l1", newL1Head.Number)
s.l1Head = newL1Head
}

Expand Down Expand Up @@ -315,6 +321,7 @@ func (s *state) eventLoop() {
cancel()
if err != nil {
s.log.Error("Error creating new L2 block", "err", err)
s.metrics.DerivationErrorsTotal.Inc()
}

// We need to catch up to the next origin as quickly as possible. We can do this by
Expand All @@ -330,6 +337,7 @@ func (s *state) eventLoop() {
s.snapshot("New unsafe payload")
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
s.derivation.AddUnsafePayload(payload)
s.metrics.UnsafePayloadsTotal.Inc()
reqStep()

case newL1Head := <-s.l1Heads:
Expand All @@ -338,6 +346,7 @@ func (s *state) eventLoop() {
s.handleNewL1Block(newL1Head)
reqStep() // a new L1 head may mean we have the data to not get an EOF again.
case <-stepReqCh:
s.metrics.SetDerivationIdle(false)
s.idleDerivation = false
mslipper marked this conversation as resolved.
Show resolved Hide resolved
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Progress().Origin, "onto_closed", s.derivation.Progress().Closed)
stepCtx, cancel := context.WithTimeout(ctx, time.Second*10) // TODO pick a timeout for executing a single step
Expand All @@ -346,16 +355,21 @@ func (s *state) eventLoop() {
if err == io.EOF {
s.log.Debug("Derivation process went idle", "progress", s.derivation.Progress().Origin)
s.idleDerivation = true
s.metrics.SetDerivationIdle(true)
continue
} else if err != nil {
// If the pipeline corrupts, e.g. due to a reorg, simply reset it
s.log.Warn("Derivation pipeline is reset", "err", err)
s.derivation.Reset()
s.metrics.RecordPipelineReset()
} else {
finalized, safe, unsafe := s.derivation.Finalized(), s.derivation.SafeL2Head(), s.derivation.UnsafeL2Head()
// log sync progress when it changes
if s.l2Finalized != finalized || s.l2SafeHead != safe || s.l2Head != unsafe {
s.log.Info("Sync progress", "finalized", finalized, "safe", safe, "unsafe", unsafe)
s.metrics.SetHead("l2_finalized", finalized.Number)
s.metrics.SetHead("l2_safe", safe.Number)
s.metrics.SetHead("l2_unsafe", unsafe.Number)
}
// update the heads
s.l2Finalized = finalized
Expand Down