Skip to content

Commit

Permalink
Adding the notion (but not wiring yet) of backup replication
Browse files Browse the repository at this point in the history
  • Loading branch information
ayenderahien committed Jun 5, 2009
1 parent 32a1799 commit cdd5a19
Show file tree
Hide file tree
Showing 23 changed files with 660 additions and 311 deletions.
Expand Up @@ -62,7 +62,7 @@ public void CanCatchUpOnSegment()
}); });
var segments = masterProxy.Join(endpoint); var segments = masterProxy.Join(endpoint);


masterProxy.CaughtUp(endpoint, segments[0].Index, segments[1].Index); masterProxy.CaughtUp(endpoint, ReplicationType.Ownership, segments[0].Index, segments[1].Index);


var topology = masterProxy.GetTopology(); var topology = masterProxy.GetTopology();
Assert.Equal(endpoint, topology.Segments[segments[0].Index].AssignedEndpoint); Assert.Equal(endpoint, topology.Segments[segments[0].Index].AssignedEndpoint);
Expand Down Expand Up @@ -90,7 +90,7 @@ public void CanGiveUpOnSegment()


var segments = masterProxy.Join(newEndpoint); var segments = masterProxy.Join(newEndpoint);


masterProxy.GaveUp(newEndpoint, segments[0].Index, segments[1].Index); masterProxy.GaveUp(newEndpoint, ReplicationType.Ownership, segments[0].Index, segments[1].Index);


var topology = masterProxy.GetTopology(); var topology = masterProxy.GetTopology();
Assert.Equal(existingEndpoint,topology.Segments[segments[0].Index].AssignedEndpoint); Assert.Equal(existingEndpoint,topology.Segments[segments[0].Index].AssignedEndpoint);
Expand Down
Expand Up @@ -30,6 +30,7 @@ public OnlineSegmentReplicationCommandTest()
command = new OnlineSegmentReplicationCommand( command = new OnlineSegmentReplicationCommand(
endpoint, endpoint,
new[] { new Segment { Index = 0 }, new Segment { Index = 1 }, }, new[] { new Segment { Index = 0 }, new Segment { Index = 1 }, },
ReplicationType.Ownership,
node, node,
replication); replication);
} }
Expand Down Expand Up @@ -67,7 +68,7 @@ public void WillLetNodeKnowAboutAnyEmptySegmentsAssignedToIt()
var success = command.Execute(); var success = command.Execute();
Assert.True(success); Assert.True(success);


node.AssertWasCalled(x => x.DoneReplicatingSegments(new int[] { 0 })); node.AssertWasCalled(x => x.DoneReplicatingSegments(ReplicationType.Ownership, new int[] { 0 }));
} }


[Fact] [Fact]
Expand Down Expand Up @@ -162,8 +163,8 @@ public void WhenSegmentReplicationFailsWillGiveUpTheSegment()
var success = command.Execute(); var success = command.Execute();
Assert.False(success); Assert.False(success);


node.AssertWasCalled(x=>x.GivingUpOn(0)); node.AssertWasCalled(x => x.GivingUpOn(ReplicationType.Ownership, 0));
node.AssertWasCalled(x => x.GivingUpOn(1)); node.AssertWasCalled(x => x.GivingUpOn(ReplicationType.Ownership, 1));
} }


[Fact] [Fact]
Expand All @@ -174,7 +175,7 @@ public void WhenEmptySegmentReplicationFailsWillGiveEverythingUp()
var success = command.Execute(); var success = command.Execute();
Assert.False(success); Assert.False(success);


node.AssertWasCalled(x => x.GivingUpOn(0,1)); node.AssertWasCalled(x => x.GivingUpOn(ReplicationType.Ownership, 0, 1));
} }


[Fact] [Fact]
Expand Down
28 changes: 14 additions & 14 deletions Rhino.DistributedHashTable.Tests/BackCopiesBehavior.cs
Expand Up @@ -23,7 +23,7 @@ public void AddingNewNodeResultInAllSegmentsHavingNoBackupCopies()
{ {
master.Join(endPoint); master.Join(endPoint);


Assert.True(master.Segments.All(x => x.Backups.Count == 0)); Assert.True(master.Segments.All(x => x.PendingBackups.Count == 0));
} }
} }


