Skip to content

Commit

Permalink
Merge pull request #2454 from EventStore/write-leader-in-epoch-record
Browse files Browse the repository at this point in the history
Write leader instance ID in epoch record and pass on epoch's leader's ID and gossip info to leader of elections
  • Loading branch information
hayley-jean committed May 28, 2020
2 parents fdad400 + cd3760c commit a4ee0ea
Show file tree
Hide file tree
Showing 12 changed files with 618 additions and 305 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,14 @@ public class ChoosingLeaderTests {
for (int index = 0; index < 3; index++) {
members[index] = CreateMemberInfo(index, epochId, lastCommitPosition, writerCheckpoint,
chaserCheckpoint, nodePriority);
}

var clusterInfo = new ClusterInfo(members);
var previousLeaderId = tc.LastElectedLeader.HasValue ? IdForNode(tc.LastElectedLeader.Value) : Guid.Empty;

for (int index = 0; index < 3; index++) {
var prepareOk = CreatePrepareOk(index, epochId, lastCommitPosition, writerCheckpoint, chaserCheckpoint,
nodePriority);
nodePriority, previousLeaderId, clusterInfo);
prepareOks.Add(prepareOk.ServerId, prepareOk);
}

Expand All @@ -250,12 +256,12 @@ public class ChoosingLeaderTests {
var resigningLeadership = tc.ResigningLeader.HasValue
? (Guid?)IdForNode(tc.ResigningLeader.Value)
: null;
var mc = SUT.GetBestLeaderCandidate(prepareOks, members, lastElectedLeader, resigningLeadership);
var mc = SUT.GetBestLeaderCandidate(prepareOks, members, resigningLeadership, 0);

Assert.AreEqual(IdForNode(tc.ExpectedLeaderCandidateNode), mc.InstanceId);

var ownInfo = CreateLeaderCandidate(1, epochId, lastCommitPosition, writerCheckpoint, chaserCheckpoint,
nodePriority);
nodePriority, previousLeaderId);

