Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for Replication and Indexing to complete before becoming leader #3187

Merged
merged 1 commit into from Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -112,7 +112,7 @@ public class read_all_events_filtered_paging_should : SpecificationWithMiniNode
.GetAwaiter()
.GetResult();

Assert.AreEqual(2, slice.Events.Length); // Includes system events at start of stream
Assert.AreEqual(1, slice.Events.Length); // Includes system events at start of stream (inc epoch-information)
}

[Test, Category("LongRunning")]
Expand Down
Expand Up @@ -504,7 +504,7 @@ public class when_state_changed_to_leader : NodeGossipServiceTestFixture {
);

protected override Message When() =>
new SystemMessage.BecomeLeader(Guid.NewGuid(),0);
new SystemMessage.BecomeLeader(Guid.NewGuid());

[Test]
public void should_update_gossip() {
Expand Down
Expand Up @@ -324,8 +324,8 @@ public class when_replica_subscribes_with_uncached_epoch_that_does_not_exist_on_
EpochManager.WriteNewEpoch(8);

_replicaEpochs = new List<Epoch> {
new Epoch(_uncachedLeaderEpochs[0].EpochPosition + 800, 3, Guid.NewGuid()),
new Epoch(_uncachedLeaderEpochs[0].EpochPosition + 400, 2, Guid.NewGuid()),
new Epoch(_uncachedLeaderEpochs[0].EpochPosition + 8000, 3, Guid.NewGuid()),
new Epoch(_uncachedLeaderEpochs[0].EpochPosition + 4000, 2, Guid.NewGuid()),
new Epoch(_uncachedLeaderEpochs[0].EpochPosition, _uncachedLeaderEpochs[0].EpochNumber, _uncachedLeaderEpochs[0].EpochId),
new Epoch(_uncachedLeaderEpochs[1].EpochPosition, _uncachedLeaderEpochs[1].EpochNumber, _uncachedLeaderEpochs[1].EpochId)
};
Expand Down
Expand Up @@ -68,7 +68,7 @@ public abstract class with_replication_service : SpecificationWithDirectoryPerTe
queueStatsManager: new QueueStatsManager());

Service.Handle(new SystemMessage.SystemStart());
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid(),0));
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid()));

ReplicaSubscriptionId = AddSubscription(ReplicaId, true, out ReplicaManager1);
ReplicaSubscriptionId2 = AddSubscription(ReplicaId2, true, out ReplicaManager2);
Expand Down Expand Up @@ -114,7 +114,7 @@ public abstract class with_replication_service : SpecificationWithDirectoryPerTe

public abstract void When();
protected void BecomeLeader() {
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid(),0));
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid()));
}

protected void BecomeUnknown() {
Expand Down
Expand Up @@ -71,7 +71,7 @@ public abstract class with_replication_service_and_epoch_manager : Specification
new QueueStatsManager());

Service.Handle(new SystemMessage.SystemStart());
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid(),0));
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid()));

When();
}
Expand Down
Expand Up @@ -37,7 +37,7 @@ public abstract class with_clustered_replication_tracking_service:
public abstract void When();

protected void BecomeLeader() {
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid(),0));
Service.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid()));
}