Expand All @@ -39,15 +39,15 @@ public OnMasterWithOneExistingNode()


var existingEndpoint = NodeEndpoint.ForTest(3); var existingEndpoint = NodeEndpoint.ForTest(3);
var ranges = master.Join(existingEndpoint); var ranges = master.Join(existingEndpoint);
master.CaughtUp(existingEndpoint, ranges.Select(x=>x.Index).ToArray()); master.CaughtUp(existingEndpoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
} }


[Fact] [Fact]
public void AddingNewNodeResultInAllSegmentsHavingSingleBackupCopy() public void AddingNewNodeResultInAllSegmentsHavingSingleBackupCopy()
{ {
var ranges = master.Join(endPoint); var ranges = master.Join(endPoint);
master.CaughtUp(endPoint, ranges.Select(x => x.Index).ToArray()); master.CaughtUp(endPoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
Assert.True(master.Segments.All(x => x.Backups.Count == 1)); Assert.True(master.Segments.All(x => x.PendingBackups.Count == 1));
} }


[Fact] [Fact]
Expand All @@ -56,7 +56,7 @@ public void AddingNewNodeWillRaiseBackupChangedEvent()
bool wasChanged = false; bool wasChanged = false;
master.BackupChanged += (state, point, range) => wasChanged = true; master.BackupChanged += (state, point, range) => wasChanged = true;
var ranges = master.Join(endPoint); var ranges = master.Join(endPoint);
master.CaughtUp(endPoint, ranges.Select(x => x.Index).ToArray()); master.CaughtUp(endPoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());


Assert.True(wasChanged); Assert.True(wasChanged);
} }
Expand All @@ -74,18 +74,18 @@ public OnMasterWithTwoNodes()


var existingEndpoint = NodeEndpoint.ForTest(3); var existingEndpoint = NodeEndpoint.ForTest(3);
var ranges = master.Join(existingEndpoint); var ranges = master.Join(existingEndpoint);
master.CaughtUp(existingEndpoint, ranges.Select(x => x.Index).ToArray()); master.CaughtUp(existingEndpoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
var anotherPoint = NodeEndpoint.ForTest(10); var anotherPoint = NodeEndpoint.ForTest(10);
ranges = master.Join(anotherPoint); ranges = master.Join(anotherPoint);
master.CaughtUp(anotherPoint, ranges.Select(x => x.Index).ToArray()); master.CaughtUp(anotherPoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
} }


[Fact] [Fact]
public void AddingNewNodeResultInAllSegmentsHavingTwoBackupCopy() public void AddingNewNodeResultInAllSegmentsHavingTwoBackupCopy()
{ {
var ranges = master.Join(endPoint); var ranges = master.Join(endPoint);
master.CaughtUp(endPoint, ranges.Select(x => x.Index).ToArray()); master.CaughtUp(endPoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
Assert.True(master.Segments.All(x => x.Backups.Count == 2)); Assert.True(master.Segments.All(x => x.PendingBackups.Count == 2));
} }
} }


Expand All @@ -101,21 +101,21 @@ public OnMasterWithThreeNodes()


