diff --git a/.circleci/config.yml b/.circleci/config.yml index 3d4b389adeb0..53cbdc5a70e4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1859,7 +1859,6 @@ workflows: name: op-conductor-docker-build docker_name: op-conductor docker_tags: <>,<> - requires: ['op-stack-go-docker-build'] platforms: "linux/amd64,linux/arm64" publish: true context: diff --git a/op-conductor/conductor/service.go b/op-conductor/conductor/service.go index d7e9073b04b0..0617718efbe8 100644 --- a/op-conductor/conductor/service.go +++ b/op-conductor/conductor/service.go @@ -18,6 +18,7 @@ import ( "github.com/ethereum-optimism/optimism/op-conductor/client" "github.com/ethereum-optimism/optimism/op-conductor/consensus" "github.com/ethereum-optimism/optimism/op-conductor/health" + "github.com/ethereum-optimism/optimism/op-conductor/metrics" conductorrpc "github.com/ethereum-optimism/optimism/op-conductor/rpc" opp2p "github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/rollup/driver" @@ -25,6 +26,8 @@ import ( opclient "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/httputil" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" "github.com/ethereum-optimism/optimism/op-service/sources" ) @@ -38,7 +41,7 @@ var ( // New creates a new OpConductor instance. func New(ctx context.Context, cfg *Config, log log.Logger, version string) (*OpConductor, error) { - return NewOpConductor(ctx, cfg, log, version, nil, nil, nil) + return NewOpConductor(ctx, cfg, log, metrics.NewMetrics(), version, nil, nil, nil) } // NewOpConductor creates a new OpConductor instance. @@ -46,6 +49,7 @@ func NewOpConductor( ctx context.Context, cfg *Config, log log.Logger, + m metrics.Metricer, version string, ctrl client.SequencerControl, cons consensus.Consensus, @@ -59,6 +63,7 @@ func NewOpConductor( log: log, version: version, cfg: cfg, + metrics: m, pauseCh: make(chan struct{}), pauseDoneCh: make(chan struct{}), resumeCh: make(chan struct{}), @@ -172,6 +177,7 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error { c.hmon = health.NewSequencerHealthMonitor( c.log, + c.metrics, c.cfg.HealthCheck.Interval, c.cfg.HealthCheck.UnsafeInterval, c.cfg.HealthCheck.SafeInterval, @@ -245,6 +251,7 @@ type OpConductor struct { log log.Logger version string cfg *Config + metrics metrics.Metricer ctrl client.SequencerControl cons consensus.Consensus @@ -271,7 +278,8 @@ type OpConductor struct { shutdownCtx context.Context shutdownCancel context.CancelFunc - rpcServer *oprpc.Server + rpcServer *oprpc.Server + metricsServer *httputil.HTTPServer } type state struct { @@ -310,9 +318,25 @@ func (oc *OpConductor) Start(ctx context.Context) error { return errors.Wrap(err, "failed to start JSON-RPC server") } + if oc.cfg.MetricsConfig.Enabled { + oc.log.Info("starting metrics server") + m, ok := oc.metrics.(opmetrics.RegistryMetricer) + if !ok { + return fmt.Errorf("metrics were enabled, but metricer %T does not expose registry for metrics-server", oc.metrics) + } + metricsServer, err := opmetrics.StartServer(m.Registry(), oc.cfg.MetricsConfig.ListenAddr, oc.cfg.MetricsConfig.ListenPort) + if err != nil { + return errors.Wrap(err, "failed to start metrics server") + } + oc.metricsServer = metricsServer + } + oc.wg.Add(1) go oc.loop() + oc.metrics.RecordInfo(oc.version) + oc.metrics.RecordUp() + oc.log.Info("OpConductor started") return nil } @@ -350,6 +374,12 @@ func (oc *OpConductor) Stop(ctx context.Context) error { } } + if oc.metricsServer != nil { + if err := oc.metricsServer.Shutdown(ctx); err != nil { + result = multierror.Append(result, errors.Wrap(err, "failed to stop metrics server")) + } + } + if result.ErrorOrNil() != nil { oc.log.Error("failed to stop OpConductor", "err", result.ErrorOrNil()) return result.ErrorOrNil() @@ -465,12 +495,14 @@ func (oc *OpConductor) loop() { defer oc.wg.Done() for { + startTime := time.Now() select { case <-oc.shutdownCtx.Done(): return default: oc.loopActionFn() } + oc.metrics.RecordLoopExecutionTime(time.Since(startTime).Seconds()) } } @@ -619,6 +651,7 @@ func (oc *OpConductor) action() { if !status.Equal(oc.prevState) { oc.log.Info("state changed", "prev_state", oc.prevState, "new_state", status) oc.prevState = status + oc.metrics.RecordStateChange(status.leader, status.healthy, status.active) } } @@ -627,6 +660,7 @@ func (oc *OpConductor) transferLeader() error { // TransferLeader here will do round robin to try to transfer leadership to the next healthy node. oc.log.Info("transferring leadership", "server", oc.cons.ServerID()) err := oc.cons.TransferLeader() + oc.metrics.RecordLeaderTransfer(err == nil) if err == nil { oc.leader.Store(false) return nil // success @@ -654,6 +688,7 @@ func (oc *OpConductor) stopSequencer() error { return errors.Wrap(err, "failed to stop sequencer") } } + oc.metrics.RecordStopSequencer(err == nil) oc.seqActive.Store(false) return nil @@ -684,7 +719,8 @@ func (oc *OpConductor) startSequencer() error { } oc.log.Info("starting sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load()) - if err = oc.ctrl.StartSequencer(ctx, unsafeInCons.ExecutionPayload.BlockHash); err != nil { + err = oc.ctrl.StartSequencer(ctx, unsafeInCons.ExecutionPayload.BlockHash) + if err != nil { // cannot directly compare using Errors.Is because the error is returned from an JSON RPC server which lost its type. if !strings.Contains(err.Error(), driver.ErrSequencerAlreadyStarted.Error()) { return fmt.Errorf("failed to start sequencer: %w", err) @@ -692,6 +728,7 @@ func (oc *OpConductor) startSequencer() error { oc.log.Warn("sequencer already started.", "err", err) } } + oc.metrics.RecordStartSequencer(err == nil) oc.seqActive.Store(true) return nil diff --git a/op-conductor/conductor/service_test.go b/op-conductor/conductor/service_test.go index 7feb3826646b..c0062bd6b1fd 100644 --- a/op-conductor/conductor/service_test.go +++ b/op-conductor/conductor/service_test.go @@ -19,6 +19,7 @@ import ( consensusmocks "github.com/ethereum-optimism/optimism/op-conductor/consensus/mocks" "github.com/ethereum-optimism/optimism/op-conductor/health" healthmocks "github.com/ethereum-optimism/optimism/op-conductor/health/mocks" + "github.com/ethereum-optimism/optimism/op-conductor/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/testlog" @@ -88,6 +89,7 @@ type OpConductorTestSuite struct { err error log log.Logger cfg Config + metrics metrics.Metricer version string ctrl *clientmocks.SequencerControl cons *consensusmocks.Consensus @@ -101,6 +103,7 @@ type OpConductorTestSuite struct { func (s *OpConductorTestSuite) SetupSuite() { s.ctx = context.Background() s.log = testlog.Logger(s.T(), log.LevelDebug) + s.metrics = &metrics.NoopMetricsImpl{} s.cfg = mockConfig(s.T()) s.version = "v0.0.1" s.next = make(chan struct{}, 1) @@ -113,7 +116,7 @@ func (s *OpConductorTestSuite) SetupTest() { s.hmon = &healthmocks.HealthMonitor{} s.cons.EXPECT().ServerID().Return("SequencerA") - conductor, err := NewOpConductor(s.ctx, &s.cfg, s.log, s.version, s.ctrl, s.cons, s.hmon) + conductor, err := NewOpConductor(s.ctx, &s.cfg, s.log, s.metrics, s.version, s.ctrl, s.cons, s.hmon) s.NoError(err) s.conductor = conductor diff --git a/op-conductor/health/monitor.go b/op-conductor/health/monitor.go index cad9406168dd..829b6490bf68 100644 --- a/op-conductor/health/monitor.go +++ b/op-conductor/health/monitor.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/log" + "github.com/ethereum-optimism/optimism/op-conductor/metrics" "github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-service/dial" @@ -34,9 +35,10 @@ type HealthMonitor interface { // interval is the interval between health checks measured in seconds. // safeInterval is the interval between safe head progress measured in seconds. // minPeerCount is the minimum number of peers required for the sequencer to be healthy. -func NewSequencerHealthMonitor(log log.Logger, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p p2p.API) HealthMonitor { +func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p p2p.API) HealthMonitor { return &SequencerHealthMonitor{ log: log, + metrics: metrics, done: make(chan struct{}), interval: interval, healthUpdateCh: make(chan error), @@ -53,9 +55,10 @@ func NewSequencerHealthMonitor(log log.Logger, interval, unsafeInterval, safeInt // SequencerHealthMonitor monitors sequencer health. type SequencerHealthMonitor struct { - log log.Logger - done chan struct{} - wg sync.WaitGroup + log log.Logger + metrics metrics.Metricer + done chan struct{} + wg sync.WaitGroup rollupCfg *rollup.Config unsafeInterval uint64 @@ -112,7 +115,9 @@ func (hm *SequencerHealthMonitor) loop() { case <-hm.done: return case <-ticker.C: - hm.healthUpdateCh <- hm.healthCheck() + err := hm.healthCheck() + hm.metrics.RecordHealthCheck(err == nil, err) + hm.healthUpdateCh <- err } } } diff --git a/op-conductor/health/monitor_test.go b/op-conductor/health/monitor_test.go index e19d8cb1e0ca..1533e98a360c 100644 --- a/op-conductor/health/monitor_test.go +++ b/op-conductor/health/monitor_test.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/suite" + "github.com/ethereum-optimism/optimism/op-conductor/metrics" "github.com/ethereum-optimism/optimism/op-node/p2p" p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks" "github.com/ethereum-optimism/optimism/op-node/rollup" @@ -58,6 +59,7 @@ func (s *HealthMonitorTestSuite) SetupMonitor( log: s.log, done: make(chan struct{}), interval: s.interval, + metrics: &metrics.NoopMetricsImpl{}, healthUpdateCh: make(chan error), rollupCfg: s.rollupCfg, unsafeInterval: unsafeInterval, diff --git a/op-conductor/metrics/metrics.go b/op-conductor/metrics/metrics.go new file mode 100644 index 000000000000..4ac5e6847433 --- /dev/null +++ b/op-conductor/metrics/metrics.go @@ -0,0 +1,157 @@ +package metrics + +import ( + "strconv" + + "github.com/ethereum-optimism/optimism/op-service/httputil" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +const Namespace = "op_conductor" + +type Metricer interface { + RecordInfo(version string) + RecordUp() + RecordStateChange(leader bool, healthy bool, active bool) + RecordLeaderTransfer(success bool) + RecordStartSequencer(success bool) + RecordStopSequencer(success bool) + RecordHealthCheck(success bool, err error) + RecordLoopExecutionTime(duration float64) +} + +// Metrics implementation must implement RegistryMetricer to allow the metrics server to work. +var _ opmetrics.RegistryMetricer = (*Metrics)(nil) + +type Metrics struct { + ns string + registry *prometheus.Registry + factory opmetrics.Factory + + info prometheus.GaugeVec + up prometheus.Gauge + + healthChecks *prometheus.CounterVec + leaderTransfers *prometheus.CounterVec + sequencerStarts *prometheus.CounterVec + sequencerStops *prometheus.CounterVec + stateChanges *prometheus.CounterVec + + loopExecutionTime prometheus.Histogram +} + +func (m *Metrics) Registry() *prometheus.Registry { + return m.registry +} + +var _ Metricer = (*Metrics)(nil) + +func NewMetrics() *Metrics { + registry := opmetrics.NewRegistry() + factory := opmetrics.With(registry) + + return &Metrics{ + ns: Namespace, + registry: registry, + factory: factory, + + info: *factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "info", + Help: "Pseudo-metric tracking version and config info", + }, []string{ + "version", + }), + up: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "up", + Help: "1 if the op-conductor has finished starting up", + }), + healthChecks: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "healthchecks_count", + Help: "Number of healthchecks", + }, []string{"success", "error"}), + leaderTransfers: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "leader_transfers_count", + Help: "Number of leader transfers", + }, []string{"success"}), + sequencerStarts: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "sequencer_starts_count", + Help: "Number of sequencer starts", + }, []string{"success"}), + sequencerStops: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "sequencer_stops_count", + Help: "Number of sequencer stops", + }, []string{"success"}), + stateChanges: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "state_changes_count", + Help: "Number of state changes", + }, []string{ + "leader", + "healthy", + "active", + }), + loopExecutionTime: factory.NewHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Name: "loop_execution_time", + Help: "Time (in seconds) to execute conductor loop iteration", + Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}, + }), + } +} + +func (m *Metrics) Start(host string, port int) (*httputil.HTTPServer, error) { + return opmetrics.StartServer(m.registry, host, port) +} + +// RecordInfo sets a pseudo-metric that contains versioning and +// config info for the op-proposer. +func (m *Metrics) RecordInfo(version string) { + m.info.WithLabelValues(version).Set(1) +} + +// RecordUp sets the up metric to 1. +func (m *Metrics) RecordUp() { + prometheus.MustRegister() + m.up.Set(1) +} + +// RecordHealthCheck increments the healthChecks counter. +func (m *Metrics) RecordHealthCheck(success bool, err error) { + errStr := "" + if err != nil { + errStr = err.Error() + } + m.healthChecks.WithLabelValues(strconv.FormatBool(success), errStr).Inc() +} + +// RecordLeaderTransfer increments the leaderTransfers counter. +func (m *Metrics) RecordLeaderTransfer(success bool) { + m.leaderTransfers.WithLabelValues(strconv.FormatBool(success)).Inc() +} + +// RecordStateChange increments the stateChanges counter. +func (m *Metrics) RecordStateChange(leader bool, healthy bool, active bool) { + m.stateChanges.WithLabelValues(strconv.FormatBool(leader), strconv.FormatBool(healthy), strconv.FormatBool(active)).Inc() +} + +// RecordStartSequencer increments the sequencerStarts counter. +func (m *Metrics) RecordStartSequencer(success bool) { + m.sequencerStarts.WithLabelValues(strconv.FormatBool(success)).Inc() +} + +// RecordStopSequencer increments the sequencerStops counter. +func (m *Metrics) RecordStopSequencer(success bool) { + m.sequencerStops.WithLabelValues(strconv.FormatBool(success)).Inc() +} + +// RecordLoopExecutionTime records the time it took to execute the conductor loop. +func (m *Metrics) RecordLoopExecutionTime(duration float64) { + m.loopExecutionTime.Observe(duration) +} diff --git a/op-conductor/metrics/noop.go b/op-conductor/metrics/noop.go new file mode 100644 index 000000000000..c5c2ffc70d4d --- /dev/null +++ b/op-conductor/metrics/noop.go @@ -0,0 +1,14 @@ +package metrics + +type NoopMetricsImpl struct{} + +var NoopMetrics Metricer = new(NoopMetricsImpl) + +func (*NoopMetricsImpl) RecordInfo(version string) {} +func (*NoopMetricsImpl) RecordUp() {} +func (*NoopMetricsImpl) RecordStateChange(leader bool, healthy bool, active bool) {} +func (*NoopMetricsImpl) RecordLeaderTransfer(success bool) {} +func (*NoopMetricsImpl) RecordStartSequencer(success bool) {} +func (*NoopMetricsImpl) RecordStopSequencer(success bool) {} +func (*NoopMetricsImpl) RecordHealthCheck(success bool, err error) {} +func (*NoopMetricsImpl) RecordLoopExecutionTime(duration float64) {}