Skip to content

Commit

Permalink
Wait for Replication and Indexing to complete before becoming leader
Browse files Browse the repository at this point in the history
- This change ensures that before becoming leader we successfully write an epoch and get it replicated, thus demonstrating that we are in a fit state to become leader.
- Therefore the log is successfully replicated before we transition to leader.
- We also wait for the records to be indexed before we transition to leader; otherwise incorrect event numbers can be written, because the latest events are not known to the index or IndexWriter.

- The InaugurationManager is responsible for making sure the conditions are met for transitioning to leader.
- In the PreLeader state we now accept replication subscriptions and monitor for quorum, transitioning to Unknown if a quorum is not maintained (exactly as we did in the leader state before)
  • Loading branch information
timothycoleman committed Sep 28, 2021
1 parent 41313b2 commit 3e46122
Show file tree
Hide file tree
Showing 21 changed files with 731 additions and 45 deletions.
Expand Up @@ -504,7 +504,7 @@ public class when_state_changed_to_leader : NodeGossipServiceTestFixture {
);

protected override Message When() =>
new SystemMessage.BecomeLeader(Guid.NewGuid(),0);
new SystemMessage.BecomeLeader(Guid.NewGuid());

[Test]
public void should_update_gossip() {
Expand Down
Expand Up @@ -68,7 +68,7 @@ public abstract class with_replication_service : SpecificationWithDirectoryPerTe
queueStatsManager: new QueueStatsManager());

Service.Handle(new SystemMessage.SystemStart());
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid(),0));
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid()));

ReplicaSubscriptionId = AddSubscription(ReplicaId, true, out ReplicaManager1);
ReplicaSubscriptionId2 = AddSubscription(ReplicaId2, true, out ReplicaManager2);
Expand Down Expand Up @@ -114,7 +114,7 @@ public abstract class with_replication_service : SpecificationWithDirectoryPerTe

public abstract void When();
protected void BecomeLeader() {
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid(),0));
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid()));
}

protected void BecomeUnknown() {
Expand Down
Expand Up @@ -37,7 +37,7 @@ public abstract class with_clustered_replication_tracking_service:
public abstract void When();

protected void BecomeLeader() {
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid(),0));
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid()));
}

