Skip to content

Commit

Permalink
feat: op-conductor strongly consistent reads (#10619)
Browse files Browse the repository at this point in the history
* op-conductor: add more logs for raft debugging

* Add barrier

* LatestUnsafePayload reads in a strongly consistent fashion

* Atomic swap OpConductor.healthy

* Fix conductor/service_test

* Add test for when LatestUnsafePayload returns an error

* Update some method comments

---------

Co-authored-by: Francis Li <francis.li@coinbase.com>
  • Loading branch information
BrianBland and 0x00101010 committed May 23, 2024
1 parent df2aeba commit 4a487b8
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 46 deletions.
41 changes: 25 additions & 16 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
)

var (
ErrResumeTimeout = errors.New("timeout to resume conductor")
ErrPauseTimeout = errors.New("timeout to pause conductor")
ErrUnsafeHeadMismarch = errors.New("unsafe head mismatch")
ErrUnableToRetrieveUnsafeHeadFromConsensus = errors.New("unable to retrieve unsafe head from consensus")
ErrResumeTimeout = errors.New("timeout to resume conductor")
ErrPauseTimeout = errors.New("timeout to pause conductor")
ErrUnsafeHeadMismatch = errors.New("unsafe head mismatch")
ErrNoUnsafeHead = errors.New("no unsafe head")
)

// New creates a new OpConductor instance.
Expand Down Expand Up @@ -441,7 +441,7 @@ func (oc *OpConductor) TransferLeaderToServer(_ context.Context, id string, addr
return oc.cons.TransferLeaderTo(id, addr)
}

// CommitUnsafePayload commits a unsafe payload (latest head) to the cluster FSM.
// CommitUnsafePayload commits an unsafe payload (latest head) to the cluster FSM ensuring strong consistency by leveraging Raft consensus mechanisms.
func (oc *OpConductor) CommitUnsafePayload(_ context.Context, payload *eth.ExecutionPayloadEnvelope) error {
return oc.cons.CommitUnsafePayload(payload)
}
Expand All @@ -456,8 +456,8 @@ func (oc *OpConductor) ClusterMembership(_ context.Context) ([]*consensus.Server
return oc.cons.ClusterMembership()
}