var existingEndpoint = NodeEndpoint.ForTest(3); var existingEndpoint = NodeEndpoint.ForTest(3);
var ranges = master.Join(existingEndpoint); var ranges = master.Join(existingEndpoint);
master.CaughtUp(existingEndpoint, ranges.Select(x => x.Index).ToArray()); master.CaughtUp(existingEndpoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
var anotherPoint = NodeEndpoint.ForTest(10); var anotherPoint = NodeEndpoint.ForTest(10);
ranges = master.Join(anotherPoint); ranges = master.Join(anotherPoint);
master.CaughtUp(anotherPoint, ranges.Select(x => x.Index).ToArray()); master.CaughtUp(anotherPoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
ranges = master.Join(endPoint); ranges = master.Join(endPoint);
master.CaughtUp(endPoint, ranges.Select(x => x.Index).ToArray()); master.CaughtUp(endPoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
} }


[Fact] [Fact]
public void AddingNewNodeResultInAllSegmentsHavingAtLeastTwoBackupCopy() public void AddingNewNodeResultInAllSegmentsHavingAtLeastTwoBackupCopy()
{ {
var yetAnotherEndPoint = NodeEndpoint.ForTest(7); var yetAnotherEndPoint = NodeEndpoint.ForTest(7);
var ranges = master.Join(yetAnotherEndPoint); var ranges = master.Join(yetAnotherEndPoint);
master.CaughtUp(yetAnotherEndPoint, ranges.Select(x => x.Index).ToArray()); master.CaughtUp(yetAnotherEndPoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
Assert.True(master.Segments.All(x => x.Backups.Count >= 2)); Assert.True(master.Segments.All(x => x.PendingBackups.Count >= 2));
} }
} }
} }
Expand Down
2 changes: 1 addition & 1 deletion Rhino.DistributedHashTable.Tests/MasterCaughtUpBehavior.cs
Expand Up @@ -25,7 +25,7 @@ public void WillRaiseTopologyChangedEvent()


bool wasCalled = false; bool wasCalled = false;
master.TopologyChanged += () => wasCalled = true; master.TopologyChanged += () => wasCalled = true;
master.CaughtUp(endPoint, ranges.First().Index); master.CaughtUp(endPoint, ReplicationType.Ownership, ranges.First().Index);
Assert.True(wasCalled); Assert.True(wasCalled);
} }
} }
Expand Down
7 changes: 4 additions & 3 deletions Rhino.DistributedHashTable.Tests/MasterGaveUpBehavior.cs
Expand Up @@ -15,7 +15,8 @@ public OnGaveUp()
{ {
master = new DistributedHashTableMaster(); master = new DistributedHashTableMaster();
master.CaughtUp(NodeEndpoint.ForTest(9), master.CaughtUp(NodeEndpoint.ForTest(9),
master.Join(NodeEndpoint.ForTest(9)).Select(x=>x.Index).ToArray()); ReplicationType.Ownership,
master.Join(NodeEndpoint.ForTest(9)).Select(x => x.Index).ToArray());
endPoint = NodeEndpoint.ForTest(5); endPoint = NodeEndpoint.ForTest(5);
} }


Expand All @@ -26,8 +27,8 @@ public void WillRemoveThePendingMoveFromTheSegment()


var range = ranges.First(); var range = ranges.First();
Assert.NotNull(range.InProcessOfMovingToEndpoint); Assert.NotNull(range.InProcessOfMovingToEndpoint);

master.GaveUp(endPoint, range.Index); master.GaveUp(endPoint, ReplicationType.Ownership, range.Index);


Assert.Null(range.InProcessOfMovingToEndpoint); Assert.Null(range.InProcessOfMovingToEndpoint);
} }
Expand Down
2 changes: 1 addition & 1 deletion Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs
Expand Up @@ -112,7 +112,7 @@ public NewEndpointJoiningMasterWithTwoNodes()
{ {
master.Join(endPoint); master.Join(endPoint);
var ranges = master.Join(anotherNodeInTheMaster); var ranges = master.Join(anotherNodeInTheMaster);
master.CaughtUp(anotherNodeInTheMaster, ranges.Select(x => x.Index).ToArray()); master.CaughtUp(anotherNodeInTheMaster, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
master.Join(newEndpoint); master.Join(newEndpoint);
} }


Expand Down
106 changes: 99 additions & 7 deletions Rhino.DistributedHashTable.Tests/NodeReplicationBehavior.cs
@@ -1,4 +1,5 @@
using System.Net; using System.Net;
using Rhino.DistributedHashTable.Commands;
using Rhino.DistributedHashTable.Internal; using Rhino.DistributedHashTable.Internal;
using Rhino.DistributedHashTable.Parameters; using Rhino.DistributedHashTable.Parameters;
using Rhino.DistributedHashTable.Remote; using Rhino.DistributedHashTable.Remote;
Expand Down Expand Up @@ -31,16 +32,15 @@ public WhenFinishedReplicatingSegment()
[Fact] [Fact]
public void StateWillBeStarted() public void StateWillBeStarted()
{ {
node.DoneReplicatingSegments(new[] { 0 }); node.DoneReplicatingSegments(ReplicationType.Ownership, new[] { 0 });
Assert.Equal(NodeState.Started, node.State); Assert.Equal(NodeState.Started, node.State);
} }


[Fact] [Fact]
public void WillLetMasterKnowItCaughtUp() public void WillLetMasterKnowItCaughtUp()
{ {
var range = new Segment(); node.DoneReplicatingSegments(ReplicationType.Ownership, new[] { 0 });
node.DoneReplicatingSegments(new[] { 0 }); master.AssertWasCalled(x => x.CaughtUp(node.Endpoint, ReplicationType.Ownership, 0));
master.AssertWasCalled(x => x.CaughtUp(node.Endpoint, 0));
} }
} }