protected void BecomeUnknown() {
Expand Down
@@ -0,0 +1,15 @@
using System;
using EventStore.Core.Services.VNode;

namespace EventStore.Core.Tests.Services.VNode.InaugurationManagement {
/// <summary>
/// Test double for <see cref="ILastIndexableRecordFinder"/> that reports a fixed,
/// pre-configured position on every call.
/// </summary>
public class FakeRecordFinder : ILastIndexableRecordFinder {
	private readonly long _position;

	/// <param name="lastIndexableRecordPosition">The position this fake will always report.</param>
	public FakeRecordFinder(long lastIndexableRecordPosition) {
		_position = lastIndexableRecordPosition;
	}

	/// <returns>The position that was supplied to the constructor.</returns>
	public long GetLastIndexableRecordPosition() => _position;
}
}
@@ -0,0 +1,108 @@
using System;
using EventStore.Core.Cluster;
using EventStore.Core.Messages;
using EventStore.Core.Messaging;
using EventStore.Core.Services.VNode;
using EventStore.Core.Tests.Fakes;
using EventStore.Core.TransactionLog.Checkpoint;
using EventStore.Core.TransactionLog.LogRecords;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.VNode.InaugurationManagement {
/// <summary>
/// Shared scaffolding for the InaugurationManager fixtures. Before every test it builds
/// a fresh manager wired to a fake publisher, two in-memory checkpoints and a fake
/// record finder, then lets the concrete fixture drive it into its starting state
/// through <see cref="Given"/>.
/// </summary>
public abstract class InaugurationManagerTests {
	protected readonly Guid _correlationId1 = Guid.Parse("00000000-0000-0000-0000-0000000000c1");
	protected readonly Guid _correlationId2 = Guid.Parse("00000000-0000-0000-0000-0000000000c2");
	protected readonly Guid _correlationId3 = Guid.Parse("00000000-0000-0000-0000-0000000000c3");
	protected readonly int _epochNumber = 15;
	protected readonly MemberInfo _leader = new MemberInfo(
		new MemberInfoDto {
			InternalTcpIp = "localhost",
			HttpEndPointIp = "localhost",
		});

	// Positions written by CompleteReplication / CompleteIndexing respectively.
	// _indexTarget is also what the fake record finder reports.
	protected readonly long _replicationTarget = 400;
	protected readonly long _indexTarget = 300;

	protected InaugurationManager _sut;
	protected FakePublisher _publisher;
	protected InMemoryCheckpoint _replicationCheckpoint;
	protected InMemoryCheckpoint _indexCheckpoint;
	protected FakeRecordFinder _recordFinder;

	/// <summary>Rebuilds all collaborators and the manager, then runs <see cref="Given"/>.</summary>
	[SetUp]
	public void SetUp() {
		_publisher = new FakePublisher();
		_replicationCheckpoint = new InMemoryCheckpoint();
		_indexCheckpoint = new InMemoryCheckpoint();
		_recordFinder = new FakeRecordFinder(_indexTarget);
		_sut = new InaugurationManager(
			_publisher,
			_replicationCheckpoint,
			_indexCheckpoint,
			_recordFinder);

		Given();
	}

	/// <summary>Puts the manager into the state the concrete fixture wants to test from.</summary>
	protected abstract void Given();

	/// <summary>
	/// Delivers <paramref name="m"/> to the manager. The handler overload is chosen at
	/// run time from the message's concrete type.
	/// </summary>
	protected void When(Message m) {
		dynamic message = m;
		_sut.Handle(message);
	}

	/// <summary>
	/// Builds an EpochWritten message for <paramref name="epochNumber"/> whose record sits
	/// one position before the replication target, with fresh epoch and leader ids.
	/// </summary>
	protected SystemMessage.EpochWritten GenEpoch(int epochNumber) {
		var record = new EpochRecord(
			epochPosition: _replicationTarget - 1,
			epochNumber: epochNumber,
			epochId: Guid.NewGuid(),
			prevEpochPosition: _replicationTarget - 20,
			timeStamp: DateTime.UtcNow,
			leaderInstanceId: Guid.NewGuid());

		return new SystemMessage.EpochWritten(record);
	}

	/// <summary>Moves the replication checkpoint halfway to its target.</summary>
	protected void ProgressReplication() => WriteAndFlush(_replicationCheckpoint, _replicationTarget / 2);

	/// <summary>Moves the replication checkpoint to its target.</summary>
	protected void CompleteReplication() => WriteAndFlush(_replicationCheckpoint, _replicationTarget);

	/// <summary>Moves the index checkpoint halfway to its target.</summary>
	protected void ProgressIndexing() => WriteAndFlush(_indexCheckpoint, _indexTarget / 2);

	/// <summary>Moves the index checkpoint to its target.</summary>
	protected void CompleteIndexing() => WriteAndFlush(_indexCheckpoint, _indexTarget);

	/// <summary>Asserts that <paramref name="o"/> is a <typeparamref name="T"/> and returns it cast.</summary>
	protected static T AssertIsType<T>(object o) {
		Assert.That(o, Is.InstanceOf<T>());
		return (T)o;
	}

	/// <summary>
	/// Checks that the manager has reset to its initial state. This is verified
	/// indirectly: nothing must be pending on the publisher, and the manager must be
	/// able to proceed forward from the initial state.
	/// </summary>
	protected void AssertInitial() {
		Assert.That(_publisher.Messages, Is.Empty);

		_sut.Handle(new SystemMessage.BecomePreLeader(_correlationId3));

		AssertWaitingForChaser(_correlationId3);
	}

	/// <summary>
	/// Checks that the manager is waiting for the chaser. This is verified indirectly:
	/// nothing must be pending on the publisher, and a ChaserCaughtUp carrying
	/// <paramref name="expectedCorrelationId"/> must produce exactly one WriteEpoch.
	/// </summary>
	protected void AssertWaitingForChaser(Guid expectedCorrelationId) {
		Assert.That(_publisher.Messages, Is.Empty);

		_sut.Handle(new SystemMessage.ChaserCaughtUp(expectedCorrelationId));

		Assert.That(_publisher.Messages.Count, Is.EqualTo(1));
		Assert.That(_publisher.Messages[0], Is.InstanceOf<SystemMessage.WriteEpoch>());
	}

	/// <summary>
	/// Checks that exactly one BecomeLeader carrying <c>_correlationId1</c> was published,
	/// then clears the publisher and confirms the manager is back in its initial state.
	/// </summary>
	protected void AssertSentBecomeLeader() {
		Assert.That(_publisher.Messages.Count, Is.EqualTo(1));
		var published = AssertIsType<SystemMessage.BecomeLeader>(_publisher.Messages[0]);
		Assert.That(published.CorrelationId, Is.EqualTo(_correlationId1));

		_publisher.Messages.Clear();
		AssertInitial();
	}

	// Writes the position to the checkpoint and flushes it straight away.
	private static void WriteAndFlush(InMemoryCheckpoint checkpoint, long position) {
		checkpoint.Write(position);
		checkpoint.Flush();
	}
}
}
@@ -0,0 +1,24 @@
using System;
using EventStore.Core.Messages;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.VNode.InaugurationManagement {
/// <summary>
/// Behaviour of the manager when no messages have been delivered to it yet.
/// </summary>
[TestFixture]
public class given_initial_state : InaugurationManagerTests {
	// Nothing to deliver; just start each test from an empty publisher.
	protected override void Given() => _publisher.Messages.Clear();

	/// <summary>BecomePreLeader should leave the manager waiting for the chaser.</summary>
	[Test]
	public void when_become_pre_leader() {
		var becomePreLeader = new SystemMessage.BecomePreLeader(_correlationId1);

		When(becomePreLeader);

		AssertWaitingForChaser(_correlationId1);
	}

	/// <summary>Moving to another node state should leave the manager in its initial state.</summary>
	[Test]
	public void when_become_other_node_state() {
		var becomeUnknown = new SystemMessage.BecomeUnknown(Guid.NewGuid());

		When(becomeUnknown);

		AssertInitial();
	}
}
}
@@ -0,0 +1,41 @@
using System;
using EventStore.Core.Messages;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.VNode.InaugurationManagement {

/// <summary>
/// Behaviour of the manager after it has handled ElectionsDone followed by
/// BecomePreLeader with <c>_correlationId1</c>.
/// </summary>
[TestFixture]
public class given_waiting_for_chaser : InaugurationManagerTests {
	protected override void Given() {
		var electionsDone = new ElectionMessage.ElectionsDone(123, _epochNumber, _leader);
		var becomePreLeader = new SystemMessage.BecomePreLeader(_correlationId1);

		_sut.Handle(electionsDone);
		_sut.Handle(becomePreLeader);

		// Discard anything published while getting into position.
		_publisher.Messages.Clear();
	}

	/// <summary>A matching ChaserCaughtUp should publish one WriteEpoch for the elected epoch number.</summary>
	[Test]
	public void when_chaser_caught_up() {
		When(new SystemMessage.ChaserCaughtUp(_correlationId1));

		Assert.That(_publisher.Messages.Count, Is.EqualTo(1));
		var published = AssertIsType<SystemMessage.WriteEpoch>(_publisher.Messages[0]);
		Assert.That(published.EpochNumber, Is.EqualTo(_epochNumber));
	}

	/// <summary>A ChaserCaughtUp with a different correlation id should publish nothing.</summary>
	[Test]
	public void when_chaser_caught_up_with_unknown_correlation_id() {
		When(new SystemMessage.ChaserCaughtUp(_correlationId2));

		Assert.That(_publisher.Messages, Is.Empty);
	}

	/// <summary>A new BecomePreLeader should restart the wait under the new correlation id.</summary>
	[Test]
	public void when_become_pre_leader() {
		When(new SystemMessage.BecomePreLeader(_correlationId2));

		AssertWaitingForChaser(_correlationId2);
	}

	/// <summary>Moving to another node state should reset the manager to its initial state.</summary>
	[Test]
	public void when_become_other_node_state() {
		When(new SystemMessage.BecomeUnknown(Guid.NewGuid()));

		AssertInitial();
	}
}
}
@@ -0,0 +1,94 @@
using System;
using EventStore.Core.Messages;
using EventStore.Core.Services.TimerService;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.VNode.InaugurationManagement {
/// <summary>
/// Behaviour of the manager after it has handled ElectionsDone, BecomePreLeader,
/// ChaserCaughtUp and an EpochWritten for the elected epoch number.
/// </summary>
[TestFixture]
public class given_waiting_for_conditions : InaugurationManagerTests {
	protected override void Given() {
		_sut.Handle(new ElectionMessage.ElectionsDone(123, _epochNumber, _leader));
		_sut.Handle(new SystemMessage.BecomePreLeader(_correlationId1));
		_sut.Handle(new SystemMessage.ChaserCaughtUp(_correlationId1));
		_sut.Handle(GenEpoch(_epochNumber));

		// Discard anything published while getting into position.
		_publisher.Messages.Clear();
	}

	/// <summary>An IndexedTo notification should trigger the transition once both checkpoints are at target.</summary>
	[Test]
	public void when_transition_triggered_by_indexedto() {
		ReachTargetsInTwoSteps();

		var indexedTo = new ReplicationTrackingMessage.IndexedTo(_indexCheckpoint.Read());
		When(indexedTo);

		AssertSentBecomeLeader();
	}

	/// <summary>A ReplicatedTo notification should trigger the transition once both checkpoints are at target.</summary>
	[Test]
	public void when_transition_triggered_by_replicatedto() {
		ReachTargetsInTwoSteps();

		var replicatedTo = new ReplicationTrackingMessage.ReplicatedTo(_replicationCheckpoint.Read());
		When(replicatedTo);

		AssertSentBecomeLeader();
	}

	/// <summary>
	/// CheckInaugurationConditions should publish a timer Schedule followed by the
	/// BecomeLeader.
	/// </summary>
	[Test]
	public void when_transition_triggered_by_checkconditions() {
		ReachTargetsDirectly();

		When(new SystemMessage.CheckInaugurationConditions());

		Assert.That(_publisher.Messages.Count, Is.EqualTo(2));
		Assert.That(_publisher.Messages[0], Is.InstanceOf<TimerMessage.Schedule>());
		// Drop the Schedule so the shared assertion sees only the BecomeLeader.
		_publisher.Messages.RemoveAt(0);

		AssertSentBecomeLeader();
	}

	/// <summary>
	/// A second trigger after the transition must publish nothing more: exactly one
	/// BecomeLeader is expected overall.
	/// </summary>
	[Test]
	public void cant_become_leader_twice() {
		ReachTargetsDirectly();

		When(new ReplicationTrackingMessage.ReplicatedTo(_replicationCheckpoint.Read()));
		When(new SystemMessage.CheckInaugurationConditions());

		AssertSentBecomeLeader();
	}

	/// <summary>A repeated EpochWritten should publish nothing.</summary>
	[Test]
	public void when_epoch_written() {
		When(GenEpoch(_epochNumber));

		Assert.That(_publisher.Messages, Is.Empty);
	}

	/// <summary>A repeated ChaserCaughtUp should publish nothing.</summary>
	[Test]
	public void when_chaser_caught_up() {
		When(new SystemMessage.ChaserCaughtUp(_correlationId1));

		Assert.That(_publisher.Messages, Is.Empty);
	}

	/// <summary>A new BecomePreLeader should send the manager back to waiting for the chaser.</summary>
	[Test]
	public void when_become_pre_leader() {
		When(new SystemMessage.BecomePreLeader(_correlationId2));

		AssertWaitingForChaser(_correlationId2);
	}

	/// <summary>Moving to another node state should reset the manager to its initial state.</summary>
	[Test]
	public void when_become_other_node_state() {
		When(new SystemMessage.BecomeUnknown(Guid.NewGuid()));

		AssertInitial();
	}

	// Moves both checkpoints halfway and then to their targets, checking that
	// nothing has been published as a result.
	private void ReachTargetsInTwoSteps() {
		ProgressReplication();
		ProgressIndexing();
		CompleteReplication();
		CompleteIndexing();
		Assert.That(_publisher.Messages, Is.Empty);
	}

	// Moves both checkpoints straight to their targets, checking that nothing
	// has been published as a result.
	private void ReachTargetsDirectly() {
		CompleteReplication();
		CompleteIndexing();
		Assert.That(_publisher.Messages, Is.Empty);
	}
}
}
@@ -0,0 +1,61 @@
using System;
using EventStore.Core.Messages;
using EventStore.Core.Services.TimerService;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.VNode.InaugurationManagement {
/// <summary>
/// Behaviour of the manager after it has handled ElectionsDone, BecomePreLeader and a
/// matching ChaserCaughtUp, but no EpochWritten yet.
/// </summary>
[TestFixture]
public class given_writing_epoch : InaugurationManagerTests {
	protected override void Given() {
		_sut.Handle(new ElectionMessage.ElectionsDone(123, _epochNumber, _leader));
		_sut.Handle(new SystemMessage.BecomePreLeader(_correlationId1));
		_sut.Handle(new SystemMessage.ChaserCaughtUp(_correlationId1));

		// Discard anything published while getting into position.
		_publisher.Messages.Clear();
	}

	/// <summary>
	/// The expected EpochWritten should publish EnablePreLeaderReplication and then a
	/// timer Schedule whose reply is CheckInaugurationConditions.
	/// </summary>
	[Test]
	public void when_epoch_written() {
		When(GenEpoch(_epochNumber));

		var published = _publisher.Messages;
		Assert.That(published.Count, Is.EqualTo(2));
		Assert.That(published[0], Is.InstanceOf<SystemMessage.EnablePreLeaderReplication>());
		var timer = AssertIsType<TimerMessage.Schedule>(published[1]);
		Assert.That(timer.ReplyMessage, Is.InstanceOf<SystemMessage.CheckInaugurationConditions>());
	}

	/// <summary>An EpochWritten for a different epoch number should publish nothing.</summary>
	[Test]
	public void when_epoch_written_with_unexpected_number() {
		When(GenEpoch(_epochNumber + 1));

		Assert.That(_publisher.Messages, Is.Empty);
	}

	/// <summary>
	/// With both checkpoints already at target, the EpochWritten should publish
	/// EnablePreLeaderReplication, BecomeLeader and a timer Schedule, in that order.
	/// </summary>
	[Test]
	public void when_epoch_written_and_conditions_instantly_met() {
		CompleteReplication();
		CompleteIndexing();

		When(GenEpoch(_epochNumber));

		var published = _publisher.Messages;
		Assert.That(published.Count, Is.EqualTo(3));
		Assert.That(published[0], Is.InstanceOf<SystemMessage.EnablePreLeaderReplication>());
		Assert.That(published[1], Is.InstanceOf<SystemMessage.BecomeLeader>());
		Assert.That(published[2], Is.InstanceOf<TimerMessage.Schedule>());
	}

	/// <summary>A repeated ChaserCaughtUp should publish nothing.</summary>
	[Test]
	public void when_chaser_caught_up() {
		When(new SystemMessage.ChaserCaughtUp(_correlationId1));

		Assert.That(_publisher.Messages, Is.Empty);
	}

	/// <summary>A new BecomePreLeader should send the manager back to waiting for the chaser.</summary>
	[Test]
	public void when_become_pre_leader() {
		When(new SystemMessage.BecomePreLeader(_correlationId2));

		AssertWaitingForChaser(_correlationId2);
	}

	/// <summary>Moving to another node state should reset the manager to its initial state.</summary>
	[Test]
	public void when_become_other_node_state() {
		When(new SystemMessage.BecomeUnknown(Guid.NewGuid()));

		AssertInitial();
	}
}
}

0 comments on commit 3e46122

Please sign in to comment.