Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
zhwrd committed May 23, 2024
1 parent 246fdb1 commit 2837d2b
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 10 deletions.
1 change: 0 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1859,7 +1859,6 @@ workflows:
name: op-conductor-docker-build
docker_name: op-conductor
docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
requires: ['op-stack-go-docker-build']
platforms: "linux/amd64,linux/arm64"
publish: true
context:
Expand Down
43 changes: 40 additions & 3 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ import (
"github.com/ethereum-optimism/optimism/op-conductor/client"
"github.com/ethereum-optimism/optimism/op-conductor/consensus"
"github.com/ethereum-optimism/optimism/op-conductor/health"
"github.com/ethereum-optimism/optimism/op-conductor/metrics"
conductorrpc "github.com/ethereum-optimism/optimism/op-conductor/rpc"
opp2p "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
opclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
)
Expand All @@ -38,14 +41,15 @@ var (

// New creates a new OpConductor instance.
func New(ctx context.Context, cfg *Config, log log.Logger, version string) (*OpConductor, error) {
return NewOpConductor(ctx, cfg, log, version, nil, nil, nil)
return NewOpConductor(ctx, cfg, log, metrics.NewMetrics(), version, nil, nil, nil)
}

// NewOpConductor creates a new OpConductor instance.
func NewOpConductor(
ctx context.Context,
cfg *Config,
log log.Logger,
m metrics.Metricer,
version string,
ctrl client.SequencerControl,
cons consensus.Consensus,
Expand All @@ -59,6 +63,7 @@ func NewOpConductor(
log: log,
version: version,
cfg: cfg,
metrics: m,
pauseCh: make(chan struct{}),
pauseDoneCh: make(chan struct{}),
resumeCh: make(chan struct{}),
Expand Down Expand Up @@ -172,6 +177,7 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error {

c.hmon = health.NewSequencerHealthMonitor(
c.log,
c.metrics,
c.cfg.HealthCheck.Interval,
c.cfg.HealthCheck.UnsafeInterval,
c.cfg.HealthCheck.SafeInterval,
Expand Down Expand Up @@ -245,6 +251,7 @@ type OpConductor struct {
log log.Logger
version string
cfg *Config
metrics metrics.Metricer

ctrl client.SequencerControl
cons consensus.Consensus
Expand All @@ -271,7 +278,8 @@ type OpConductor struct {
shutdownCtx context.Context
shutdownCancel context.CancelFunc

rpcServer *oprpc.Server
rpcServer *oprpc.Server
metricsServer *httputil.HTTPServer
}

type state struct {
Expand Down Expand Up @@ -310,9 +318,25 @@ func (oc *OpConductor) Start(ctx context.Context) error {
return errors.Wrap(err, "failed to start JSON-RPC server")
}

if oc.cfg.MetricsConfig.Enabled {
oc.log.Info("starting metrics server")
m, ok := oc.metrics.(opmetrics.RegistryMetricer)
if !ok {
return fmt.Errorf("metrics were enabled, but metricer %T does not expose registry for metrics-server", oc.metrics)
}
metricsServer, err := opmetrics.StartServer(m.Registry(), oc.cfg.MetricsConfig.ListenAddr, oc.cfg.MetricsConfig.ListenPort)
if err != nil {
return errors.Wrap(err, "failed to start metrics server")
}
oc.metricsServer = metricsServer
}

oc.wg.Add(1)
go oc.loop()

oc.metrics.RecordInfo(oc.version)
oc.metrics.RecordUp()

oc.log.Info("OpConductor started")
return nil
}
Expand Down Expand Up @@ -350,6 +374,12 @@ func (oc *OpConductor) Stop(ctx context.Context) error {
}
}

if oc.metricsServer != nil {
if err := oc.metricsServer.Shutdown(ctx); err != nil {
result = multierror.Append(result, errors.Wrap(err, "failed to stop metrics server"))
}
}

if result.ErrorOrNil() != nil {
oc.log.Error("failed to stop OpConductor", "err", result.ErrorOrNil())
return result.ErrorOrNil()
Expand Down Expand Up @@ -465,12 +495,14 @@ func (oc *OpConductor) loop() {
defer oc.wg.Done()

for {
startTime := time.Now()
select {
case <-oc.shutdownCtx.Done():
return
default:
oc.loopActionFn()
}
oc.metrics.RecordLoopExecutionTime(time.Since(startTime).Seconds())
}
}

Expand Down Expand Up @@ -619,6 +651,7 @@ func (oc *OpConductor) action() {
if !status.Equal(oc.prevState) {
oc.log.Info("state changed", "prev_state", oc.prevState, "new_state", status)
oc.prevState = status
oc.metrics.RecordStateChange(status.leader, status.healthy, status.active)
}
}

Expand All @@ -627,6 +660,7 @@ func (oc *OpConductor) transferLeader() error {
// TransferLeader here will do round robin to try to transfer leadership to the next healthy node.
oc.log.Info("transferring leadership", "server", oc.cons.ServerID())
err := oc.cons.TransferLeader()
oc.metrics.RecordLeaderTransfer(err == nil)
if err == nil {
oc.leader.Store(false)
return nil // success
Expand Down Expand Up @@ -654,6 +688,7 @@ func (oc *OpConductor) stopSequencer() error {
return errors.Wrap(err, "failed to stop sequencer")
}
}
oc.metrics.RecordStopSequencer(err == nil)

oc.seqActive.Store(false)
return nil
Expand Down Expand Up @@ -684,14 +719,16 @@ func (oc *OpConductor) startSequencer() error {
}

oc.log.Info("starting sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())
if err = oc.ctrl.StartSequencer(ctx, unsafeInCons.ExecutionPayload.BlockHash); err != nil {
err = oc.ctrl.StartSequencer(ctx, unsafeInCons.ExecutionPayload.BlockHash)
if err != nil {
// Cannot compare directly with errors.Is: the error is returned from a JSON-RPC server and has lost its type, so match on the error message instead.
if !strings.Contains(err.Error(), driver.ErrSequencerAlreadyStarted.Error()) {
return fmt.Errorf("failed to start sequencer: %w", err)
} else {
oc.log.Warn("sequencer already started.", "err", err)
}
}
oc.metrics.RecordStartSequencer(err == nil)

oc.seqActive.Store(true)
return nil
Expand Down
5 changes: 4 additions & 1 deletion op-conductor/conductor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
consensusmocks "github.com/ethereum-optimism/optimism/op-conductor/consensus/mocks"
"github.com/ethereum-optimism/optimism/op-conductor/health"
healthmocks "github.com/ethereum-optimism/optimism/op-conductor/health/mocks"
"github.com/ethereum-optimism/optimism/op-conductor/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
Expand Down Expand Up @@ -88,6 +89,7 @@ type OpConductorTestSuite struct {
err error
log log.Logger
cfg Config
metrics metrics.Metricer
version string
ctrl *clientmocks.SequencerControl
cons *consensusmocks.Consensus
Expand All @@ -101,6 +103,7 @@ type OpConductorTestSuite struct {
func (s *OpConductorTestSuite) SetupSuite() {
s.ctx = context.Background()
s.log = testlog.Logger(s.T(), log.LevelDebug)
s.metrics = &metrics.NoopMetricsImpl{}
s.cfg = mockConfig(s.T())
s.version = "v0.0.1"
s.next = make(chan struct{}, 1)
Expand All @@ -113,7 +116,7 @@ func (s *OpConductorTestSuite) SetupTest() {
s.hmon = &healthmocks.HealthMonitor{}
s.cons.EXPECT().ServerID().Return("SequencerA")

conductor, err := NewOpConductor(s.ctx, &s.cfg, s.log, s.version, s.ctrl, s.cons, s.hmon)
conductor, err := NewOpConductor(s.ctx, &s.cfg, s.log, s.metrics, s.version, s.ctrl, s.cons, s.hmon)
s.NoError(err)
s.conductor = conductor

Expand Down
15 changes: 10 additions & 5 deletions op-conductor/health/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-conductor/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/dial"
Expand All @@ -34,9 +35,10 @@ type HealthMonitor interface {
// interval is the interval between health checks measured in seconds.
// safeInterval is the interval between safe head progress measured in seconds.
// minPeerCount is the minimum number of peers required for the sequencer to be healthy.
func NewSequencerHealthMonitor(log log.Logger, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p p2p.API) HealthMonitor {
func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p p2p.API) HealthMonitor {
return &SequencerHealthMonitor{
log: log,
metrics: metrics,
done: make(chan struct{}),
interval: interval,
healthUpdateCh: make(chan error),
Expand All @@ -53,9 +55,10 @@ func NewSequencerHealthMonitor(log log.Logger, interval, unsafeInterval, safeInt

// SequencerHealthMonitor monitors sequencer health.
type SequencerHealthMonitor struct {
log log.Logger
done chan struct{}
wg sync.WaitGroup
log log.Logger
metrics metrics.Metricer
done chan struct{}
wg sync.WaitGroup

rollupCfg *rollup.Config
unsafeInterval uint64
Expand Down Expand Up @@ -112,7 +115,9 @@ func (hm *SequencerHealthMonitor) loop() {
case <-hm.done:
return
case <-ticker.C:
hm.healthUpdateCh <- hm.healthCheck()
err := hm.healthCheck()
hm.metrics.RecordHealthCheck(err == nil, err)
hm.healthUpdateCh <- err
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions op-conductor/health/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/suite"

"github.com/ethereum-optimism/optimism/op-conductor/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/rollup"
Expand Down Expand Up @@ -58,6 +59,7 @@ func (s *HealthMonitorTestSuite) SetupMonitor(
log: s.log,
done: make(chan struct{}),
interval: s.interval,
metrics: &metrics.NoopMetricsImpl{},
healthUpdateCh: make(chan error),
rollupCfg: s.rollupCfg,
unsafeInterval: unsafeInterval,
Expand Down
Loading

0 comments on commit 2837d2b

Please sign in to comment.