protected void BecomeUnknown() {
Expand Down
Expand Up @@ -16,6 +16,8 @@
using EventStore.Core.Tests.Helpers;
using EventStore.Core.TransactionLog.LogRecords;
using System.Threading;
using EventStore.Core.Services;
using EventStore.Common.Utils;

namespace EventStore.Core.Tests.Services.Storage {
[TestFixture]
Expand Down Expand Up @@ -233,6 +235,23 @@ public sealed class when_having_TFLog_with_existing_epochs : SpecificationWithDi
Assert.That(_cache.First.Value.EpochNumber == _epochs[20].EpochNumber);
Assert.That(_cache.Last.Value.EpochNumber == _epochs[29].EpochNumber);

// can write an epoch with epoch information (even though previous epochs
// dont have epoch information)
_epochManager.WriteNewEpoch(GetNextEpoch());
_epochManager.WriteNewEpoch(GetNextEpoch());
var epochsWritten = _published.OfType<SystemMessage.EpochWritten>().ToArray();
Assert.AreEqual(2, epochsWritten.Length);
for (int i = 0; i < epochsWritten.Length; i++) {
_reader.Reposition(epochsWritten[i].Epoch.EpochPosition);
_reader.TryReadNext(); // read epoch
var result = _reader.TryReadNext(); // read epoch-information
Assert.True(result.Success);
var epochInfo = (PrepareLogRecord)result.LogRecord;
Assert.AreEqual(SystemStreams.EpochInformationStream, epochInfo.EventStreamId);
Assert.AreEqual(SystemEventTypes.EpochInformation, epochInfo.EventType);
Assert.AreEqual(i - 1, epochInfo.ExpectedVersion);
Assert.AreEqual(_instanceId, new EpochRecord(epochInfo.Data.ParseJson<EpochRecord.EpochRecordDto>()).LeaderInstanceId);
}
}
public void Dispose() {
try {
Expand Down
Expand Up @@ -16,6 +16,9 @@
using EventStore.Core.Tests.Helpers;
using EventStore.Core.TransactionLog.LogRecords;
using System.Threading;
using EventStore.Core.Services;
using EventStore.Core.Data;
using EventStore.Common.Utils;

namespace EventStore.Core.Tests.Services.Storage {
[TestFixture]
Expand Down Expand Up @@ -98,7 +101,6 @@ public sealed class when_having_an_epoch_manager_and_empty_tf_log : Specificatio
Assert.That(epochWritten.Epoch.LeaderInstanceId == _instanceId);
Assert.That(epochWritten.Epoch.TimeStamp < DateTime.UtcNow);
Assert.That(epochWritten.Epoch.TimeStamp >= beforeWrite);
_published.Clear();

// will_cache_epochs_written() {

Expand Down Expand Up @@ -131,6 +133,22 @@ public sealed class when_having_an_epoch_manager_and_empty_tf_log : Specificatio
epoch = epoch.Next;
}
CollectionAssert.IsOrdered(epochs);

// has written epoch information
var epochsWritten = _published.OfType<SystemMessage.EpochWritten>().ToArray();
Assert.AreEqual(1 + 4 + 16, epochsWritten.Length);
for (int i = 0; i < epochsWritten.Length; i++) {
_reader.Reposition(epochsWritten[i].Epoch.EpochPosition);
_reader.TryReadNext(); // read epoch
var result = _reader.TryReadNext(); // read epoch-information
Assert.True(result.Success);
var epochInfo = (PrepareLogRecord)result.LogRecord;
Assert.AreEqual(SystemStreams.EpochInformationStream, epochInfo.EventStreamId);
Assert.AreEqual(SystemEventTypes.EpochInformation, epochInfo.EventType);
Assert.AreEqual(i - 1, epochInfo.ExpectedVersion);
Assert.AreEqual(_instanceId, new EpochRecord(epochInfo.Data.ParseJson<EpochRecord.EpochRecordDto>()).LeaderInstanceId);
}
_published.Clear();
}

public void Dispose() {
Expand Down
@@ -0,0 +1,106 @@
using System;
using EventStore.Core.Cluster;
using EventStore.Core.Messages;
using EventStore.Core.Messaging;
using EventStore.Core.Services.VNode;
using EventStore.Core.Tests.Fakes;
using EventStore.Core.TransactionLog.Checkpoint;
using EventStore.Core.TransactionLog.LogRecords;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.VNode.InaugurationManagement {
public abstract class InaugurationManagerTests {
protected readonly Guid _correlationId1 = Guid.Parse("00000000-0000-0000-0000-0000000000c1");
protected readonly Guid _correlationId2 = Guid.Parse("00000000-0000-0000-0000-0000000000c2");
protected readonly Guid _correlationId3 = Guid.Parse("00000000-0000-0000-0000-0000000000c3");
protected readonly int _epochNumber = 15;
protected readonly MemberInfo _leader =
new MemberInfo(new MemberInfoDto {
InternalTcpIp = "localhost",
HttpEndPointIp = "localhost",
});
protected readonly long _replicationTarget = 400;
protected readonly long _indexTarget = 400;

protected InaugurationManager _sut;
protected FakePublisher _publisher;
protected InMemoryCheckpoint _replicationCheckpoint;
protected InMemoryCheckpoint _indexCheckpoint;

[SetUp]
public void SetUp() {
_publisher = new FakePublisher();
_replicationCheckpoint = new InMemoryCheckpoint();
_indexCheckpoint = new InMemoryCheckpoint();
_sut = new InaugurationManager(_publisher, _replicationCheckpoint, _indexCheckpoint);
Given();
}

protected abstract void Given();

protected void When(Message m) {
_sut.Handle((dynamic)m);
}

protected SystemMessage.EpochWritten GenEpoch(int epochNumber) {
var epoch = new SystemMessage.EpochWritten(new EpochRecord(
epochPosition: _replicationTarget - 1,
epochNumber: epochNumber,
epochId: Guid.NewGuid(),
prevEpochPosition: _replicationTarget - 20,
timeStamp: DateTime.UtcNow,
leaderInstanceId: Guid.NewGuid()));
return epoch;
}

protected void ProgressReplication() {
_replicationCheckpoint.Write(_replicationTarget / 2);
_replicationCheckpoint.Flush();
}

protected void CompleteReplication() {
_replicationCheckpoint.Write(_replicationTarget);
_replicationCheckpoint.Flush();
}

protected void ProgressIndexing() {
_indexCheckpoint.Write(_indexTarget / 2);
_indexCheckpoint.Flush();
}

protected void CompleteIndexing() {
_indexCheckpoint.Write(_indexTarget);
_indexCheckpoint.Flush();
}

protected static T AssertIsType<T>(object o) {
Assert.IsInstanceOf<T>(o);
return (T)o;
}

// check that we have reset to initial state, do this by checking we can
// proceed forward from it
protected void AssertInitial() {
Assert.IsEmpty(_publisher.Messages);
_sut.Handle(new SystemMessage.BecomePreLeader(_correlationId3));
AssertWaitingForChaser(_correlationId3);
}

// check that we have reset to waiting fo chaser, do this by checking we can
// proceed forward from it
protected void AssertWaitingForChaser(Guid expectedCorrelationId) {
Assert.IsEmpty(_publisher.Messages);
_sut.Handle(new SystemMessage.ChaserCaughtUp(expectedCorrelationId));
Assert.AreEqual(1, _publisher.Messages.Count);
Assert.IsInstanceOf<SystemMessage.WriteEpoch>(_publisher.Messages[0]);
}

protected void AssertSentBecomeLeader() {
Assert.AreEqual(1, _publisher.Messages.Count);
var becomeLeader = AssertIsType<SystemMessage.BecomeLeader>(_publisher.Messages[0]);
Assert.AreEqual(_correlationId1, becomeLeader.CorrelationId);
_publisher.Messages.Clear();
AssertInitial();
}
}
}
@@ -0,0 +1,24 @@
using System;
using EventStore.Core.Messages;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.VNode.InaugurationManagement {
[TestFixture]
public class given_initial_state : InaugurationManagerTests {
protected override void Given() {
_publisher.Messages.Clear();
}

[Test]
public void when_become_pre_leader() {
When(new SystemMessage.BecomePreLeader(_correlationId1));
AssertWaitingForChaser(_correlationId1);
}

[Test]
public void when_become_other_node_state() {
When(new SystemMessage.BecomeUnknown(Guid.NewGuid()));
AssertInitial();
}
}
}
@@ -0,0 +1,41 @@
using System;
using EventStore.Core.Messages;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.VNode.InaugurationManagement {

[TestFixture]
public class given_waiting_for_chaser : InaugurationManagerTests {
protected override void Given() {
_sut.Handle(new ElectionMessage.ElectionsDone(123, _epochNumber, _leader));
_sut.Handle(new SystemMessage.BecomePreLeader(_correlationId1));
_publisher.Messages.Clear();
}

[Test]
public void when_chaser_caught_up() {
When(new SystemMessage.ChaserCaughtUp(_correlationId1));
Assert.AreEqual(1, _publisher.Messages.Count);
var writeEpoch = AssertIsType<SystemMessage.WriteEpoch>(_publisher.Messages[0]);
Assert.AreEqual(_epochNumber, writeEpoch.EpochNumber);
}

[Test]
public void when_chaser_caught_up_with_unknown_correlation_id() {
When(new SystemMessage.ChaserCaughtUp(_correlationId2));
Assert.IsEmpty(_publisher.Messages);
}

[Test]
public void when_become_pre_leader() {
When(new SystemMessage.BecomePreLeader(_correlationId2));
AssertWaitingForChaser(_correlationId2);
}

[Test]
public void when_become_other_node_state() {
When(new SystemMessage.BecomeUnknown(Guid.NewGuid()));
AssertInitial();
}
}
}