var isLegit = SUT.IsLegitimateLeader(1, EndpointForNode(tc.ProposingNode),
IdForNode(tc.ProposingNode), mc, members, null, members[0].InstanceId,
Expand All @@ -269,21 +275,24 @@ public class ChoosingLeaderTests {
Func<int, long> lastCommitPosition,
Func<int, long> writerCheckpoint,
Func<int, long> chaserCheckpoint,
Func<int, int> nodePriority) {
Func<int, int> nodePriority,
Guid previousLeaderId,
ClusterInfo clusterInfo) {
var id = IdForNode(i);
var ep = EndpointForNode(i);
return new ElectionMessage.PrepareOk(1, id, ep, 1, 1, epochId, lastCommitPosition(i), writerCheckpoint(i),
chaserCheckpoint(i), nodePriority(i));
return new ElectionMessage.PrepareOk(1, id, ep, 1, 1, epochId, previousLeaderId, lastCommitPosition(i), writerCheckpoint(i),
chaserCheckpoint(i), nodePriority(i), clusterInfo);
}

static SUT.LeaderCandidate CreateLeaderCandidate(int i, Guid epochId,
Func<int, long> lastCommitPosition,
Func<int, long> writerCheckpoint,
Func<int, long> chaserCheckpoint,
Func<int, int> nodePriority) {
Func<int, int> nodePriority,
Guid previousLeaderId) {
var id = IdForNode(i);
var ep = EndpointForNode(i);
return new SUT.LeaderCandidate(id, ep, 1, 1, epochId, lastCommitPosition(i), writerCheckpoint(i),
return new SUT.LeaderCandidate(id, ep, 1, 1, epochId, previousLeaderId, lastCommitPosition(i), writerCheckpoint(i),
chaserCheckpoint(i), nodePriority(i));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class when_chaser_reads_system_event : with_storage_chaser_service {
public override void When() {
_epochId = Guid.NewGuid();
_epochNumber = 7;
var epoch = new EpochRecord(0, _epochNumber, _epochId, -1, DateTime.UtcNow);
var epoch = new EpochRecord(0, _epochNumber, _epochId, -1, DateTime.UtcNow, Guid.Empty);
var rec = new SystemLogRecord(epoch.EpochPosition, epoch.TimeStamp, SystemRecordType.Epoch,
SystemRecordSerialization.Json, epoch.AsSerialized());

Expand Down
17 changes: 10 additions & 7 deletions src/EventStore.Core/Cluster/EventStoreClusterClient.Elections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public partial class EventStoreClusterClient {
public void SendPrepareOk(ElectionMessage.PrepareOk prepareOk, EndPoint destinationEndpoint,
DateTime deadline) {
SendPrepareOkAsync(prepareOk.View, prepareOk.ServerId, prepareOk.ServerHttpEndPoint, prepareOk.EpochNumber,
prepareOk.EpochPosition, prepareOk.EpochId, prepareOk.LastCommitPosition,
prepareOk.EpochPosition, prepareOk.EpochId, prepareOk.EpochLeaderInstanceId, prepareOk.LastCommitPosition,
prepareOk.WriterCheckpoint,
prepareOk.ChaserCheckpoint, prepareOk.NodePriority, deadline)
prepareOk.ChaserCheckpoint, prepareOk.NodePriority, prepareOk.ClusterInfo, deadline)
.ContinueWith(r => {
if (r.Exception != null) {
Log.Information(r.Exception, "Prepare OK Send Failed to {Server}",
Expand All @@ -53,7 +53,7 @@ public partial class EventStoreClusterClient {
public void SendProposal(ElectionMessage.Proposal proposal, EndPoint destinationEndpoint, DateTime deadline) {
SendProposalAsync(proposal.ServerId, proposal.ServerHttpEndPoint, proposal.LeaderId,
proposal.LeaderHttpEndPoint,
proposal.View, proposal.EpochNumber, proposal.EpochPosition, proposal.EpochId,
proposal.View, proposal.EpochNumber, proposal.EpochPosition, proposal.EpochId,proposal.EpochLeaderInstanceId,
proposal.LastCommitPosition, proposal.WriterCheckpoint, proposal.ChaserCheckpoint,
proposal.NodePriority,
deadline)
Expand Down Expand Up @@ -124,25 +124,27 @@ public partial class EventStoreClusterClient {
}

private async Task SendPrepareOkAsync(int view, Guid serverId, EndPoint serverHttpEndPoint, int epochNumber,
long epochPosition, Guid epochId, long lastCommitPosition, long writerCheckpoint, long chaserCheckpoint,
int nodePriority, DateTime deadline) {
long epochPosition, Guid epochId, Guid epochLeaderInstanceId, long lastCommitPosition, long writerCheckpoint, long chaserCheckpoint,
int nodePriority, ClusterInfo clusterInfo, DateTime deadline) {
var request = new PrepareOkRequest {
View = view,
ServerId = Uuid.FromGuid(serverId).ToDto(),
ServerHttp = new GossipEndPoint(serverHttpEndPoint.GetHost(), (uint)serverHttpEndPoint.GetPort()),
EpochNumber = epochNumber,
EpochPosition = epochPosition,
EpochId = Uuid.FromGuid(epochId).ToDto(),
EpochLeaderInstanceId = Uuid.FromGuid(epochLeaderInstanceId).ToDto(),
LastCommitPosition = lastCommitPosition,
WriterCheckpoint = writerCheckpoint,
ChaserCheckpoint = chaserCheckpoint,
NodePriority = nodePriority
NodePriority = nodePriority,
ClusterInfo = ClusterInfo.ToGrpcClusterInfo(clusterInfo)
};
await _electionsClient.PrepareOkAsync(request, deadline: deadline.ToUniversalTime());
}

private async Task SendProposalAsync(Guid serverId, EndPoint serverHttpEndPoint, Guid leaderId,
EndPoint leaderHttp, int view, int epochNumber, long epochPosition, Guid epochId,
EndPoint leaderHttp, int view, int epochNumber, long epochPosition, Guid epochId, Guid epochLeaderInstanceId,
long lastCommitPosition, long writerCheckpoint, long chaserCheckpoint, int nodePriority,
DateTime deadline) {
var request = new ProposalRequest {
Expand All @@ -154,6 +156,7 @@ public partial class EventStoreClusterClient {
EpochNumber = epochNumber,
EpochPosition = epochPosition,
EpochId = Uuid.FromGuid(epochId).ToDto(),
EpochLeaderInstanceId = Uuid.FromGuid(epochLeaderInstanceId).ToDto(),
LastCommitPosition = lastCommitPosition,
WriterCheckpoint = writerCheckpoint,
ChaserCheckpoint = chaserCheckpoint,
Expand Down
3 changes: 2 additions & 1 deletion src/EventStore.Core/ClusterVNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ public class ClusterVNode :
initialReaderCount: 1,
maxReaderCount: 5,
readerFactory: () => new TFChunkReader(db, db.Config.WriterCheckpoint,
optimizeReadSideCache: db.Config.OptimizeReadSideCache));
optimizeReadSideCache: db.Config.OptimizeReadSideCache),
_nodeInfo.InstanceId);
epochManager.Init();

var storageWriter = new ClusterStorageWriterService(_mainQueue, _mainBus, vNodeSettings.MinFlushDelay,
Expand Down
23 changes: 17 additions & 6 deletions src/EventStore.Core/Messages/ElectionMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,31 +155,37 @@ public class PrepareOk : Message {
public readonly int EpochNumber;
public readonly long EpochPosition;
public readonly Guid EpochId;
public readonly Guid EpochLeaderInstanceId;
public readonly long LastCommitPosition;
public readonly long WriterCheckpoint;
public readonly long ChaserCheckpoint;
public readonly int NodePriority;
public readonly ClusterInfo ClusterInfo;

public PrepareOk(int view,
Guid serverId,
EndPoint serverHttpEndPoint,
int epochNumber,
long epochPosition,
Guid epochId,
Guid epochLeaderInstanceId,
long lastCommitPosition,
long writerCheckpoint,
long chaserCheckpoint,
int nodePriority) {
int nodePriority,
ClusterInfo clusterInfo) {
View = view;
ServerId = serverId;
ServerHttpEndPoint = serverHttpEndPoint;
EpochNumber = epochNumber;
EpochPosition = epochPosition;
EpochId = epochId;
EpochLeaderInstanceId = epochLeaderInstanceId;
LastCommitPosition = lastCommitPosition;
WriterCheckpoint = writerCheckpoint;
ChaserCheckpoint = chaserCheckpoint;
NodePriority = nodePriority;
ClusterInfo = clusterInfo;
}

public PrepareOk(ElectionMessageDto.PrepareOkDto dto) {
Expand All @@ -190,18 +196,20 @@ public class PrepareOk : Message {
EpochNumber = dto.EpochNumber;
EpochPosition = dto.EpochPosition;
EpochId = dto.EpochId;
EpochLeaderInstanceId = dto.EpochLeaderInstanceId;
LastCommitPosition = dto.LastCommitPosition;
WriterCheckpoint = dto.WriterCheckpoint;
ChaserCheckpoint = dto.ChaserCheckpoint;
NodePriority = dto.NodePriority;
ClusterInfo = dto.ClusterInfo;
}

public override string ToString() {
return string.Format(
"---- PrepareOk: view {0}, serverId {1}, serverHttp {2}, epochNumber {3}, " +
"epochPosition {4}, epochId {5}, lastCommitPosition {6}, writerCheckpoint {7}, chaserCheckpoint {8}, nodePriority: {9}",
"epochPosition {4}, epochId {5}, epochLeaderInstanceId {6:B}, lastCommitPosition {7}, writerCheckpoint {8}, chaserCheckpoint {9}, nodePriority: {10}, clusterInfo: {11}",
View, ServerId, ServerHttpEndPoint, EpochNumber,
EpochPosition, EpochId, LastCommitPosition, WriterCheckpoint, ChaserCheckpoint, NodePriority);
EpochPosition, EpochId, EpochLeaderInstanceId, LastCommitPosition, WriterCheckpoint, ChaserCheckpoint, NodePriority, ClusterInfo);
}
}

Expand All @@ -221,13 +229,14 @@ public class Proposal : Message {
public readonly int EpochNumber;
public readonly long EpochPosition;
public readonly Guid EpochId;
public readonly Guid EpochLeaderInstanceId;
public readonly long LastCommitPosition;
public readonly long WriterCheckpoint;
public readonly long ChaserCheckpoint;
public readonly int NodePriority;

public Proposal(Guid serverId, EndPoint serverHttpEndPoint, Guid leaderId, EndPoint leaderHttpEndPoint,
int view, int epochNumber, long epochPosition, Guid epochId,
int view, int epochNumber, long epochPosition, Guid epochId, Guid epochLeaderInstanceId,
long lastCommitPosition, long writerCheckpoint, long chaserCheckpoint, int nodePriority) {
ServerId = serverId;
ServerHttpEndPoint = serverHttpEndPoint;
Expand All @@ -237,6 +246,7 @@ public class Proposal : Message {
EpochNumber = epochNumber;
EpochPosition = epochPosition;
EpochId = epochId;
EpochLeaderInstanceId = epochLeaderInstanceId;
LastCommitPosition = lastCommitPosition;
WriterCheckpoint = writerCheckpoint;
ChaserCheckpoint = chaserCheckpoint;
Expand All @@ -254,6 +264,7 @@ public class Proposal : Message {
EpochNumber = dto.EpochNumber;
EpochPosition = dto.EpochPosition;
EpochId = dto.EpochId;
EpochLeaderInstanceId = dto.EpochLeaderInstanceId;
LastCommitPosition = dto.LastCommitPosition;
WriterCheckpoint = dto.WriterCheckpoint;
ChaserCheckpoint = dto.ChaserCheckpoint;
Expand All @@ -263,10 +274,10 @@ public class Proposal : Message {
public override string ToString() {
return string.Format(
"---- Proposal: serverId {0}, serverHttp {1}, leaderId {2}, leaderHttp {3}, "
+ "view {4}, lastCommitCheckpoint {5}, writerCheckpoint {6}, chaserCheckpoint {7}, epoch {8}@{9}:{10:B}, NodePriority {11}",
+ "view {4}, lastCommitCheckpoint {5}, writerCheckpoint {6}, chaserCheckpoint {7}, epoch {8}@{9}:{10:B} (L={11:B}), NodePriority {12}",
ServerId, ServerHttpEndPoint, LeaderId, LeaderHttpEndPoint,
View, LastCommitPosition, WriterCheckpoint, ChaserCheckpoint,
EpochNumber, EpochPosition, EpochId, NodePriority);
EpochNumber, EpochPosition, EpochId, EpochLeaderInstanceId, NodePriority);
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/EventStore.Core/Messages/ElectionMessageDtos.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using EventStore.Common.Utils;
using EventStore.Core.Cluster;

namespace EventStore.Core.Messages {
public static class ElectionMessageDto {
Expand Down Expand Up @@ -70,11 +71,13 @@ public class PrepareOkDto {
public int EpochNumber { get; set; }
public long EpochPosition { get; set; }
public Guid EpochId { get; set; }
public Guid EpochLeaderInstanceId { get; set; }
public long LastCommitPosition { get; set; }
public long WriterCheckpoint { get; set; }
public long ChaserCheckpoint { get; set; }

public int NodePriority { get; set; }
public ClusterInfo ClusterInfo { get; set; }

public PrepareOkDto() {
}
Expand All @@ -89,11 +92,13 @@ public class PrepareOkDto {
EpochNumber = message.EpochNumber;
EpochPosition = message.EpochPosition;
EpochId = message.EpochId;
EpochLeaderInstanceId = message.EpochLeaderInstanceId;
LastCommitPosition = message.LastCommitPosition;
WriterCheckpoint = message.WriterCheckpoint;
ChaserCheckpoint = message.ChaserCheckpoint;

NodePriority = message.NodePriority;
ClusterInfo = message.ClusterInfo;
}
}

Expand All @@ -115,6 +120,7 @@ public class ProposalDto {
public int EpochNumber { get; set; }
public long EpochPosition { get; set; }
public Guid EpochId { get; set; }
public Guid EpochLeaderInstanceId { get; set; }
public int NodePriority { get; set; }

public ProposalDto() {
Expand All @@ -133,6 +139,7 @@ public class ProposalDto {
EpochNumber = message.EpochNumber;
EpochPosition = message.EpochPosition;
EpochId = message.EpochId;
EpochLeaderInstanceId = message.EpochLeaderInstanceId;
LastCommitPosition = message.LastCommitPosition;
WriterCheckpoint = message.WriterCheckpoint;
ChaserCheckpoint = message.ChaserCheckpoint;
Expand Down
Loading

0 comments on commit a4ee0ea

Please sign in to comment.