Expand All @@ -51,7 +51,7 @@ public class WhenReplicatingRequestToOwner
private readonly IExecuter executer; private readonly IExecuter executer;
private readonly NodeEndpoint endPoint; private readonly NodeEndpoint endPoint;
private readonly IQueueManager queueManager; private readonly IQueueManager queueManager;
private Topology topology; private readonly Topology topology;
private static NodeEndpoint backup1; private static NodeEndpoint backup1;
private static NodeEndpoint backup2; private static NodeEndpoint backup2;


Expand All @@ -67,7 +67,7 @@ public WhenReplicatingRequestToOwner()
{ {
Index = 0, Index = 0,
AssignedEndpoint = endPoint, AssignedEndpoint = endPoint,
Backups = PendingBackups =
{ {
backup1, backup1,
backup2, backup2,
Expand All @@ -77,7 +77,7 @@ public WhenReplicatingRequestToOwner()
{ {
Index = 1, Index = 1,
AssignedEndpoint = backup1, AssignedEndpoint = backup1,
Backups = PendingBackups =
{ {
endPoint, endPoint,
backup2, backup2,
Expand Down Expand Up @@ -121,5 +121,97 @@ public void WhenSendingToOtherBackupsFromBackupNode()
queueManager.Send(backup2.Async, Arg<MessagePayload>.Is.TypeOf); queueManager.Send(backup2.Async, Arg<MessagePayload>.Is.TypeOf);
} }
} }

public class WhenTopologyIsUpdated
{
private readonly DistributedHashTableNode node;
private readonly IDistributedHashTableMaster master;
private readonly IExecuter executer;
private readonly NodeEndpoint endPoint;

public WhenTopologyIsUpdated()
{
master = MockRepository.GenerateStub<IDistributedHashTableMaster>();
executer = MockRepository.GenerateStub<IExecuter>();
endPoint = NodeEndpoint.ForTest(1);
master.Stub(x => x.Join(Arg.Is(endPoint)))
.Return(new Segment[0]);
node = new DistributedHashTableNode(master, executer, new BinaryMessageSerializer(), endPoint, MockRepository.GenerateStub<IQueueManager>(),
MockRepository.GenerateStub<IDistributedHashTableNodeReplicationFactory>());
}

[Fact]
public void TopologyContainsPendingBackupsForCurrentNodeWillStartsBackupReplication()
{
node.SetTopology(new Topology(new[]
{
new Segment
{
AssignedEndpoint = NodeEndpoint.ForTest(91),
PendingBackups = {endPoint}
},
}));

executer.AssertWasCalled(x=>x.RegisterForExecution(Arg<OnlineSegmentReplicationCommand>.Is.TypeOf));
}

[Fact]
public void WillNotStartReplicationIfCurrentlyReplicatingBackups()
{
node.SetTopology(new Topology(new[]
{
new Segment
{
AssignedEndpoint = NodeEndpoint.ForTest(91),
PendingBackups = {endPoint}
},
}));

node.SetTopology(new Topology(new[]
{
new Segment
{
AssignedEndpoint = NodeEndpoint.ForTest(91),
PendingBackups = {endPoint}
},
}));

executer.AssertWasCalled(
x => x.RegisterForExecution(Arg<OnlineSegmentReplicationCommand>.Is.TypeOf),
o=>o.Repeat.Once());
}

[Fact]
public void AfterBackupsCompleteWillStartReplicationAgain()
{
OnlineSegmentReplicationCommand command = null;
executer.Stub(x => x.RegisterForExecution(Arg<OnlineSegmentReplicationCommand>.Is.TypeOf))
.WhenCalled(invocation => command = (OnlineSegmentReplicationCommand) invocation.Arguments[0]);

node.SetTopology(new Topology(new[]
{
new Segment
{
AssignedEndpoint = NodeEndpoint.ForTest(91),
PendingBackups = {endPoint}
},
}));

command.RaiseCompleted();

node.SetTopology(new Topology(new[]
{
new Segment
{
AssignedEndpoint = NodeEndpoint.ForTest(91),
PendingBackups = {endPoint}
},
}));

executer.AssertWasCalled(
x => x.RegisterForExecution(Arg<OnlineSegmentReplicationCommand>.Is.TypeOf),
o => o.Repeat.Twice());
}
}
} }
} }
Expand Up @@ -10,6 +10,7 @@
using NodeEndpoint = Rhino.DistributedHashTable.Internal.NodeEndpoint; using NodeEndpoint = Rhino.DistributedHashTable.Internal.NodeEndpoint;
using Segment = Rhino.DistributedHashTable.Internal.Segment; using Segment = Rhino.DistributedHashTable.Internal.Segment;
using Rhino.DistributedHashTable.Util; using Rhino.DistributedHashTable.Util;
using ReplicationType=Rhino.DistributedHashTable.Internal.ReplicationType;