// LatestUnsafePayload returns the latest unsafe payload envelope from FSM.
func (oc *OpConductor) LatestUnsafePayload(_ context.Context) *eth.ExecutionPayloadEnvelope {
// LatestUnsafePayload returns the latest unsafe payload envelope from FSM in a strongly consistent fashion.
func (oc *OpConductor) LatestUnsafePayload(_ context.Context) (*eth.ExecutionPayloadEnvelope, error) {
return oc.cons.LatestUnsafePayload()
}

Expand Down Expand Up @@ -522,12 +522,11 @@ func (oc *OpConductor) handleHealthUpdate(hcerr error) {
oc.queueAction()
}

if healthy != oc.healthy.Load() {
if oc.healthy.Swap(healthy) != healthy {
// queue an action if health status changed.
oc.queueAction()
}

oc.healthy.Store(healthy)
oc.hcerr = hcerr
}

Expand Down Expand Up @@ -668,8 +667,15 @@ func (oc *OpConductor) startSequencer() error {
unsafeInCons, unsafeInNode, err := oc.compareUnsafeHead(ctx)
// if there's a mismatch, try to post the unsafe head to op-node
if err != nil {
if errors.Is(err, ErrUnsafeHeadMismarch) && uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 {
if errors.Is(err, ErrUnsafeHeadMismatch) && uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 {
// tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay)
oc.log.Debug(
"posting unsafe head to op-node",
"consensus_num", uint64(unsafeInCons.ExecutionPayload.BlockNumber),
"consensus_hash", unsafeInCons.ExecutionPayload.BlockHash.Hex(),
"node_num", unsafeInNode.NumberU64(),
"node_hash", unsafeInNode.Hash().Hex(),
)
if innerErr := oc.ctrl.PostUnsafePayload(ctx, unsafeInCons); innerErr != nil {
oc.log.Error("failed to post unsafe head payload envelope to op-node", "err", innerErr)
}
Expand All @@ -692,27 +698,30 @@ func (oc *OpConductor) startSequencer() error {
}

func (oc *OpConductor) compareUnsafeHead(ctx context.Context) (*eth.ExecutionPayloadEnvelope, eth.BlockInfo, error) {
unsafeInCons := oc.cons.LatestUnsafePayload()
unsafeInCons, err := oc.cons.LatestUnsafePayload()
if err != nil {
return nil, nil, errors.Wrap(err, "unable to retrieve unsafe head from consensus")
}
if unsafeInCons == nil {
return nil, nil, ErrUnableToRetrieveUnsafeHeadFromConsensus
return nil, nil, ErrNoUnsafeHead
}

unsafeInNode, err := oc.ctrl.LatestUnsafeBlock(ctx)
if err != nil {
return unsafeInCons, nil, errors.Wrap(err, "failed to get latest unsafe block from EL during compareUnsafeHead phase")
}

oc.log.Debug("comparing unsafe head", "consensus", unsafeInCons.ExecutionPayload.BlockNumber, "node", unsafeInNode.NumberU64())
oc.log.Debug("comparing unsafe head", "consensus", uint64(unsafeInCons.ExecutionPayload.BlockNumber), "node", unsafeInNode.NumberU64())
if unsafeInCons.ExecutionPayload.BlockHash != unsafeInNode.Hash() {
oc.log.Warn(
"latest unsafe block in consensus is not the same as the one in op-node",
"consensus_hash", unsafeInCons.ExecutionPayload.BlockHash,
"consensus_block_num", unsafeInCons.ExecutionPayload.BlockNumber,
"consensus_num", uint64(unsafeInCons.ExecutionPayload.BlockNumber),
"node_hash", unsafeInNode.Hash(),
"node_block_num", unsafeInNode.NumberU64(),
"node_num", unsafeInNode.NumberU64(),
)

return unsafeInCons, unsafeInNode, ErrUnsafeHeadMismarch
return unsafeInCons, unsafeInNode, ErrUnsafeHeadMismatch
}

return unsafeInCons, unsafeInNode, nil
Expand Down
46 changes: 41 additions & 5 deletions op-conductor/conductor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (s *OpConductorTestSuite) TestScenario1() {
InfoHash: [32]byte{1, 2, 3},
}
s.cons.EXPECT().TransferLeader().Return(nil)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)

// become leader
Expand All @@ -317,6 +317,42 @@ func (s *OpConductorTestSuite) TestScenario1() {
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1)
}

// In this test, we have a follower that is not healthy and not sequencing, it becomes leader through election.
// But since it fails to compare the unsafe head to the value stored in consensus, we expect it to transfer leadership to another node.
// [follower, not healthy, not sequencing] -- become leader --> [leader, not healthy, not sequencing] -- transfer leadership --> [follower, not healthy, not sequencing]
func (s *OpConductorTestSuite) TestScenario1Err() {
s.enableSynchronization()

// set initial state
s.conductor.leader.Store(false)
s.conductor.healthy.Store(false)
s.conductor.seqActive.Store(false)
s.conductor.hcerr = health.ErrSequencerNotHealthy
s.conductor.prevState = &state{
leader: false,
healthy: false,
active: false,
}

s.cons.EXPECT().LatestUnsafePayload().Return(nil, errors.New("fake connection error")).Times(1)
s.cons.EXPECT().TransferLeader().Return(nil)

// become leader
s.updateLeaderStatusAndExecuteAction(true)

// expect to transfer leadership, go back to [follower, not healthy, not sequencing]
s.False(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.False(s.conductor.seqActive.Load())
s.Equal(health.ErrSequencerNotHealthy, s.conductor.hcerr)
s.Equal(&state{
leader: true,
healthy: false,
active: false,
}, s.conductor.prevState)
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1)
}

// In this test, we have a follower that is not healthy and not sequencing. it becomes healthy and we expect it to stay as follower and not start sequencing.
// [follower, not healthy, not sequencing] -- become healthy --> [follower, healthy, not sequencing]
func (s *OpConductorTestSuite) TestScenario2() {
Expand Down Expand Up @@ -353,7 +389,7 @@ func (s *OpConductorTestSuite) TestScenario3() {
InfoNum: 1,
InfoHash: [32]byte{1, 2, 3},
}
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
s.ctrl.EXPECT().StartSequencer(mock.Anything, mock.Anything).Return(nil).Times(1)

Expand Down Expand Up @@ -392,7 +428,7 @@ func (s *OpConductorTestSuite) TestScenario4() {
InfoNum: 1,
InfoHash: [32]byte{2, 3, 4},
}
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
s.ctrl.EXPECT().PostUnsafePayload(mock.Anything, mock.Anything).Return(nil).Times(1)

Expand All @@ -410,7 +446,7 @@ func (s *OpConductorTestSuite) TestScenario4() {
// unsafe caught up, we try to start sequencer at specified block and succeeds
mockBlockInfo.InfoNum = 2
mockBlockInfo.InfoHash = [32]byte{1, 2, 3}
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
s.ctrl.EXPECT().StartSequencer(mock.Anything, mockBlockInfo.InfoHash).Return(nil).Times(1)

Expand Down Expand Up @@ -664,7 +700,7 @@ func (s *OpConductorTestSuite) TestFailureAndRetry3() {
InfoNum: 1,
InfoHash: [32]byte{1, 2, 3},
}
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(2)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(2)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(2)
s.ctrl.EXPECT().StartSequencer(mock.Anything, mockBlockInfo.InfoHash).Return(nil).Times(1)

Expand Down
6 changes: 3 additions & 3 deletions op-conductor/consensus/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ type Consensus interface {
// ClusterMembership returns the current cluster membership configuration.
ClusterMembership() ([]*ServerInfo, error)

// CommitPayload commits latest unsafe payload to the FSM.
// CommitPayload commits latest unsafe payload to the FSM in a strongly consistent fashion.
CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error
// LatestUnsafeBlock returns the latest unsafe payload from FSM.
LatestUnsafePayload() *eth.ExecutionPayloadEnvelope
// LatestUnsafeBlock returns the latest unsafe payload from FSM in a strongly consistent fashion.
LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error)

// Shutdown shuts down the consensus protocol client.
Shutdown() error
Expand Down
20 changes: 15 additions & 5 deletions op-conductor/consensus/mocks/Consensus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 13 additions & 8 deletions op-conductor/consensus/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b
return nil, errors.Wrap(err, "failed to create raft tcp transport")
}

fsm := &unsafeHeadTracker{}
fsm := NewUnsafeHeadTracker(log)

r, err := raft.NewRaft(rc, fsm, logStore, stableStore, snapshotStore, transport)
if err != nil {
Expand Down Expand Up @@ -140,8 +140,7 @@ func (rc *RaftConsensus) DemoteVoter(id string) error {

// Leader implements Consensus, it returns true if it is the leader of the cluster.
func (rc *RaftConsensus) Leader() bool {
_, id := rc.r.LeaderWithID()
return id == rc.serverID
return rc.r.State() == raft.Leader
}

// LeaderWithID implements Consensus, it returns the leader's server ID and address.
Expand Down Expand Up @@ -205,8 +204,10 @@ func (rc *RaftConsensus) Shutdown() error {
return nil
}

// CommitUnsafePayload implements Consensus, it commits latest unsafe payload to the cluster FSM.
// CommitUnsafePayload implements Consensus, it commits latest unsafe payload to the cluster FSM in a strongly consistent fashion.
func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error {
rc.log.Debug("committing unsafe payload", "number", uint64(payload.ExecutionPayload.BlockNumber), "hash", payload.ExecutionPayload.BlockHash.Hex())

var buf bytes.Buffer
if _, err := payload.MarshalSSZ(&buf); err != nil {
return errors.Wrap(err, "failed to marshal payload envelope")
Expand All @@ -216,14 +217,18 @@ func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelo
if err := f.Error(); err != nil {
return errors.Wrap(err, "failed to apply payload envelope")
}
rc.log.Debug("unsafe payload committed", "number", uint64(payload.ExecutionPayload.BlockNumber), "hash", payload.ExecutionPayload.BlockHash.Hex())

return nil
}

// LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM.
func (rc *RaftConsensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope {
payload := rc.unsafeTracker.UnsafeHead()
return payload
// LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM in a strongly consistent fashion.
func (rc *RaftConsensus) LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error) {
if err := rc.r.Barrier(defaultTimeout).Error(); err != nil {
return nil, errors.Wrap(err, "failed to apply barrier")
}

return rc.unsafeTracker.UnsafeHead(), nil
}

// ClusterMembership implements Consensus, it returns the current cluster membership configuration.
Expand Down
8 changes: 8 additions & 0 deletions op-conductor/consensus/raft_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@ var _ raft.FSM = (*unsafeHeadTracker)(nil)

// unsafeHeadTracker implements raft.FSM for storing unsafe head payload into raft consensus layer.
type unsafeHeadTracker struct {
log log.Logger
mtx sync.RWMutex
unsafeHead *eth.ExecutionPayloadEnvelope
}

func NewUnsafeHeadTracker(log log.Logger) *unsafeHeadTracker {
return &unsafeHeadTracker{
log: log,
}
}

// Apply implements raft.FSM, it applies the latest change (latest unsafe head payload) to FSM.
func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} {
if l.Data == nil || len(l.Data) == 0 {
Expand All @@ -33,6 +40,7 @@ func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} {

t.mtx.Lock()
defer t.mtx.Unlock()
t.log.Debug("applying new unsafe head", "number", uint64(data.ExecutionPayload.BlockNumber), "hash", data.ExecutionPayload.BlockHash.Hex())
if t.unsafeHead == nil || t.unsafeHead.ExecutionPayload.BlockNumber < data.ExecutionPayload.BlockNumber {
t.unsafeHead = data
}
Expand Down
Loading

0 comments on commit 4a487b8

Please sign in to comment.