Skip to content
Browse files

Making sure that replication is aware of backup procedures and that i…

…t doesn't need to take that into account with ownership.

git-svn-id: https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/experiments/dht/dht@2190 079b0acf-d9fa-0310-9935-e5ade295c882
  • Loading branch information...
1 parent f1aa284 commit 0084675cda74a5e33976dd7e97a9de3c57e5c766 ayenderahien committed Jun 6, 2009
View
18 Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs
@@ -42,9 +42,10 @@ public void AfterBothNodesJoinedWillAutomaticallyReplicateToBackupNode()
var masterProxy = new DistributedHashTableMasterClient(masterUri);
+ Topology topology;
for (var i = 0; i < 50; i++)
{
- var topology = masterProxy.GetTopology();
+ topology = masterProxy.GetTopology();
var count = topology.Segments
.Where(x => x.AssignedEndpoint == storageHostA.Endpoint)
.Count();
@@ -54,27 +55,28 @@ public void AfterBothNodesJoinedWillAutomaticallyReplicateToBackupNode()
Thread.Sleep(500);
}
+ topology = masterProxy.GetTopology();
+ var segment = topology.Segments.First(x => x.AssignedEndpoint == storageHostA.Endpoint).Index;
using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint))
{
- var topology = masterProxy.GetTopology();
nodeA.Put(topology.Version, new ExtendedPutRequest
{
Bytes = new byte[] { 2, 2, 0, 0 },
Key = "abc",
- Segment = 0
+ Segment = segment
});
}
using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint))
{
- var topology = masterProxy.GetTopology();
+ topology = masterProxy.GetTopology();
Value[][] values = null;
for (var i = 0; i < 100; i++)
{
values = nodeB.Get(topology.Version, new ExtendedGetRequest
{
Key = "abc",
- Segment = 0
+ Segment = segment
});
if (values[0].Length != 0)
break;
@@ -124,11 +126,9 @@ public void AfterTwoNodesJoinTheClusterEachSegmentHasBackup()
for (var i = 0; i < 50; i++)
{
topology = masterProxy.GetTopology();
- var count = topology.Segments
- .Where(x => x.AssignedEndpoint == storageHostA.Endpoint)
- .Count();
+ var allSegmentsHaveBackups = topology.Segments.All(x => x.Backups.Count > 0);
- if (count == 4096)
+ if (allSegmentsHaveBackups)
break;
Thread.Sleep(500);
}
View
18 Rhino.DistributedHashTable.ClusterTests/NodeOverTheNetwork.cs
@@ -37,7 +37,7 @@ public void CanReplicateEmptySegments()
{
var segments = new[]{1,2,3};
- var assignedSegments = storageProxy.AssignAllEmptySegments(NodeEndpoint.ForTest(13), segments);
+ var assignedSegments = storageProxy.AssignAllEmptySegments(NodeEndpoint.ForTest(13), ReplicationType.Ownership, segments);
Assert.Equal(segments, assignedSegments);
}
@@ -58,7 +58,7 @@ public void WhenReplicatingEmptySegmentsWillNotReplicateSegmentsThatHasValues()
var segments = new[] { 1, 2, 3 };
- var assignedSegments = storageProxy.AssignAllEmptySegments(NodeEndpoint.ForTest(13), segments);
+ var assignedSegments = storageProxy.AssignAllEmptySegments(NodeEndpoint.ForTest(13), ReplicationType.Ownership, segments);
Assert.Equal(new[]{2,3}, assignedSegments);
}
@@ -77,7 +77,7 @@ public void CanReplicateSegmentWithData()
Segment = 1,
});
- var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), ReplicationType.Ownership, 1);
Assert.False(result.Done);
Assert.Equal("test", result.PutRequests[0].Key);
@@ -97,7 +97,7 @@ public void CanReplicateSegmentWithDataWhileStillServingRequestForSegment()
Segment = 1,
});
- var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), ReplicationType.Ownership, 1);
Assert.Equal("test", result.PutRequests[0].Key);
storageProxy.Put(topology.Version, new ExtendedPutRequest
@@ -107,7 +107,7 @@ public void CanReplicateSegmentWithDataWhileStillServingRequestForSegment()
Segment = 1,
});
- result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), ReplicationType.Ownership, 1);
Assert.Equal("test2", result.PutRequests[0].Key);
}
}
@@ -125,7 +125,7 @@ public void CanReplicateSegmentWithDataWhileRemovingItems()
Segment = 1,
});
- var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), ReplicationType.Ownership, 1);
Assert.Equal("test", result.PutRequests[0].Key);
storageProxy.Remove(topology.Version, new ExtendedRemoveRequest()
@@ -135,7 +135,7 @@ public void CanReplicateSegmentWithDataWhileRemovingItems()
SpecificVersion = result.PutRequests[0].ReplicationVersion
});
- result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), ReplicationType.Ownership, 1);
Assert.Equal("test", result.RemoveRequests[0].Key);
}
}
@@ -153,10 +153,10 @@ public void WhenFinishedReplicatingWillTellTheReplicatorSo()
Segment = 1,
});
- var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), ReplicationType.Ownership, 1);
Assert.Equal("test", result.PutRequests[0].Key);
- result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), ReplicationType.Ownership, 1);
Assert.True(result.Done);
}
}
View
24 Rhino.DistributedHashTable.IntegrationTests/DistributedHashTableReplicationTest.cs
@@ -33,7 +33,7 @@ public WhenThereAreNoKeysInTable()
public void WillAssignAllSegmentsWeAskForImmediately()
{
var ranges = Enumerable.Range(0, 500).ToArray();
- var assignedSegments = replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ranges);
+ var assignedSegments = replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ReplicationType.Ownership, ranges);
Assert.Equal(ranges, assignedSegments);
}
@@ -42,7 +42,7 @@ public void WillAssignAllSegmentsWeAskForImmediately()
public void PuttingKeyInSegmentAsisgnedElsewhereShouldThrow()
{
var ranges = Enumerable.Range(0, 500).ToArray();
- replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ranges);
+ replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ReplicationType.Ownership, ranges);
var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Put(guid, new ExtendedPutRequest()
{
Key = "test",
@@ -56,7 +56,7 @@ public void PuttingKeyInSegmentAsisgnedElsewhereShouldThrow()
public void GettingKeyInSegmentAsisgnedElsewhereShouldThrow()
{
var ranges = Enumerable.Range(0, 500).ToArray();
- replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ranges);
+ replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ReplicationType.Ownership, ranges);
var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Get(guid, new ExtendedGetRequest()
{
Key = "test",
@@ -70,7 +70,7 @@ public void GettingKeyInSegmentAsisgnedElsewhereShouldThrow()
public void RemovingKeyInSegmentAsisgnedElsewhereShouldThrow()
{
var ranges = Enumerable.Range(0, 500).ToArray();
- replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ranges);
+ replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ReplicationType.Ownership, ranges);
var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Remove(guid, new ExtendedRemoveRequest
{
Key = "test",
@@ -114,32 +114,32 @@ public WhenThereAreKeysInTable()
public void WillSkipSegmentsThatHasItemsInThem()
{
var ranges = Enumerable.Range(0, 500).ToArray();
- var assignedSegments = replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ranges);
+ var assignedSegments = replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ReplicationType.Ownership, ranges);
Assert.Equal(ranges.Skip(1).ToArray(), assignedSegments);
}
[Fact]
public void WillGiveExistingKeysFromSegment()
{
- var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), ReplicationType.Ownership, 0);
Assert.Equal(1, result.PutRequests.Length);
Assert.Equal(new byte[]{1}, result.PutRequests[0].Bytes);
}
[Fact]
public void WillRememberKeysSentDuringPreviousReplication()
{
- var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), ReplicationType.Ownership, 0);
Assert.Equal(1, result.PutRequests.Length);
- result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), ReplicationType.Ownership, 0);
Assert.Equal(0, result.PutRequests.Length);
}
[Fact]
public void WhenItemIsRemovedWillResultInRemovalRequestOnNextReplicationPage()
{
- var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), ReplicationType.Ownership, 0);
Assert.Equal(1, result.PutRequests.Length);
node.Storage.Remove(node.GetTopologyVersion(), new ExtendedRemoveRequest
{
@@ -148,17 +148,17 @@ public void WhenItemIsRemovedWillResultInRemovalRequestOnNextReplicationPage()
Segment = 0,
});
- result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), ReplicationType.Ownership, 0);
Assert.Equal(1, result.RemoveRequests.Length);
}
[Fact]
public void WhenDoneGettingAllKeysWillAssignSegmentToEndpoint()
{
- var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), ReplicationType.Ownership, 0);
Assert.Equal(1, result.PutRequests.Length);
- result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), ReplicationType.Ownership, 0);
Assert.Equal(0, result.PutRequests.Length);
var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Remove(guid, new ExtendedRemoveRequest
View
44 Rhino.DistributedHashTable.IntegrationTests/OnlineRangeReplicationCommandTest.cs
@@ -38,9 +38,9 @@ public OnlineSegmentReplicationCommandTest()
[Fact]
public void WillAskForAllEmptySegments()
{
- replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int[]>.Is.Anything))
.Return(new int[0]);
- replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int>.Is.Anything))
.Return(new ReplicationResult
{
PutRequests = new ExtendedPutRequest[0],
@@ -50,15 +50,15 @@ public void WillAskForAllEmptySegments()
var success = command.Execute();
Assert.True(success);
- replication.AssertWasCalled(x => x.AssignAllEmptySegments(node.Endpoint, new int[] { 0, 1 }));
+ replication.AssertWasCalled(x => x.AssignAllEmptySegments(node.Endpoint, ReplicationType.Ownership, new [] { 0, 1 }));
}
[Fact]
public void WillLetNodeKnowAboutAnyEmptySegmentsAssignedToIt()
{
- replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int[]>.Is.Anything))
.Return(new []{0});
- replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int>.Is.Anything))
.Return(new ReplicationResult
{
PutRequests = new ExtendedPutRequest[0],
@@ -74,9 +74,9 @@ public void WillLetNodeKnowAboutAnyEmptySegmentsAssignedToIt()
[Fact]
public void WillNotTryToReplicaterangesThatWereEmptyAndAssigned()
{
- replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int[]>.Is.Anything))
.Return(new[] { 0 });
- replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int>.Is.Anything))
.Return(new ReplicationResult
{
PutRequests = new ExtendedPutRequest[0],
@@ -86,15 +86,15 @@ public void WillNotTryToReplicaterangesThatWereEmptyAndAssigned()
var success = command.Execute();
Assert.True(success);
- replication.AssertWasNotCalled(x => x.ReplicateNextPage(node.Endpoint, 0));
+ replication.AssertWasNotCalled(x => x.ReplicateNextPage(node.Endpoint, ReplicationType.Ownership, 0));
}
[Fact]
public void WillTryToReplicaterangesThatWereNotEmpty()
{
- replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int[]>.Is.Anything))
.Return(new[] { 0 });
- replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int>.Is.Anything))
.Return(new ReplicationResult
{
PutRequests = new ExtendedPutRequest[0],
@@ -104,20 +104,20 @@ public void WillTryToReplicaterangesThatWereNotEmpty()
var success = command.Execute();
Assert.True(success);
- replication.AssertWasCalled(x => x.ReplicateNextPage(node.Endpoint, 1));
+ replication.AssertWasCalled(x => x.ReplicateNextPage(node.Endpoint, ReplicationType.Ownership, 1));
}
[Fact]
public void WillPutReturnedItemsIntoStorage()
{
- replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int[]>.Is.Anything))
.Return(new[] { 0 });
var request = new ExtendedPutRequest
{
Bytes = new byte[]{1},
Key = "a",
};
- replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int>.Is.Anything))
.Return(new ReplicationResult
{
PutRequests = new[]{ request, },
@@ -133,13 +133,13 @@ public void WillPutReturnedItemsIntoStorage()
[Fact]
public void WillRemoveReturnedRemovalFromStorage()
{
- replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int[]>.Is.Anything))
.Return(new[] { 0 });
var request = new ExtendedRemoveRequest
{
Key = "a",
};
- replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int>.Is.Anything))
.Return(new ReplicationResult
{
PutRequests = new ExtendedPutRequest[0],
@@ -155,10 +155,10 @@ public void WillRemoveReturnedRemovalFromStorage()
[Fact]
public void WhenSegmentReplicationFailsWillGiveUpTheSegment()
{
- replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int[]>.Is.Anything))
.Return(new int [0]);
-
- replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int>.Is.Anything))
.Throw(new IOException());
var success = command.Execute();
Assert.False(success);
@@ -170,7 +170,7 @@ public void WhenSegmentReplicationFailsWillGiveUpTheSegment()
[Fact]
public void WhenEmptySegmentReplicationFailsWillGiveEverythingUp()
{
- replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int[]>.Is.Anything))
.Throw(new IOException());
var success = command.Execute();
Assert.False(success);
@@ -181,7 +181,7 @@ public void WhenEmptySegmentReplicationFailsWillGiveEverythingUp()
[Fact]
public void WillRepeatReplicationUntilGetDone()
{
- replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int[]>.Is.Anything))
.Return(new[] { 0 });
var request = new ExtendedPutRequest
{
@@ -190,7 +190,7 @@ public void WillRepeatReplicationUntilGetDone()
};
for (int i = 0; i < 5; i++)
{
- replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int>.Is.Anything))
.Repeat.Once()
.Return(new ReplicationResult
{
@@ -199,7 +199,7 @@ public void WillRepeatReplicationUntilGetDone()
Done = false
});
}
- replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg.Is(ReplicationType.Ownership), Arg<int>.Is.Anything))
.Repeat.Once()
.Return(new ReplicationResult
{
View
20 Rhino.DistributedHashTable/Client/DistributedHashTableStorageClient.cs
@@ -11,6 +11,7 @@
using NodeEndpoint = Rhino.DistributedHashTable.Internal.NodeEndpoint;
using Value = Rhino.PersistentHashTable.Value;
using System.Linq;
+using ReplicationType=Rhino.DistributedHashTable.Internal.ReplicationType;
namespace Rhino.DistributedHashTable.Client
{
@@ -171,19 +172,17 @@ public IDistributedHashTableNodeReplication Replication
}
public ReplicationResult ReplicateNextPage(NodeEndpoint replicationEndpoint,
+ ReplicationType type,
int segment)
{
writer.Write(new StorageMessageUnion.Builder
{
Type = StorageMessageType.ReplicateNextPageRequest,
ReplicateNextPageRequest = new ReplicateNextPageRequestMessage.Builder
{
- ReplicationEndpoint = new Protocol.NodeEndpoint.Builder
- {
- Async = replicationEndpoint.Async.ToString(),
- Sync = replicationEndpoint.Sync.ToString()
- }.Build(),
- Segment = segment
+ ReplicationEndpoint = replicationEndpoint.GetNodeEndpoint(),
+ Segment = segment,
+ Type = type == ReplicationType.Backup? Protocol.ReplicationType.Backup : Protocol.ReplicationType.Ownership
}.Build()
}.Build());
writer.Flush();
@@ -204,18 +203,15 @@ public IDistributedHashTableNodeReplication Replication
}
public int[] AssignAllEmptySegments(NodeEndpoint replicationEndpoint,
- int[] segments)
+ ReplicationType type, int[] segments)
{
writer.Write(new StorageMessageUnion.Builder
{
Type = StorageMessageType.AssignAllEmptySegmentsRequest,
AssignAllEmptySegmentsRequest = new AssignAllEmptySegmentsRequestMessage.Builder
{
- ReplicationEndpoint = new Protocol.NodeEndpoint.Builder
- {
- Async = replicationEndpoint.Async.ToString(),
- Sync = replicationEndpoint.Sync.ToString()
- }.Build(),
+ ReplicationEndpoint = replicationEndpoint.GetNodeEndpoint(),
+ Type = type == ReplicationType.Backup? Protocol.ReplicationType.Backup : Protocol.ReplicationType.Ownership,
SegmentsList = { segments }
}.Build()
}.Build());
View
5 Rhino.DistributedHashTable/Commands/OnlineRangeReplicationCommand.cs
@@ -42,7 +42,7 @@ public void AbortExecution()
public bool Execute()
{
- log.DebugFormat("Replication from {0} of {1} segments", endpoint, segments.Length);
+ log.DebugFormat("Replication from {0} of {1} segments for {2}", endpoint, segments.Length, type);
var processedSegments = new List<int>();
if (continueWorking == false)
@@ -125,7 +125,7 @@ private void ReplicateSegment(Segment segment)
segment,
endpoint);
- var result = otherNode.ReplicateNextPage(node.Endpoint, segment.Index);
+ var result = otherNode.ReplicateNextPage(node.Endpoint, type, segment.Index);
log.DebugFormat("Replication of segment [{0}] from {1} got {2} puts & {3} removals",
segment,
endpoint,
@@ -149,6 +149,7 @@ private List<Segment> AssignAllEmptySegmentsFromEndpoint(List<int> processedSegm
var remainingSegments = new List<Segment>();
var assignedSegments = otherNode.AssignAllEmptySegments(
node.Endpoint,
+ type,
segments.Select(x => x.Index).ToArray());
processedSegments.AddRange(assignedSegments);
View
3 Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs
@@ -13,6 +13,7 @@
using Rhino.DistributedHashTable.Util;
using Rhino.Queues;
using NodeEndpoint = Rhino.DistributedHashTable.Internal.NodeEndpoint;
+using ReplicationType=Rhino.DistributedHashTable.Protocol.ReplicationType;
namespace Rhino.DistributedHashTable.Hosting
{
@@ -181,6 +182,7 @@ private void HandleTopologyUpdate(MessageStreamWriter<StorageMessageUnion> write
{
var replicationResult = storage.Replication.ReplicateNextPage(
wrapper.ReplicateNextPageRequest.ReplicationEndpoint.GetNodeEndpoint(),
+ wrapper.ReplicateNextPageRequest.Type == ReplicationType.Backup ? Internal.ReplicationType.Backup : Internal.ReplicationType.Ownership,
wrapper.ReplicateNextPageRequest.Segment
);
writer.Write(new StorageMessageUnion.Builder
@@ -206,6 +208,7 @@ private void HandleTopologyUpdate(MessageStreamWriter<StorageMessageUnion> write
{
var segments = storage.Replication.AssignAllEmptySegments(
wrapper.AssignAllEmptySegmentsRequest.ReplicationEndpoint.GetNodeEndpoint(),
+ wrapper.AssignAllEmptySegmentsRequest.Type == ReplicationType.Backup ? Internal.ReplicationType.Backup : Internal.ReplicationType.Ownership,
wrapper.AssignAllEmptySegmentsRequest.SegmentsList.ToArray()
);
writer.Write(new StorageMessageUnion.Builder
View
4 Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs
@@ -102,8 +102,8 @@ public class MatchSegment
: new Segment
{
Index = x.Index,
- InProcessOfMovingToEndpoint = null,
- AssignedEndpoint = endpoint,
+ InProcessOfMovingToEndpoint = x.InProcessOfMovingToEndpoint,
+ AssignedEndpoint = x.AssignedEndpoint,
PendingBackups = x.PendingBackups
.Where(e => e != endpoint)
.ToSet(),
View
3 Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs
@@ -79,7 +79,8 @@ private void BackgroundReplication()
}
catch (ObjectDisposedException)
{
-
+ log.Info("Queue manager was disposed, quiting background replication");
+ return;
}
catch (Exception e)
{
View
235 Rhino.DistributedHashTable/Protocol/ProtocolDef.cs
@@ -67,74 +67,77 @@ public static partial class ProtocolDef {
"TWVzc2FnZRJCCgdWZXJzaW9uGAEgAigLMjEuUmhpbm8uRGlzdHJpYnV0ZWRI" +
"YXNoVGFibGUuUHJvdG9jb2wuVmFsdWVWZXJzaW9uEhYKDkNvbmZsaWN0RXhp" +
"c3RzGAIgAigIIisKFVJlbW92ZVJlc3BvbnNlTWVzc2FnZRISCgpXYXNSZW1v" +
- "dmVkGAEgAigIIh8KDEVycm9yTWVzc2FnZRIPCgdNZXNzYWdlGAEgAigJIogB" +
+ "dmVkGAEgAigIIh8KDEVycm9yTWVzc2FnZRIPCgdNZXNzYWdlGAEgAigJIswB" +
"CiRBc3NpZ25BbGxFbXB0eVNlZ21lbnRzUmVxdWVzdE1lc3NhZ2USTgoTUmVw" +
"bGljYXRpb25FbmRwb2ludBgBIAIoCzIxLlJoaW5vLkRpc3RyaWJ1dGVkSGFz" +
"aFRhYmxlLlByb3RvY29sLk5vZGVFbmRwb2ludBIQCghTZWdtZW50cxgCIAMo" +
- "BSJBCiVBc3NpZ25BbGxFbXB0eVNlZ21lbnRzUmVzcG9uc2VNZXNzYWdlEhgK" +
- "EEFzc2lnbmVkU2VnbWVudHMYAiADKAUiggEKH1JlcGxpY2F0ZU5leHRQYWdl" +
- "UmVxdWVzdE1lc3NhZ2USTgoTUmVwbGljYXRpb25FbmRwb2ludBgBIAIoCzIx" +
- "LlJoaW5vLkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlByb3RvY29sLk5vZGVFbmRw" +
- "b2ludBIPCgdTZWdtZW50GAIgAigFItABCiBSZXBsaWNhdGVOZXh0UGFnZVJl" +
- "c3BvbnNlTWVzc2FnZRIMCgREb25lGAEgAigIElEKDlJlbW92ZVJlcXVlc3Rz" +
- "GAIgAygLMjkuUmhpbm8uRGlzdHJpYnV0ZWRIYXNoVGFibGUuUHJvdG9jb2wu" +
- "UmVtb3ZlUmVxdWVzdE1lc3NhZ2USSwoLUHV0UmVxdWVzdHMYAyADKAsyNi5S" +
- "aGluby5EaXN0cmlidXRlZEhhc2hUYWJsZS5Qcm90b2NvbC5QdXRSZXF1ZXN0" +
- "TWVzc2FnZSJYChRTZWVPdGhlckVycm9yTWVzc2FnZRJACgVPdGhlchgBIAIo" +
- "CzIxLlJoaW5vLkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlByb3RvY29sLk5vZGVF" +
- "bmRwb2ludCKmBAoSTWFzdGVyTWVzc2FnZVVuaW9uEkQKBFR5cGUYASACKA4y" +
- "Ni5SaGluby5EaXN0cmlidXRlZEhhc2hUYWJsZS5Qcm90b2NvbC5NYXN0ZXJN" +
- "ZXNzYWdlVHlwZRJMCgtKb2luUmVxdWVzdBgCIAEoCzI3LlJoaW5vLkRpc3Ry" +
- "aWJ1dGVkSGFzaFRhYmxlLlByb3RvY29sLkpvaW5SZXF1ZXN0TWVzc2FnZRJO" +
- "CgxKb2luUmVzcG9uc2UYAyABKAsyOC5SaGluby5EaXN0cmlidXRlZEhhc2hU" +
- "YWJsZS5Qcm90b2NvbC5Kb2luUmVzcG9uc2VNZXNzYWdlEkwKCFRvcG9sb2d5" +
- "GAQgASgLMjouUmhpbm8uRGlzdHJpYnV0ZWRIYXNoVGFibGUuUHJvdG9jb2wu" +
- "VG9wb2xvZ3lSZXN1bHRNZXNzYWdlEkQKCUV4Y2VwdGlvbhgFIAEoCzIxLlJo" +
- "aW5vLkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlByb3RvY29sLkVycm9yTWVzc2Fn" +
- "ZRJNCghDYXVnaHRVcBgGIAEoCzI7LlJoaW5vLkRpc3RyaWJ1dGVkSGFzaFRh" +
- "YmxlLlByb3RvY29sLkNhdWdodFVwUmVxdWVzdE1lc3NhZ2USSQoGR2F2ZVVw" +
- "GAcgASgLMjkuUmhpbm8uRGlzdHJpYnV0ZWRIYXNoVGFibGUuUHJvdG9jb2wu" +
- "R2F2ZVVwUmVxdWVzdE1lc3NhZ2UipAkKE1N0b3JhZ2VNZXNzYWdlVW5pb24S" +
- "RQoEVHlwZRgBIAIoDjI3LlJoaW5vLkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlBy" +
- "b3RvY29sLlN0b3JhZ2VNZXNzYWdlVHlwZRIXCg9Ub3BvbG9neVZlcnNpb24Y" +
- "AiABKAwSSwoLR2V0UmVxdWVzdHMYAyADKAsyNi5SaGluby5EaXN0cmlidXRl" +
- "ZEhhc2hUYWJsZS5Qcm90b2NvbC5HZXRSZXF1ZXN0TWVzc2FnZRJLCgtQdXRS" +
- "ZXF1ZXN0cxgEIAMoCzI2LlJoaW5vLkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlBy" +
- "b3RvY29sLlB1dFJlcXVlc3RNZXNzYWdlEk0KDFB1dFJlc3BvbnNlcxgHIAMo" +
- "CzI3LlJoaW5vLkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlByb3RvY29sLlB1dFJl" +
- "c3BvbnNlTWVzc2FnZRJRCg5SZW1vdmVSZXF1ZXN0cxgFIAMoCzI5LlJoaW5v" +
- "LkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlByb3RvY29sLlJlbW92ZVJlcXVlc3RN" +
- "ZXNzYWdlElIKDlJlbW92ZVJlc3BvbmVzGAggAygLMjouUmhpbm8uRGlzdHJp" +
- "YnV0ZWRIYXNoVGFibGUuUHJvdG9jb2wuUmVtb3ZlUmVzcG9uc2VNZXNzYWdl" +
- "Ek0KDEdldFJlc3BvbnNlcxgGIAMoCzI3LlJoaW5vLkRpc3RyaWJ1dGVkSGFz" +
- "aFRhYmxlLlByb3RvY29sLkdldFJlc3BvbnNlTWVzc2FnZRJECglFeGNlcHRp" +
- "b24YCSABKAsyMS5SaGluby5EaXN0cmlidXRlZEhhc2hUYWJsZS5Qcm90b2Nv" +
- "bC5FcnJvck1lc3NhZ2UScAodQXNzaWduQWxsRW1wdHlTZWdtZW50c1JlcXVl" +
- "c3QYCiABKAsySS5SaGluby5EaXN0cmlidXRlZEhhc2hUYWJsZS5Qcm90b2Nv" +
- "bC5Bc3NpZ25BbGxFbXB0eVNlZ21lbnRzUmVxdWVzdE1lc3NhZ2UScgoeQXNz" +
- "aWduQWxsRW1wdHlTZWdtZW50c1Jlc3BvbnNlGAsgASgLMkouUmhpbm8uRGlz" +
- "dHJpYnV0ZWRIYXNoVGFibGUuUHJvdG9jb2wuQXNzaWduQWxsRW1wdHlTZWdt" +
- "ZW50c1Jlc3BvbnNlTWVzc2FnZRJmChhSZXBsaWNhdGVOZXh0UGFnZVJlcXVl" +
- "c3QYDCABKAsyRC5SaGluby5EaXN0cmlidXRlZEhhc2hUYWJsZS5Qcm90b2Nv" +
- "bC5SZXBsaWNhdGVOZXh0UGFnZVJlcXVlc3RNZXNzYWdlEmgKGVJlcGxpY2F0" +
- "ZU5leHRQYWdlUmVzcG9uc2UYDSABKAsyRS5SaGluby5EaXN0cmlidXRlZEhh" +
- "c2hUYWJsZS5Qcm90b2NvbC5SZXBsaWNhdGVOZXh0UGFnZVJlc3BvbnNlTWVz" +
- "c2FnZRJQCg1TZWVPdGhlckVycm9yGA4gASgLMjkuUmhpbm8uRGlzdHJpYnV0" +
- "ZWRIYXNoVGFibGUuUHJvdG9jb2wuU2VlT3RoZXJFcnJvck1lc3NhZ2Uq9QIK" +
- "ElN0b3JhZ2VNZXNzYWdlVHlwZRIPCgtHZXRSZXF1ZXN0cxABEg8KC1B1dFJl" +
- "cXVlc3RzEAISEgoOUmVtb3ZlUmVxdWVzdHMQAxIQCgxHZXRSZXNwb25zZXMQ" +
- "BBIQCgxQdXRSZXNwb25zZXMQBRITCg9SZW1vdmVSZXNwb25zZXMQBhIWChJT" +
- "dG9yYWdlRXJyb3JSZXN1bHQQBxIhCh1Bc3NpZ25BbGxFbXB0eVNlZ21lbnRz" +
- "UmVxdWVzdBAIEiIKHkFzc2lnbkFsbEVtcHR5U2VnbWVudHNSZXNwb25zZRAJ" +
- "EhwKGFJlcGxpY2F0ZU5leHRQYWdlUmVxdWVzdBAKEh0KGVJlcGxpY2F0ZU5l" +
- "eHRQYWdlUmVzcG9uc2UQCxIRCg1TZWVPdGhlckVycm9yEAwSGAoUVG9wb2xv" +
- "Z3lDaGFuZ2VkRXJyb3IQDRISCg5VcGRhdGVUb3BvbG9neRAOEhMKD1RvcG9s" +
- "b2d5VXBkYXRlZBAPKswBChFNYXN0ZXJNZXNzYWdlVHlwZRIWChJHZXRUb3Bv" +
- "bG9neVJlcXVlc3QQARIVChFHZXRUb3BvbG9neVJlc3VsdBACEg8KC0pvaW5S" +
- "ZXF1ZXN0EAMSDgoKSm9pblJlc3VsdBAEEhUKEU1hc3RlckVycm9yUmVzdWx0" +
- "EAUSEwoPQ2F1Z2h0VXBSZXF1ZXN0EAYSFAoQQ2F1Z2h0VXBSZXNwb25zZRAH" +
- "EhEKDUdhdmVVcFJlcXVlc3QQCBISCg5HYXZlVXBSZXNwb25zZRAJKiwKD1Jl" +
- "cGxpY2F0aW9uVHlwZRINCglPd25lcnNoaXAQARIKCgZCYWNrdXAQAkICSAE="),
+ "BRJCCgRUeXBlGAMgAigOMjQuUmhpbm8uRGlzdHJpYnV0ZWRIYXNoVGFibGUu" +
+ "UHJvdG9jb2wuUmVwbGljYXRpb25UeXBlIkEKJUFzc2lnbkFsbEVtcHR5U2Vn" +
+ "bWVudHNSZXNwb25zZU1lc3NhZ2USGAoQQXNzaWduZWRTZWdtZW50cxgCIAMo" +
+ "BSLGAQofUmVwbGljYXRlTmV4dFBhZ2VSZXF1ZXN0TWVzc2FnZRJOChNSZXBs" +
+ "aWNhdGlvbkVuZHBvaW50GAEgAigLMjEuUmhpbm8uRGlzdHJpYnV0ZWRIYXNo" +
+ "VGFibGUuUHJvdG9jb2wuTm9kZUVuZHBvaW50Eg8KB1NlZ21lbnQYAiACKAUS" +
+ "QgoEVHlwZRgDIAIoDjI0LlJoaW5vLkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlBy" +
+ "b3RvY29sLlJlcGxpY2F0aW9uVHlwZSLQAQogUmVwbGljYXRlTmV4dFBhZ2VS" +
+ "ZXNwb25zZU1lc3NhZ2USDAoERG9uZRgBIAIoCBJRCg5SZW1vdmVSZXF1ZXN0" +
+ "cxgCIAMoCzI5LlJoaW5vLkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlByb3RvY29s" +
+ "LlJlbW92ZVJlcXVlc3RNZXNzYWdlEksKC1B1dFJlcXVlc3RzGAMgAygLMjYu" +
+ "Umhpbm8uRGlzdHJpYnV0ZWRIYXNoVGFibGUuUHJvdG9jb2wuUHV0UmVxdWVz" +
+ "dE1lc3NhZ2UiWAoUU2VlT3RoZXJFcnJvck1lc3NhZ2USQAoFT3RoZXIYASAC" +
+ "KAsyMS5SaGluby5EaXN0cmlidXRlZEhhc2hUYWJsZS5Qcm90b2NvbC5Ob2Rl" +
+ "RW5kcG9pbnQipgQKEk1hc3Rlck1lc3NhZ2VVbmlvbhJECgRUeXBlGAEgAigO" +
+ "MjYuUmhpbm8uRGlzdHJpYnV0ZWRIYXNoVGFibGUuUHJvdG9jb2wuTWFzdGVy" +
+ "TWVzc2FnZVR5cGUSTAoLSm9pblJlcXVlc3QYAiABKAsyNy5SaGluby5EaXN0" +
+ "cmlidXRlZEhhc2hUYWJsZS5Qcm90b2NvbC5Kb2luUmVxdWVzdE1lc3NhZ2US" +
+ "TgoMSm9pblJlc3BvbnNlGAMgASgLMjguUmhpbm8uRGlzdHJpYnV0ZWRIYXNo" +
+ "VGFibGUuUHJvdG9jb2wuSm9pblJlc3BvbnNlTWVzc2FnZRJMCghUb3BvbG9n" +
+ "eRgEIAEoCzI6LlJoaW5vLkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlByb3RvY29s" +
+ "LlRvcG9sb2d5UmVzdWx0TWVzc2FnZRJECglFeGNlcHRpb24YBSABKAsyMS5S" +
+ "aGluby5EaXN0cmlidXRlZEhhc2hUYWJsZS5Qcm90b2NvbC5FcnJvck1lc3Nh" +
+ "Z2USTQoIQ2F1Z2h0VXAYBiABKAsyOy5SaGluby5EaXN0cmlidXRlZEhhc2hU" +
+ "YWJsZS5Qcm90b2NvbC5DYXVnaHRVcFJlcXVlc3RNZXNzYWdlEkkKBkdhdmVV" +
+ "cBgHIAEoCzI5LlJoaW5vLkRpc3RyaWJ1dGVkSGFzaFRhYmxlLlByb3RvY29s" +
+ "LkdhdmVVcFJlcXVlc3RNZXNzYWdlIqQJChNTdG9yYWdlTWVzc2FnZVVuaW9u" +
+ "EkUKBFR5cGUYASACKA4yNy5SaGluby5EaXN0cmlidXRlZEhhc2hUYWJsZS5Q" +
+ "cm90b2NvbC5TdG9yYWdlTWVzc2FnZVR5cGUSFwoPVG9wb2xvZ3lWZXJzaW9u" +
+ "GAIgASgMEksKC0dldFJlcXVlc3RzGAMgAygLMjYuUmhpbm8uRGlzdHJpYnV0" +
+ "ZWRIYXNoVGFibGUuUHJvdG9jb2wuR2V0UmVxdWVzdE1lc3NhZ2USSwoLUHV0" +
+ "UmVxdWVzdHMYBCADKAsyNi5SaGluby5EaXN0cmlidXRlZEhhc2hUYWJsZS5Q" +
+ "cm90b2NvbC5QdXRSZXF1ZXN0TWVzc2FnZRJNCgxQdXRSZXNwb25zZXMYByAD" +
+ "KAsyNy5SaGluby5EaXN0cmlidXRlZEhhc2hUYWJsZS5Qcm90b2NvbC5QdXRS" +
+ "ZXNwb25zZU1lc3NhZ2USUQoOUmVtb3ZlUmVxdWVzdHMYBSADKAsyOS5SaGlu" +
+ "by5EaXN0cmlidXRlZEhhc2hUYWJsZS5Qcm90b2NvbC5SZW1vdmVSZXF1ZXN0" +
+ "TWVzc2FnZRJSCg5SZW1vdmVSZXNwb25lcxgIIAMoCzI6LlJoaW5vLkRpc3Ry" +
+ "aWJ1dGVkSGFzaFRhYmxlLlByb3RvY29sLlJlbW92ZVJlc3BvbnNlTWVzc2Fn" +
+ "ZRJNCgxHZXRSZXNwb25zZXMYBiADKAsyNy5SaGluby5EaXN0cmlidXRlZEhh" +
+ "c2hUYWJsZS5Qcm90b2NvbC5HZXRSZXNwb25zZU1lc3NhZ2USRAoJRXhjZXB0" +
+ "aW9uGAkgASgLMjEuUmhpbm8uRGlzdHJpYnV0ZWRIYXNoVGFibGUuUHJvdG9j" +
+ "b2wuRXJyb3JNZXNzYWdlEnAKHUFzc2lnbkFsbEVtcHR5U2VnbWVudHNSZXF1" +
+ "ZXN0GAogASgLMkkuUmhpbm8uRGlzdHJpYnV0ZWRIYXNoVGFibGUuUHJvdG9j" +
+ "b2wuQXNzaWduQWxsRW1wdHlTZWdtZW50c1JlcXVlc3RNZXNzYWdlEnIKHkFz" +
+ "c2lnbkFsbEVtcHR5U2VnbWVudHNSZXNwb25zZRgLIAEoCzJKLlJoaW5vLkRp" +
+ "c3RyaWJ1dGVkSGFzaFRhYmxlLlByb3RvY29sLkFzc2lnbkFsbEVtcHR5U2Vn" +
+ "bWVudHNSZXNwb25zZU1lc3NhZ2USZgoYUmVwbGljYXRlTmV4dFBhZ2VSZXF1" +
+ "ZXN0GAwgASgLMkQuUmhpbm8uRGlzdHJpYnV0ZWRIYXNoVGFibGUuUHJvdG9j" +
+ "b2wuUmVwbGljYXRlTmV4dFBhZ2VSZXF1ZXN0TWVzc2FnZRJoChlSZXBsaWNh" +
+ "dGVOZXh0UGFnZVJlc3BvbnNlGA0gASgLMkUuUmhpbm8uRGlzdHJpYnV0ZWRI" +
+ "YXNoVGFibGUuUHJvdG9jb2wuUmVwbGljYXRlTmV4dFBhZ2VSZXNwb25zZU1l" +
+ "c3NhZ2USUAoNU2VlT3RoZXJFcnJvchgOIAEoCzI5LlJoaW5vLkRpc3RyaWJ1" +
+ "dGVkSGFzaFRhYmxlLlByb3RvY29sLlNlZU90aGVyRXJyb3JNZXNzYWdlKvUC" +
+ "ChJTdG9yYWdlTWVzc2FnZVR5cGUSDwoLR2V0UmVxdWVzdHMQARIPCgtQdXRS" +
+ "ZXF1ZXN0cxACEhIKDlJlbW92ZVJlcXVlc3RzEAMSEAoMR2V0UmVzcG9uc2Vz" +
+ "EAQSEAoMUHV0UmVzcG9uc2VzEAUSEwoPUmVtb3ZlUmVzcG9uc2VzEAYSFgoS" +
+ "U3RvcmFnZUVycm9yUmVzdWx0EAcSIQodQXNzaWduQWxsRW1wdHlTZWdtZW50" +
+ "c1JlcXVlc3QQCBIiCh5Bc3NpZ25BbGxFbXB0eVNlZ21lbnRzUmVzcG9uc2UQ" +
+ "CRIcChhSZXBsaWNhdGVOZXh0UGFnZVJlcXVlc3QQChIdChlSZXBsaWNhdGVO" +
+ "ZXh0UGFnZVJlc3BvbnNlEAsSEQoNU2VlT3RoZXJFcnJvchAMEhgKFFRvcG9s" +
+ "b2d5Q2hhbmdlZEVycm9yEA0SEgoOVXBkYXRlVG9wb2xvZ3kQDhITCg9Ub3Bv" +
+ "bG9neVVwZGF0ZWQQDyrMAQoRTWFzdGVyTWVzc2FnZVR5cGUSFgoSR2V0VG9w" +
+ "b2xvZ3lSZXF1ZXN0EAESFQoRR2V0VG9wb2xvZ3lSZXN1bHQQAhIPCgtKb2lu" +
+ "UmVxdWVzdBADEg4KCkpvaW5SZXN1bHQQBBIVChFNYXN0ZXJFcnJvclJlc3Vs" +
+ "dBAFEhMKD0NhdWdodFVwUmVxdWVzdBAGEhQKEENhdWdodFVwUmVzcG9uc2UQ" +
+ "BxIRCg1HYXZlVXBSZXF1ZXN0EAgSEgoOR2F2ZVVwUmVzcG9uc2UQCSosCg9S" +
+ "ZXBsaWNhdGlvblR5cGUSDQoJT3duZXJzaGlwEAESCgoGQmFja3VwEAJCAkgB"),
new pbd::FileDescriptor[] {
});
#endregion
@@ -224,7 +227,7 @@ public static partial class ProtocolDef {
= Descriptor.MessageTypes[16];
internal static pb::FieldAccess.FieldAccessorTable<global::Rhino.DistributedHashTable.Protocol.AssignAllEmptySegmentsRequestMessage, global::Rhino.DistributedHashTable.Protocol.AssignAllEmptySegmentsRequestMessage.Builder> internal__static_Rhino_DistributedHashTable_Protocol_AssignAllEmptySegmentsRequestMessage__FieldAccessorTable
= new pb::FieldAccess.FieldAccessorTable<global::Rhino.DistributedHashTable.Protocol.AssignAllEmptySegmentsRequestMessage, global::Rhino.DistributedHashTable.Protocol.AssignAllEmptySegmentsRequestMessage.Builder>(internal__static_Rhino_DistributedHashTable_Protocol_AssignAllEmptySegmentsRequestMessage__Descriptor,
- new string[] { "ReplicationEndpoint", "Segments", });
+ new string[] { "ReplicationEndpoint", "Segments", "Type", });
internal static readonly pbd::MessageDescriptor internal__static_Rhino_DistributedHashTable_Protocol_AssignAllEmptySegmentsResponseMessage__Descriptor
= Descriptor.MessageTypes[17];
internal static pb::FieldAccess.FieldAccessorTable<global::Rhino.DistributedHashTable.Protocol.AssignAllEmptySegmentsResponseMessage, global::Rhino.DistributedHashTable.Protocol.AssignAllEmptySegmentsResponseMessage.Builder> internal__static_Rhino_DistributedHashTable_Protocol_AssignAllEmptySegmentsResponseMessage__FieldAccessorTable
@@ -234,7 +237,7 @@ public static partial class ProtocolDef {
= Descriptor.MessageTypes[18];
internal static pb::FieldAccess.FieldAccessorTable<global::Rhino.DistributedHashTable.Protocol.ReplicateNextPageRequestMessage, global::Rhino.DistributedHashTable.Protocol.ReplicateNextPageRequestMessage.Builder> internal__static_Rhino_DistributedHashTable_Protocol_ReplicateNextPageRequestMessage__FieldAccessorTable
= new pb::FieldAccess.FieldAccessorTable<global::Rhino.DistributedHashTable.Protocol.ReplicateNextPageRequestMessage, global::Rhino.DistributedHashTable.Protocol.ReplicateNextPageRequestMessage.Builder>(internal__static_Rhino_DistributedHashTable_Protocol_ReplicateNextPageRequestMessage__Descriptor,
- new string[] { "ReplicationEndpoint", "Segment", });
+ new string[] { "ReplicationEndpoint", "Segment", "Type", });
internal static readonly pbd::MessageDescriptor internal__static_Rhino_DistributedHashTable_Protocol_ReplicateNextPageResponseMessage__Descriptor
= Descriptor.MessageTypes[19];
internal static pb::FieldAccess.FieldAccessorTable<global::Rhino.DistributedHashTable.Protocol.ReplicateNextPageResponseMessage, global::Rhino.DistributedHashTable.Protocol.ReplicateNextPageResponseMessage.Builder> internal__static_Rhino_DistributedHashTable_Protocol_ReplicateNextPageResponseMessage__FieldAccessorTable
@@ -5579,9 +5582,19 @@ public sealed partial class AssignAllEmptySegmentsRequestMessage : pb::Generated
return segments_[index];
}
+ private bool hasType;
+ private global::Rhino.DistributedHashTable.Protocol.ReplicationType type_ = global::Rhino.DistributedHashTable.Protocol.ReplicationType.Ownership;
+ public bool HasType {
+ get { return hasType; }
+ }
+ public global::Rhino.DistributedHashTable.Protocol.ReplicationType Type {
+ get { return type_; }
+ }
+
public override bool IsInitialized {
get {
if (!hasReplicationEndpoint) return false;
+ if (!hasType) return false;
if (!ReplicationEndpoint.IsInitialized) return false;
return true;
}
@@ -5596,6 +5609,9 @@ public sealed partial class AssignAllEmptySegmentsRequestMessage : pb::Generated
output.WriteInt32(2, element);
}
}
+ if (HasType) {
+ output.WriteEnum(3, (int) Type);
+ }
UnknownFields.WriteTo(output);
}
@@ -5617,6 +5633,9 @@ public sealed partial class AssignAllEmptySegmentsRequestMessage : pb::Generated
size += dataSize;
size += 1 * segments_.Count;
}
+ if (HasType) {
+ size += pb::CodedOutputStream.ComputeEnumSize(3, (int) Type);
+ }
size += UnknownFields.SerializedSize;
memoizedSerializedSize = size;
return size;
@@ -5707,6 +5726,9 @@ public sealed partial class Builder : pb::GeneratedBuilder<AssignAllEmptySegment
if (other.segments_.Count != 0) {
base.AddRange(other.segments_, result.segments_);
}
+ if (other.HasType) {
+ Type = other.Type;
+ }
this.MergeUnknownFields(other.UnknownFields);
return this;
}
@@ -5752,6 +5774,18 @@ public sealed partial class Builder : pb::GeneratedBuilder<AssignAllEmptySegment
AddSegments(input.ReadInt32());
break;
}
+ case 24: {
+ int rawValue = input.ReadEnum();
+ if (!global::System.Enum.IsDefined(typeof(global::Rhino.DistributedHashTable.Protocol.ReplicationType), rawValue)) {
+ if (unknownFields == null) {
+ unknownFields = pb::UnknownFieldSet.CreateBuilder(this.UnknownFields);
+ }
+ unknownFields.MergeVarintField(3, (ulong) rawValue);
+ } else {
+ Type = (global::Rhino.DistributedHashTable.Protocol.ReplicationType) rawValue;
+ }
+ break;
+ }
}
}
}
@@ -5818,6 +5852,24 @@ public sealed partial class Builder : pb::GeneratedBuilder<AssignAllEmptySegment
result.segments_.Clear();
return this;
}
+
+ public bool HasType {
+ get { return result.HasType; }
+ }
+ public global::Rhino.DistributedHashTable.Protocol.ReplicationType Type {
+ get { return result.Type; }
+ set { SetType(value); }
+ }
+ public Builder SetType(global::Rhino.DistributedHashTable.Protocol.ReplicationType value) {
+ result.hasType = true;
+ result.type_ = value;
+ return this;
+ }
+ public Builder ClearType() {
+ result.hasType = false;
+ result.type_ = global::Rhino.DistributedHashTable.Protocol.ReplicationType.Ownership;
+ return this;
+ }
}
}
@@ -6080,10 +6132,20 @@ public sealed partial class ReplicateNextPageRequestMessage : pb::GeneratedMessa
get { return segment_; }
}
+ private bool hasType;
+ private global::Rhino.DistributedHashTable.Protocol.ReplicationType type_ = global::Rhino.DistributedHashTable.Protocol.ReplicationType.Ownership;
+ public bool HasType {
+ get { return hasType; }
+ }
+ public global::Rhino.DistributedHashTable.Protocol.ReplicationType Type {
+ get { return type_; }
+ }
+
public override bool IsInitialized {
get {
if (!hasReplicationEndpoint) return false;
if (!hasSegment) return false;
+ if (!hasType) return false;
if (!ReplicationEndpoint.IsInitialized) return false;
return true;
}
@@ -6096,6 +6158,9 @@ public sealed partial class ReplicateNextPageRequestMessage : pb::GeneratedMessa
if (HasSegment) {
output.WriteInt32(2, Segment);
}
+ if (HasType) {
+ output.WriteEnum(3, (int) Type);
+ }
UnknownFields.WriteTo(output);
}
@@ -6112,6 +6177,9 @@ public sealed partial class ReplicateNextPageRequestMessage : pb::GeneratedMessa
if (HasSegment) {
size += pb::CodedOutputStream.ComputeInt32Size(2, Segment);
}
+ if (HasType) {
+ size += pb::CodedOutputStream.ComputeEnumSize(3, (int) Type);
+ }
size += UnknownFields.SerializedSize;
memoizedSerializedSize = size;
return size;
@@ -6201,6 +6269,9 @@ public sealed partial class Builder : pb::GeneratedBuilder<ReplicateNextPageRequ
if (other.HasSegment) {
Segment = other.Segment;
}
+ if (other.HasType) {
+ Type = other.Type;
+ }
this.MergeUnknownFields(other.UnknownFields);
return this;
}
@@ -6246,6 +6317,18 @@ public sealed partial class Builder : pb::GeneratedBuilder<ReplicateNextPageRequ
Segment = input.ReadInt32();
break;
}
+ case 24: {
+ int rawValue = input.ReadEnum();
+ if (!global::System.Enum.IsDefined(typeof(global::Rhino.DistributedHashTable.Protocol.ReplicationType), rawValue)) {
+ if (unknownFields == null) {
+ unknownFields = pb::UnknownFieldSet.CreateBuilder(this.UnknownFields);
+ }
+ unknownFields.MergeVarintField(3, (ulong) rawValue);
+ } else {
+ Type = (global::Rhino.DistributedHashTable.Protocol.ReplicationType) rawValue;
+ }
+ break;
+ }
}
}
}
@@ -6304,6 +6387,24 @@ public sealed partial class Builder : pb::GeneratedBuilder<ReplicateNextPageRequ
result.segment_ = 0;
return this;
}
+
+ public bool HasType {
+ get { return result.HasType; }
+ }
+ public global::Rhino.DistributedHashTable.Protocol.ReplicationType Type {
+ get { return result.Type; }
+ set { SetType(value); }
+ }
+ public Builder SetType(global::Rhino.DistributedHashTable.Protocol.ReplicationType value) {
+ result.hasType = true;
+ result.type_ = value;
+ return this;
+ }
+ public Builder ClearType() {
+ result.hasType = false;
+ result.type_ = global::Rhino.DistributedHashTable.Protocol.ReplicationType.Ownership;
+ return this;
+ }
}
}
View
2 Rhino.DistributedHashTable/Protocol/ProtocolDef.proto
@@ -161,6 +161,7 @@ message AssignAllEmptySegmentsRequestMessage
{
required NodeEndpoint ReplicationEndpoint = 1;
repeated int32 Segments = 2;
+ required ReplicationType Type = 3;
}
message AssignAllEmptySegmentsResponseMessage
@@ -172,6 +173,7 @@ message ReplicateNextPageRequestMessage
{
required NodeEndpoint ReplicationEndpoint = 1;
required int32 Segment = 2;
+ required ReplicationType Type = 3;
}
message ReplicateNextPageResponseMessage
View
12 Rhino.DistributedHashTable/Protocol/ProtocolDef.proto.bin
@@ -1,5 +1,5 @@
-�*
+�+
ProtocolDef.proto#Rhino.DistributedHashTable.Protocol"�
Value
Key ( 
@@ -75,15 +75,17 @@ IsReadOnly (
WasRemoved ("
ErrorMessage
-Message ( "�
+Message ( "�
$AssignAllEmptySegmentsRequestMessageN
ReplicationEndpoint ( 21.Rhino.DistributedHashTable.Protocol.NodeEndpoint
-Segments ("A
+Segments (B
+Type (24.Rhino.DistributedHashTable.Protocol.ReplicationType"A
%AssignAllEmptySegmentsResponseMessage
-AssignedSegments ("�
+AssignedSegments ("�
ReplicateNextPageRequestMessageN
ReplicationEndpoint ( 21.Rhino.DistributedHashTable.Protocol.NodeEndpoint
-Segment ("�
+Segment (B
+Type (24.Rhino.DistributedHashTable.Protocol.ReplicationType"�
ReplicateNextPageResponseMessage
Done (Q
RemoveRequests ( 29.Rhino.DistributedHashTable.Protocol.RemoveRequestMessageK
View
29 Rhino.DistributedHashTable/Remote/DistributedHashTableNodeReplication.cs
@@ -1,6 +1,4 @@
-using System;
using System.Collections.Generic;
-using System.Text;
using Rhino.DistributedHashTable.Internal;
using Rhino.DistributedHashTable.Parameters;
using Rhino.PersistentHashTable;
@@ -17,18 +15,19 @@ public DistributedHashTableNodeReplication(PersistentHashTable.PersistentHashTab
}
public ReplicationResult ReplicateNextPage(NodeEndpoint replicationEndpoint,
- int segment)
+ ReplicationType type,
+ int segment)
{
var putRequests = new List<ExtendedPutRequest>();
var removalRequests = new List<ExtendedRemoveRequest>();
- bool done = false;
+ var done = false;
hashTable.Batch(actions =>
{
foreach (var getRequest in actions.GetKeysForTag(segment))
{
var alreadyReplicated = actions.HasReplicationInfo(getRequest.Key,
- getRequest.SpecifiedVersion,
- replicationEndpoint.GetHash());
+ getRequest.SpecifiedVersion,
+ replicationEndpoint.GetHash());
if (alreadyReplicated)
continue;
@@ -52,8 +51,8 @@ public DistributedHashTableNodeReplication(PersistentHashTable.PersistentHashTab
});
actions.AddReplicationInfo(getRequest.Key,
- getRequest.SpecifiedVersion,
- replicationEndpoint.GetHash());
+ getRequest.SpecifiedVersion,
+ replicationEndpoint.GetHash());
if (putRequests.Count >= 100)
break;
@@ -64,14 +63,14 @@ public DistributedHashTableNodeReplication(PersistentHashTable.PersistentHashTab
removalRequests.Add(new ExtendedRemoveRequest
{
Key = request.Key,
- SpecificVersion = request.SpecificVersion
+ SpecificVersion = request.SpecificVersion
});
if (removalRequests.Count >= 100)
break;
}
done = putRequests.Count == 0 && removalRequests.Count == 0;
- if (done)
+ if (done && type == ReplicationType.Ownership)
{
MarkSegmentAsAssignedToEndpoint(actions, replicationEndpoint, segment);
}
@@ -88,7 +87,8 @@ public DistributedHashTableNodeReplication(PersistentHashTable.PersistentHashTab
}
public int[] AssignAllEmptySegments(NodeEndpoint replicationEndpoint,
- int[] segments)
+ ReplicationType type,
+ int[] segments)
{
var reservedSegments = new List<int>();
@@ -98,7 +98,8 @@ public DistributedHashTableNodeReplication(PersistentHashTable.PersistentHashTab
{
if (actions.HasTag(segment))
continue;
- if (MarkSegmentAsAssignedToEndpoint(actions, replicationEndpoint, segment) == false)
+ if (type == ReplicationType.Ownership &&
+ MarkSegmentAsAssignedToEndpoint(actions, replicationEndpoint, segment) == false)
continue;
reservedSegments.Add(segment);
}
@@ -109,8 +110,8 @@ public DistributedHashTableNodeReplication(PersistentHashTable.PersistentHashTab
}
private static bool MarkSegmentAsAssignedToEndpoint(PersistentHashTableActions actions,
- NodeEndpoint endpoint,
- int segment)
+ NodeEndpoint endpoint,
+ int segment)
{
var result = actions.Put(new PutRequest
{
View
8 Rhino.DistributedHashTable/Remote/IDistributedHashTableNodeReplication.cs
@@ -11,7 +11,9 @@ public interface IDistributedHashTableNodeReplication
/// whatever values has been changed (changing the value remove the replication version)
/// If there are no more results, the segment is assigned to the endpoint
/// </summary>
- ReplicationResult ReplicateNextPage(NodeEndpoint replicationEndpoint, int segment);
+ ReplicationResult ReplicateNextPage(NodeEndpoint replicationEndpoint,
+ ReplicationType type,
+ int segment);
/// <summary>
@@ -23,6 +25,8 @@ public interface IDistributedHashTableNodeReplication
/// After this methods return, any call to a node with a segment that was
/// returned from this method will raise a <seealso cref="SeeOtherException"/>
/// </summary>
- int[] AssignAllEmptySegments(NodeEndpoint replicationEndpoint, int[] segments);
+ int[] AssignAllEmptySegments(NodeEndpoint replicationEndpoint,
+ ReplicationType type,
+ int[] segments);
}
}

0 comments on commit 0084675

Please sign in to comment.
Something went wrong with that request. Please try again.