namespace Rhino.DistributedHashTable.Client namespace Rhino.DistributedHashTable.Client
{ {
Expand Down Expand Up @@ -83,6 +84,7 @@ private static MasterMessageUnion ReadReply(MasterMessageType responses, Stream
} }


public void CaughtUp(NodeEndpoint endpoint, public void CaughtUp(NodeEndpoint endpoint,
ReplicationType type,
params int[] caughtUpSegments) params int[] caughtUpSegments)
{ {
Execute((writer, Execute((writer,
Expand All @@ -94,6 +96,7 @@ private static MasterMessageUnion ReadReply(MasterMessageType responses, Stream
CaughtUp = new CaughtUpRequestMessage.Builder CaughtUp = new CaughtUpRequestMessage.Builder
{ {
CaughtUpSegmentsList = { caughtUpSegments }, CaughtUpSegmentsList = { caughtUpSegments },
Type = type == ReplicationType.Backup ? Protocol.ReplicationType.Backup : Protocol.ReplicationType.Ownership,
Endpoint = new Protocol.NodeEndpoint.Builder Endpoint = new Protocol.NodeEndpoint.Builder
{ {
Async = endpoint.Async.ToString(), Async = endpoint.Async.ToString(),
Expand Down Expand Up @@ -126,6 +129,7 @@ public Topology GetTopology()
} }


public void GaveUp(NodeEndpoint endpoint, public void GaveUp(NodeEndpoint endpoint,
ReplicationType type,
params int[] rangesGivingUpOn) params int[] rangesGivingUpOn)
{ {
Execute((writer, Execute((writer,
Expand All @@ -137,6 +141,7 @@ public Topology GetTopology()
GaveUp = new GaveUpRequestMessage.Builder GaveUp = new GaveUpRequestMessage.Builder
{ {
GaveUpSegmentsList = { rangesGivingUpOn }, GaveUpSegmentsList = { rangesGivingUpOn },
Type = type == ReplicationType.Backup ? Protocol.ReplicationType.Backup : Protocol.ReplicationType.Ownership,
Endpoint = new Protocol.NodeEndpoint.Builder Endpoint = new Protocol.NodeEndpoint.Builder
{ {
Async = endpoint.Async.ToString(), Async = endpoint.Async.ToString(),
Expand Down

0 comments on commit cdd5a19

Please sign in to comment.