Skip to content
Browse files

Adding first version of dht

  • Loading branch information...
0 parents commit 21188d3b9f408c3d1c828f03984f1720bb91c066 ayenderahien committed Jun 4, 2009
Showing with 5,611 additions and 0 deletions.
  1. +17 −0 Rhino.DistributedHashTable.IntegrationTests/FullIntegrationTest.cs
  2. +178 −0 Rhino.DistributedHashTable.IntegrationTests/Mini/DistributedHashTableReplicationTest.cs
  3. +244 −0 Rhino.DistributedHashTable.IntegrationTests/Mini/DistributedHashTableStorageTest.cs
  4. +15 −0 Rhino.DistributedHashTable.IntegrationTests/Mini/EsentTestBase.cs
  5. +215 −0 Rhino.DistributedHashTable.IntegrationTests/Mini/OnlineRangeReplicationCommandTest.cs
  6. +36 −0 Rhino.DistributedHashTable.IntegrationTests/Properties/AssemblyInfo.cs
  7. +88 −0 Rhino.DistributedHashTable.IntegrationTests/Rhino.DistributedHashTable.IntegrationTests.csproj
  8. +49 −0 Rhino.DistributedHashTable.IntegrationTests/WhenNodeIsStartedWillJoinMaster.cs
  9. +15 −0 Rhino.DistributedHashTable.IntegrationTests/app.config
  10. +122 −0 Rhino.DistributedHashTable.Tests/BackCopiesBehavior.cs
  11. +33 −0 Rhino.DistributedHashTable.Tests/MasterCaughtUpBehavior.cs
  12. +36 −0 Rhino.DistributedHashTable.Tests/MasterGaveUpBehavior.cs
  13. +138 −0 Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs
  14. +125 −0 Rhino.DistributedHashTable.Tests/NodeReplicationBehavior.cs
  15. +104 −0 Rhino.DistributedHashTable.Tests/NodeStartupBehavior.cs
  16. +10 −0 Rhino.DistributedHashTable.Tests/Program.cs
  17. +36 −0 Rhino.DistributedHashTable.Tests/Properties/AssemblyInfo.cs
  18. +21 −0 Rhino.DistributedHashTable.Tests/RangesBehavior.cs
  19. +90 −0 Rhino.DistributedHashTable.Tests/Rhino.DistributedHashTable.Tests.csproj
  20. +56 −0 Rhino.DistributedHashTable.sln
  21. +3 −0 Rhino.DistributedHashTable/Assumptions.txt
  22. +7 −0 Rhino.DistributedHashTable/Commands/ICommand.cs
  23. +158 −0 Rhino.DistributedHashTable/Commands/OnlineRangeReplicationCommand.cs
  24. +87 −0 Rhino.DistributedHashTable/Commands/RearrangeBackups.cs
  25. +38 −0 Rhino.DistributedHashTable/Exceptions/SeeOtherException.cs
  26. +35 −0 Rhino.DistributedHashTable/Exceptions/TopologyVersionDoesNotMatchException.cs
  27. +148 −0 Rhino.DistributedHashTable/Hosting/DistributedHashTableMasterHost.cs
  28. +75 −0 Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs
  29. +17 −0 Rhino.DistributedHashTable/IDistributedHashTable.cs
  30. +8 −0 Rhino.DistributedHashTable/Internal/BackupState.cs
  31. +27 −0 Rhino.DistributedHashTable/Internal/BinaryMessageSerializer.cs
  32. +8 −0 Rhino.DistributedHashTable/Internal/Constants.cs
  33. +232 −0 Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs
  34. +124 −0 Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs
  35. +208 −0 Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs
  36. +37 −0 Rhino.DistributedHashTable/Internal/IDistributedHashTableMaster.cs
  37. +26 −0 Rhino.DistributedHashTable/Internal/IDistributedHashTableNode.cs
  38. +18 −0 Rhino.DistributedHashTable/Internal/IDistributedHashTableStorage.cs
  39. +9 −0 Rhino.DistributedHashTable/Internal/IExecuter.cs
  40. +11 −0 Rhino.DistributedHashTable/Internal/IMessageSerializer.cs
  41. +88 −0 Rhino.DistributedHashTable/Internal/NodeEndpoint.cs
  42. +9 −0 Rhino.DistributedHashTable/Internal/NodeState.cs
  43. +44 −0 Rhino.DistributedHashTable/Internal/Segment.cs
  44. +14 −0 Rhino.DistributedHashTable/Internal/ThreadPoolExecuter.cs
  45. +42 −0 Rhino.DistributedHashTable/Internal/Topology.cs
  46. +12 −0 Rhino.DistributedHashTable/Parameters/ExtendedGetRequest.cs
  47. +12 −0 Rhino.DistributedHashTable/Parameters/ExtendedPutRequest.cs
  48. +12 −0 Rhino.DistributedHashTable/Parameters/ExtendedRemoveRequest.cs
  49. +11 −0 Rhino.DistributedHashTable/Parameters/IExtendedRequest.cs
  50. +36 −0 Rhino.DistributedHashTable/Properties/AssemblyInfo.cs
  51. +1,955 −0 Rhino.DistributedHashTable/Protocol/Master.cs
  52. +51 −0 Rhino.DistributedHashTable/Protocol/Master.proto
  53. +31 −0 Rhino.DistributedHashTable/Protocol/Master.proto.bin
  54. +2 −0 Rhino.DistributedHashTable/Protocol/regen.cmd
  55. +124 −0 Rhino.DistributedHashTable/Remote/DistributedHashTableNodeReplication.cs
  56. +28 −0 Rhino.DistributedHashTable/Remote/IDistributedHashTableNodeReplication.cs
  57. +10 −0 Rhino.DistributedHashTable/Remote/IDistributedHashTableNodeReplicationFactory.cs
  58. +13 −0 Rhino.DistributedHashTable/Remote/ReplicationResult.cs
  59. +121 −0 Rhino.DistributedHashTable/Rhino.DistributedHashTable.csproj
  60. +10 −0 Rhino.DistributedHashTable/Todo.txt
  61. +82 −0 Rhino.DistributedHashTable/Util/EnumerableExtensions.cs
17 Rhino.DistributedHashTable.IntegrationTests/FullIntegrationTest.cs
@@ -0,0 +1,17 @@
+using System.IO;
+
+namespace Rhino.DistributedHashTable.IntegrationTests
+{
+ // Base class for full-stack integration tests. The constructor wipes any
+ // Esent directories left behind by a previous run so every test starts
+ // from a clean slate.
+ public class FullIntegrationTest
+ {
+ public FullIntegrationTest()
+ {
+ // NOTE(review): directory names presumably match the storage host's
+ // defaults ("node.queue.esent" / "node.data.esent") — confirm against
+ // DistributedHashTableStorageHost.
+ if(Directory.Exists("node.queue.esent"))
+ Directory.Delete("node.queue.esent", true);
+
+ if (Directory.Exists("node.data.esent"))
+ Directory.Delete("node.data.esent", true);
+
+ }
+ }
+}
178 Rhino.DistributedHashTable.IntegrationTests/Mini/DistributedHashTableReplicationTest.cs
@@ -0,0 +1,178 @@
+using System;
+using System.Linq;
+using Rhino.DistributedHashTable.Exceptions;
+using Rhino.DistributedHashTable.Internal;
+using Rhino.DistributedHashTable.Parameters;
+using Rhino.DistributedHashTable.Remote;
+using Rhino.Mocks;
+using Rhino.PersistentHashTable;
+using Xunit;
+
+namespace Rhino.DistributedHashTable.IntegrationTests.Mini
+{
+ // Exercises segment replication against real Esent-backed storage.
+ // Fixes applied in review: test names "Asisgned" -> "Assigned" (3x) and
+ // "ThatHasItems" -> "ThatHaveItems". Renaming [Fact] methods is safe:
+ // xUnit discovers them by attribute, nothing calls them by name.
+ public class DistributedHashTableReplicationTest
+ {
+ // With no keys in the table every segment is empty, so segments are
+ // assigned to the requesting endpoint immediately; afterwards any
+ // Put/Get/Remove on those segments must redirect the caller to the
+ // new owner via SeeOtherException.
+ public class WhenThereAreNoKeysInTable: EsentTestBase, IDisposable
+ {
+ private readonly DistributedHashTableStorage distributedHashTableStorage;
+ private readonly IDistributedHashTableNode node;
+ private readonly Guid guid;
+ private readonly IDistributedHashTableNodeReplication replication;
+
+ public WhenThereAreNoKeysInTable()
+ {
+ node = MockRepository.GenerateStub<IDistributedHashTableNode>();
+ guid = Guid.NewGuid();
+ node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ distributedHashTableStorage = new DistributedHashTableStorage("test.esent",
+ node);
+ replication = distributedHashTableStorage.Replication;
+ }
+
+ [Fact]
+ public void WillAssignAllSegmentsWeAskForImmediately()
+ {
+ var ranges = Enumerable.Range(0, 500).ToArray();
+ var assignedSegments = replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ranges);
+ Assert.Equal(ranges, assignedSegments);
+ }
+
+
+ [Fact]
+ public void PuttingKeyInSegmentAssignedElsewhereShouldThrow()
+ {
+ var ranges = Enumerable.Range(0, 500).ToArray();
+ replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ranges);
+ var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Put(guid, new ExtendedPutRequest()
+ {
+ Key = "test",
+ Segment = 0,
+ Bytes = new byte[] {1},
+ }));
+ Assert.Equal(NodeEndpoint.ForTest(1), exception.Endpoint);
+ }
+
+ [Fact]
+ public void GettingKeyInSegmentAssignedElsewhereShouldThrow()
+ {
+ var ranges = Enumerable.Range(0, 500).ToArray();
+ replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ranges);
+ var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Get(guid, new ExtendedGetRequest()
+ {
+ Key = "test",
+ Segment = 0,
+ }));
+ Assert.Equal(NodeEndpoint.ForTest(1), exception.Endpoint);
+ }
+
+
+ [Fact]
+ public void RemovingKeyInSegmentAssignedElsewhereShouldThrow()
+ {
+ var ranges = Enumerable.Range(0, 500).ToArray();
+ replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ranges);
+ var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Remove(guid, new ExtendedRemoveRequest
+ {
+ Key = "test",
+ Segment = 0,
+ }));
+ Assert.Equal(NodeEndpoint.ForTest(1), exception.Endpoint);
+ }
+
+ public void Dispose()
+ {
+ distributedHashTableStorage.Dispose();
+ }
+ }
+
+ // With a key already stored in segment 0, that segment must be
+ // replicated page by page (ReplicateNextPage) instead of being handed
+ // over immediately; ownership transfers only once a page comes back
+ // empty, and removals performed mid-replication show up as remove
+ // requests on the next page.
+ public class WhenThereAreKeysInTable : EsentTestBase, IDisposable
+ {
+ private readonly DistributedHashTableStorage distributedHashTableStorage;
+ private readonly IDistributedHashTableNode node;
+ private readonly Guid guid;
+ private readonly IDistributedHashTableNodeReplication replication;
+ private PutResult putResult;
+
+ public WhenThereAreKeysInTable()
+ {
+ node = MockRepository.GenerateStub<IDistributedHashTableNode>();
+ guid = Guid.NewGuid();
+ node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ distributedHashTableStorage = new DistributedHashTableStorage("test.esent",
+ node);
+ replication = distributedHashTableStorage.Replication;
+ // Seed segment 0 with a single value; its version is kept so the
+ // removal test can target the exact stored version.
+ putResult = distributedHashTableStorage.Put(guid, new ExtendedPutRequest
+ {
+ Tag = 0,
+ Bytes = new byte[] {1},
+ Key = "test",
+ Segment = 0
+ })[0];
+ }
+
+ [Fact]
+ public void WillSkipSegmentsThatHaveItemsInThem()
+ {
+ var ranges = Enumerable.Range(0, 500).ToArray();
+ var assignedSegments = replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ranges);
+ // Segment 0 holds the seeded key, so only segments 1..499 are assigned.
+ Assert.Equal(ranges.Skip(1).ToArray(), assignedSegments);
+ }
+
+ [Fact]
+ public void WillGiveExistingKeysFromSegment()
+ {
+ var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ Assert.Equal(1, result.PutRequests.Length);
+ Assert.Equal(new byte[]{1}, result.PutRequests[0].Bytes);
+ }
+
+ [Fact]
+ public void WillRememberKeysSentDuringPreviousReplication()
+ {
+ var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ Assert.Equal(1, result.PutRequests.Length);
+
+ // The same key must not be sent twice to the same endpoint.
+ result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ Assert.Equal(0, result.PutRequests.Length);
+ }
+
+ [Fact]
+ public void WhenItemIsRemovedWillResultInRemovalRequestOnNextReplicationPage()
+ {
+ var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ Assert.Equal(1, result.PutRequests.Length);
+ node.Storage.Remove(node.GetTopologyVersion(), new ExtendedRemoveRequest
+ {
+ Key = "test",
+ SpecificVersion = putResult.Version,
+ Segment = 0,
+ });
+
+ result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ Assert.Equal(1, result.RemoveRequests.Length);
+ }
+
+ [Fact]
+ public void WhenDoneGettingAllKeysWillAssignSegmentToEndpoint()
+ {
+ var result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ Assert.Equal(1, result.PutRequests.Length);
+
+ // An empty page marks the segment as fully replicated...
+ result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), 0);
+ Assert.Equal(0, result.PutRequests.Length);
+
+ // ...so local access now redirects to the new owner.
+ var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Remove(guid, new ExtendedRemoveRequest
+ {
+ Key = "test",
+ Segment = 0,
+ }));
+ Assert.Equal(NodeEndpoint.ForTest(1), exception.Endpoint);
+ }
+
+ public void Dispose()
+ {
+ distributedHashTableStorage.Dispose();
+ }
+ }
+ }
+}
244 Rhino.DistributedHashTable.IntegrationTests/Mini/DistributedHashTableStorageTest.cs
@@ -0,0 +1,244 @@
+using System;
+using Rhino.DistributedHashTable.Internal;
+using Rhino.DistributedHashTable.Parameters;
+using Rhino.Mocks;
+using Rhino.PersistentHashTable;
+using Xunit;
+
+namespace Rhino.DistributedHashTable.IntegrationTests.Mini
+{
+ // Unit-level tests of DistributedHashTableStorage over a real Esent store,
+ // with the owning node stubbed out via Rhino.Mocks. Each nested class is an
+ // independent xUnit context: constructor = arrange, Dispose = cleanup.
+ public class DistributedHashTableStorageTest
+ {
+ // Get() behavior: seeded with one key ("test") in segment 0.
+ public class ReadingValues : EsentTestBase, IDisposable
+ {
+ private readonly DistributedHashTableStorage distributedHashTableStorage;
+ private readonly IDistributedHashTableNode node;
+ private readonly Guid guid;
+
+ public ReadingValues()
+ {
+ node = MockRepository.GenerateStub<IDistributedHashTableNode>();
+ guid = Guid.NewGuid();
+ node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ distributedHashTableStorage = new DistributedHashTableStorage("test.esent",
+ node);
+
+ distributedHashTableStorage.Put(guid, new ExtendedPutRequest
+ {
+ Key = "test",
+ Bytes = new byte[]{1,2,4},
+ TopologyVersion = guid,
+ Segment = 0,
+ });
+ }
+
+ [Fact]
+ public void WillReturnNullForMissingValue()
+ {
+ var values = distributedHashTableStorage.Get(guid, new ExtendedGetRequest
+ {
+ Key = "missing-value",
+ Segment = 0
+ });
+ // NOTE(review): despite the test name, a missing key yields an EMPTY
+ // result set for the request, not a null — consider renaming.
+ Assert.Empty(values[0]);
+ }
+
+ [Fact]
+ public void WillReturnValueForExistingValue()
+ {
+ var values = distributedHashTableStorage.Get(guid, new ExtendedGetRequest
+ {
+ Key = "test",
+ Segment = 0
+ });
+ Assert.NotEmpty(values[0]);
+ }
+
+ public void Dispose()
+ {
+ distributedHashTableStorage.Dispose();
+ }
+ }
+
+ // Put() behavior: version assignment and replication fan-out, both when
+ // this node owns the segment and when it is failed-over (owner elsewhere).
+ public class WritingValues : EsentTestBase, IDisposable
+ {
+ private readonly DistributedHashTableStorage distributedHashTableStorage;
+ private readonly IDistributedHashTableNode node;
+ private readonly Guid guid;
+
+ public WritingValues()
+ {
+ node = MockRepository.GenerateStub<IDistributedHashTableNode>();
+ guid = Guid.NewGuid();
+ node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ distributedHashTableStorage = new DistributedHashTableStorage("test.esent",
+ node);
+ }
+
+ [Fact]
+ public void WillGetVersionNumberFromPut()
+ {
+ var results = distributedHashTableStorage.Put(guid, new ExtendedPutRequest
+ {
+ Key = "test",
+ Bytes = new byte[] { 1, 2, 4 },
+ TopologyVersion = guid,
+ Segment = 0,
+ });
+ Assert.NotNull(results[0].Version);
+ }
+
+ [Fact]
+ public void WillReplicateToOtherSystems()
+ {
+ // Segment owned locally -> the put fans out to the other backups.
+ node.Stub(x => x.IsSegmentOwned(0)).Return(true);
+ var request = new ExtendedPutRequest
+ {
+ Key = "test",
+ Bytes = new byte[] { 1, 2, 4 },
+ TopologyVersion = guid,
+ Segment = 0,
+ };
+ distributedHashTableStorage.Put(guid, request);
+ node.AssertWasCalled(x=>x.SendToAllOtherBackups(0, request));
+ }
+
+ [Fact]
+ public void WillReplicateToOwnerWhenFailedOverToAnotherNode()
+ {
+ // Segment NOT owned locally -> the put is also forwarded to the owner.
+ node.Stub(x => x.IsSegmentOwned(0)).Return(false);
+ var request = new ExtendedPutRequest
+ {
+ Key = "test",
+ Bytes = new byte[] { 1, 2, 4 },
+ TopologyVersion = guid,
+ Segment = 0,
+ };
+ distributedHashTableStorage.Put(guid, request);
+ node.AssertWasCalled(x => x.SendToOwner(0, request));
+ }
+
+ [Fact]
+ public void WillReplicateToOtherBackupsWhenFailedOverToAnotherNode()
+ {
+ node.Stub(x => x.IsSegmentOwned(0)).Return(false);
+ var request = new ExtendedPutRequest
+ {
+ Key = "test",
+ Bytes = new byte[] { 1, 2, 4 },
+ TopologyVersion = guid,
+ Segment = 0,
+ };
+ distributedHashTableStorage.Put(guid, request);
+ node.AssertWasCalled(x => x.SendToAllOtherBackups(0, request));
+ }
+
+ public void Dispose()
+ {
+ distributedHashTableStorage.Dispose();
+ }
+ }
+
+ // Remove() behavior: version-specific removal plus the same replication
+ // fan-out rules as Put(). Seeded with one key whose version is captured.
+ public class RemovingValues : EsentTestBase, IDisposable
+ {
+ private readonly DistributedHashTableStorage distributedHashTableStorage;
+ private readonly IDistributedHashTableNode node;
+ private readonly Guid guid;
+ private ValueVersion version;
+
+ public RemovingValues()
+ {
+ node = MockRepository.GenerateStub<IDistributedHashTableNode>();
+ guid = Guid.NewGuid();
+ node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ distributedHashTableStorage = new DistributedHashTableStorage("test.esent",
+ node);
+
+ var results = distributedHashTableStorage.Put(guid, new ExtendedPutRequest
+ {
+ Key = "test",
+ Bytes = new byte[] { 1, 2, 4 },
+ TopologyVersion = guid,
+ Segment = 0,
+ });
+ version = results[0].Version;
+ }
+
+ [Fact]
+ public void WillConfirmRemovalOfExistingValue()
+ {
+ var results = distributedHashTableStorage.Remove(guid, new ExtendedRemoveRequest
+ {
+ Key = "test",
+ SpecificVersion = version,
+ TopologyVersion = guid,
+ Segment = 0,
+ });
+ Assert.True(results[0]);
+ }
+
+ [Fact]
+ public void WillNotConfirmRemovalOfNonExistingValue()
+ {
+ var results = distributedHashTableStorage.Remove(guid, new ExtendedRemoveRequest
+ {
+ Key = "test2",
+ SpecificVersion = version,
+ TopologyVersion = guid,
+ Segment = 0,
+ });
+ Assert.False(results[0]);
+ }
+
+ [Fact]
+ public void WillReplicateToOtherSystems()
+ {
+ node.Stub(x => x.IsSegmentOwned(0)).Return(true);
+ var request = new ExtendedRemoveRequest()
+ {
+ Key = "test",
+ SpecificVersion = version,
+ TopologyVersion = guid,
+ Segment = 0,
+ };
+ distributedHashTableStorage.Remove(guid, request);
+ node.AssertWasCalled(x => x.SendToAllOtherBackups(0, request));
+ }
+
+ [Fact]
+ public void WillReplicateToOwnerWhenFailedOverToAnotherNode()
+ {
+ node.Stub(x => x.IsSegmentOwned(0)).Return(false);
+ var request = new ExtendedRemoveRequest()
+ {
+ Key = "test",
+ SpecificVersion = version,
+ TopologyVersion = guid,
+ Segment = 0,
+ };
+ distributedHashTableStorage.Remove(guid, request);
+ node.AssertWasCalled(x => x.SendToOwner(0, request));
+ }
+
+ [Fact]
+ public void WillReplicateToOtherBackupsWhenFailedOverToAnotherNode()
+ {
+ node.Stub(x => x.IsSegmentOwned(0)).Return(false);
+ var request = new ExtendedRemoveRequest()
+ {
+ Key = "test",
+ SpecificVersion = version,
+ TopologyVersion = guid,
+ Segment = 0,
+ };
+ distributedHashTableStorage.Remove(guid, request);
+ node.AssertWasCalled(x => x.SendToAllOtherBackups(0, request));
+ }
+
+ public void Dispose()
+ {
+ distributedHashTableStorage.Dispose();
+ }
+ }
+ }
+}
15 Rhino.DistributedHashTable.IntegrationTests/Mini/EsentTestBase.cs
@@ -0,0 +1,15 @@
+using System.IO;
+
+namespace Rhino.DistributedHashTable.IntegrationTests.Mini
+{
+ // Base class for the "Mini" storage tests: recreates an empty "test.esent"
+ // directory before each test so Esent state never leaks between tests.
+ public class EsentTestBase
+ {
+ public EsentTestBase()
+ {
+ if (Directory.Exists("test.esent"))
+ Directory.Delete("test.esent", true);
+
+ Directory.CreateDirectory("test.esent");
+ }
+ }
+}
215 Rhino.DistributedHashTable.IntegrationTests/Mini/OnlineRangeReplicationCommandTest.cs
@@ -0,0 +1,215 @@
+using System;
+using System.IO;
+using Rhino.DistributedHashTable.Commands;
+using Rhino.DistributedHashTable.Internal;
+using Rhino.DistributedHashTable.Parameters;
+using Rhino.DistributedHashTable.Remote;
+using Rhino.Mocks;
+using Xunit;
+
+namespace Rhino.DistributedHashTable.IntegrationTests.Mini
+{
+ // Tests OnlineSegmentReplicationCommand with node, replication source and
+ // storage all stubbed out: empty segments are assigned in bulk, non-empty
+ // ones are replicated page by page, and failures give the segments back.
+ // Fix applied in review: test names "Replicateranges" -> "ReplicateRanges"
+ // (2x); [Fact] methods have no callers, so the rename is safe.
+ public class OnlineSegmentReplicationCommandTest
+ {
+ private readonly OnlineSegmentReplicationCommand command;
+ private readonly IDistributedHashTableNode node;
+ private readonly IDistributedHashTableNodeReplication replication;
+ private readonly NodeEndpoint endpoint;
+ private readonly IDistributedHashTableStorage storage;
+ private readonly Guid guid = Guid.NewGuid();
+
+ public OnlineSegmentReplicationCommandTest()
+ {
+ node = MockRepository.GenerateStub<IDistributedHashTableNode>();
+ replication = MockRepository.GenerateStub<IDistributedHashTableNodeReplication>();
+ endpoint = NodeEndpoint.ForTest(1);
+ node.Stub(x => x.Endpoint).Return(NodeEndpoint.ForTest(2));
+ storage = MockRepository.GenerateStub<IDistributedHashTableStorage>();
+ node.Storage = storage;
+ node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ // The command is asked to replicate segments 0 and 1 from `endpoint`.
+ command = new OnlineSegmentReplicationCommand(
+ endpoint,
+ new[] { new Segment { Index = 0 }, new Segment { Index = 1 }, },
+ node,
+ replication);
+ }
+
+ [Fact]
+ public void WillAskForAllEmptySegments()
+ {
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ .Return(new int[0]);
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ .Return(new ReplicationResult
+ {
+ PutRequests = new ExtendedPutRequest[0],
+ RemoveRequests = new ExtendedRemoveRequest[0],
+ Done = true
+ });
+ var success = command.Execute();
+ Assert.True(success);
+
+ replication.AssertWasCalled(x => x.AssignAllEmptySegments(node.Endpoint, new int[] { 0, 1 }));
+ }
+
+ [Fact]
+ public void WillLetNodeKnowAboutAnyEmptySegmentsAssignedToIt()
+ {
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ .Return(new []{0});
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ .Return(new ReplicationResult
+ {
+ PutRequests = new ExtendedPutRequest[0],
+ RemoveRequests = new ExtendedRemoveRequest[0],
+ Done = true
+ });
+ var success = command.Execute();
+ Assert.True(success);
+
+ node.AssertWasCalled(x => x.DoneReplicatingSegments(new int[] { 0 }));
+ }
+
+ [Fact]
+ public void WillNotTryToReplicateRangesThatWereEmptyAndAssigned()
+ {
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ .Return(new[] { 0 });
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ .Return(new ReplicationResult
+ {
+ PutRequests = new ExtendedPutRequest[0],
+ RemoveRequests = new ExtendedRemoveRequest[0],
+ Done = true
+ });
+ var success = command.Execute();
+ Assert.True(success);
+
+ // Segment 0 was handed over as empty, so no paging happens for it.
+ replication.AssertWasNotCalled(x => x.ReplicateNextPage(node.Endpoint, 0));
+ }
+
+ [Fact]
+ public void WillTryToReplicateRangesThatWereNotEmpty()
+ {
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ .Return(new[] { 0 });
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ .Return(new ReplicationResult
+ {
+ PutRequests = new ExtendedPutRequest[0],
+ RemoveRequests = new ExtendedRemoveRequest[0],
+ Done = true
+ });
+ var success = command.Execute();
+ Assert.True(success);
+
+ // Segment 1 was not empty, so it goes through page replication.
+ replication.AssertWasCalled(x => x.ReplicateNextPage(node.Endpoint, 1));
+ }
+
+ [Fact]
+ public void WillPutReturnedItemsIntoStorage()
+ {
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ .Return(new[] { 0 });
+ var request = new ExtendedPutRequest
+ {
+ Bytes = new byte[]{1},
+ Key = "a",
+ };
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ .Return(new ReplicationResult
+ {
+ PutRequests = new[]{ request, },
+ RemoveRequests = new ExtendedRemoveRequest[0],
+ Done = true
+ });
+ var success = command.Execute();
+ Assert.True(success);
+
+ storage.AssertWasCalled(x => x.Put(guid, request));
+ }
+
+ [Fact]
+ public void WillRemoveReturnedRemovalFromStorage()
+ {
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ .Return(new[] { 0 });
+ var request = new ExtendedRemoveRequest
+ {
+ Key = "a",
+ };
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ .Return(new ReplicationResult
+ {
+ PutRequests = new ExtendedPutRequest[0],
+ RemoveRequests = new[] { request },
+ Done = true
+ });
+ var success = command.Execute();
+ Assert.True(success);
+
+ storage.AssertWasCalled(x => x.Remove(guid, request));
+ }
+
+ [Fact]
+ public void WhenSegmentReplicationFailsWillGiveUpTheSegment()
+ {
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ .Return(new int[0]);
+
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ .Throw(new IOException());
+ var success = command.Execute();
+ Assert.False(success);
+
+ node.AssertWasCalled(x=>x.GivingUpOn(0));
+ node.AssertWasCalled(x => x.GivingUpOn(1));
+ }
+
+ [Fact]
+ public void WhenEmptySegmentReplicationFailsWillGiveEverythingUp()
+ {
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ .Throw(new IOException());
+ var success = command.Execute();
+ Assert.False(success);
+
+ node.AssertWasCalled(x => x.GivingUpOn(0,1));
+ }
+
+ [Fact]
+ public void WillRepeatReplicationUntilGetDone()
+ {
+ replication.Stub(x => x.AssignAllEmptySegments(Arg<NodeEndpoint>.Is.Anything, Arg<int[]>.Is.Anything))
+ .Return(new[] { 0 });
+ var request = new ExtendedPutRequest
+ {
+ Bytes = new byte[] { 1 },
+ Key = "a",
+ };
+ // Five pages report Done = false, the sixth reports Done = true,
+ // so Put must be invoked exactly six times.
+ for (int i = 0; i < 5; i++)
+ {
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ .Repeat.Once()
+ .Return(new ReplicationResult
+ {
+ PutRequests = new[] {request,},
+ RemoveRequests = new ExtendedRemoveRequest[0],
+ Done = false
+ });
+ }
+ replication.Stub(x => x.ReplicateNextPage(Arg<NodeEndpoint>.Is.Anything, Arg<int>.Is.Anything))
+ .Repeat.Once()
+ .Return(new ReplicationResult
+ {
+ PutRequests = new[] { request, },
+ RemoveRequests = new ExtendedRemoveRequest[0],
+ Done = true
+ });
+ var success = command.Execute();
+ Assert.True(success);
+
+ storage.AssertWasCalled(x => x.Put(guid, request), o => o.Repeat.Times(6));
+ }
+ }
+}
36 Rhino.DistributedHashTable.IntegrationTests/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Rhino.DistributedHashTable.IntegrationTests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyProduct("Rhino.DistributedHashTable.IntegrationTests")]
+[assembly: AssemblyCopyright("Copyright © Microsoft 2009")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("b32ec7b0-caf5-48d0-ab7d-650c12107937")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
88 ....DistributedHashTable.IntegrationTests/Rhino.DistributedHashTable.IntegrationTests.csproj
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.30729</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{9DE3DFC2-9F51-4339-9192-483C2B62CE01}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Rhino.DistributedHashTable.IntegrationTests</RootNamespace>
+ <AssemblyName>Rhino.DistributedHashTable.IntegrationTests</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Rhino.Mocks, Version=3.5.0.1337, Culture=neutral, PublicKeyToken=0b3305902db7183f, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\OSS\rhino-tools\build\net-3.5\debug\Rhino.Mocks.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ <Reference Include="xunit, Version=1.1.0.1323, Culture=neutral, PublicKeyToken=8d05b1bb7a6fdb6c, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\OSS\rhino-tools\SharedLibs\xUnit\xunit.dll</HintPath>
+ </Reference>
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="FullIntegrationTest.cs" />
+ <Compile Include="Mini\DistributedHashTableReplicationTest.cs" />
+ <Compile Include="Mini\DistributedHashTableStorageTest.cs" />
+ <Compile Include="Mini\EsentTestBase.cs" />
+ <Compile Include="Mini\OnlineRangeReplicationCommandTest.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="WhenNodeIsStartedWillJoinMaster.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\pht\Rhino.PersistentHashTable\Rhino.PersistentHashTable.csproj">
+ <Project>{F30B2D63-CED5-4C8A-908F-0B5503D984A9}</Project>
+ <Name>Rhino.PersistentHashTable</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Rhino.DistributedHashTable\Rhino.DistributedHashTable.csproj">
+ <Project>{4E8D44D2-505D-488C-B92C-51147748B104}</Project>
+ <Name>Rhino.DistributedHashTable</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="app.config" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
49 Rhino.DistributedHashTable.IntegrationTests/WhenNodeIsStartedWillJoinMaster.cs
@@ -0,0 +1,49 @@
+//using System;
+//using System.Linq;
+//using System.ServiceModel;
+//using Rhino.DistributedHashTable.Hosting;
+//using Rhino.DistributedHashTable.Internal;
+//using Xunit;
+
+//namespace Rhino.DistributedHashTable.IntegrationTests
+//{
+// public class WhenNodeIsStarted : FullIntegrationTest, IDisposable
+// {
+// private readonly DistributedHashTableMasterHost masterHost;
+// private readonly DistributedHashTableStorageHost storageHost;
+// private readonly Uri masterUri = new Uri("net.tcp://" + Environment.MachineName + ":2200/master");
+
+// public WhenNodeIsStarted()
+// {
+// masterHost = new DistributedHashTableMasterHost();
+// storageHost = new DistributedHashTableStorageHost(
+// masterUri);
+
+// masterHost.Start();
+// storageHost.Start();
+// }
+
+
+// [Fact]
+// public void WillJoinMaster()
+// {
+// var channel = ChannelFactory<IDistributedHashTableMaster>.CreateChannel(
+// Binding.DhtDefault,
+// new EndpointAddress(masterUri));
+// try
+// {
+// var topology = channel.GetTopology();
+// Assert.True(topology.Segments.All(x => x.AssignedEndpoint == storageHost.Endpoint));
+// }
+// finally
+// {
+// ((IClientChannel)channel).Dispose();
+// }
+// }
+
+// public void Dispose()
+// {
+// masterHost.Dispose();
+// }
+// }
+//}
15 Rhino.DistributedHashTable.IntegrationTests/app.config
@@ -0,0 +1,15 @@
+<configuration>
+ <system.diagnostics>
+ <sources>
+ <source name="System.ServiceModel"
+ switchValue="Information, ActivityTracing"
+ propagateActivity="true">
+ <listeners>
+ <add name="traceListener"
+ type="System.Diagnostics.XmlWriterTraceListener"
+ initializeData= "Traces.svclog" />
+ </listeners>
+ </source>
+ </sources>
+ </system.diagnostics>
+</configuration>
122 Rhino.DistributedHashTable.Tests/BackCopiesBehavior.cs
@@ -0,0 +1,122 @@
+using System.Net;
+using Rhino.DistributedHashTable.Internal;
+using Xunit;
+using System.Linq;
+
namespace Rhino.DistributedHashTable.Tests
{
	// Verifies how the master assigns backup copies of segments as nodes
	// join the topology: with n fully joined nodes, every segment ends up
	// with min(n - 1, 2) backup copies.
	public class BackCopiesBehavior
	{
		// Joins the endpoint to the master and immediately acknowledges every
		// segment it was handed, simulating a node that finished replicating.
		private static void JoinAndCatchUp(DistributedHashTableMaster master,
		                                   NodeEndpoint endpoint)
		{
			var assignedSegments = master.Join(endpoint);
			master.CaughtUp(endpoint, assignedSegments.Select(x => x.Index).ToArray());
		}

		public class OnEmptyMaster
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
			private readonly NodeEndpoint joiningNode = NodeEndpoint.ForTest(9);

			[Fact]
			public void AddingNewNodeResultInAllSegmentsHavingNoBackupCopies()
			{
				master.Join(joiningNode);

				// A lone node has no one to hold backups for it.
				Assert.True(master.Segments.All(segment => segment.Backups.Count == 0));
			}
		}

		public class OnMasterWithOneExistingNode
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
			private readonly NodeEndpoint joiningNode = NodeEndpoint.ForTest(9);

			public OnMasterWithOneExistingNode()
			{
				JoinAndCatchUp(master, NodeEndpoint.ForTest(3));
			}

			[Fact]
			public void AddingNewNodeResultInAllSegmentsHavingSingleBackupCopy()
			{
				JoinAndCatchUp(master, joiningNode);

				Assert.True(master.Segments.All(segment => segment.Backups.Count == 1));
			}

			[Fact]
			public void AddingNewNodeWillRaiseBackupChangedEvent()
			{
				var wasChanged = false;
				master.BackupChanged += (state, point, range) => wasChanged = true;

				JoinAndCatchUp(master, joiningNode);

				Assert.True(wasChanged);
			}
		}

		public class OnMasterWithTwoNodes
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
			private readonly NodeEndpoint joiningNode = NodeEndpoint.ForTest(9);

			public OnMasterWithTwoNodes()
			{
				JoinAndCatchUp(master, NodeEndpoint.ForTest(3));
				JoinAndCatchUp(master, NodeEndpoint.ForTest(10));
			}

			[Fact]
			public void AddingNewNodeResultInAllSegmentsHavingTwoBackupCopy()
			{
				JoinAndCatchUp(master, joiningNode);

				Assert.True(master.Segments.All(segment => segment.Backups.Count == 2));
			}
		}

		public class OnMasterWithThreeNodes
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();

			public OnMasterWithThreeNodes()
			{
				JoinAndCatchUp(master, NodeEndpoint.ForTest(3));
				JoinAndCatchUp(master, NodeEndpoint.ForTest(10));
				JoinAndCatchUp(master, NodeEndpoint.ForTest(9));
			}

			[Fact]
			public void AddingNewNodeResultInAllSegmentsHavingTwoBackupCopy()
			{
				// Two backups is the ceiling; a fourth node does not add a third.
				JoinAndCatchUp(master, NodeEndpoint.ForTest(7));

				Assert.True(master.Segments.All(segment => segment.Backups.Count == 2));
			}
		}
	}
}
33 Rhino.DistributedHashTable.Tests/MasterCaughtUpBehavior.cs
@@ -0,0 +1,33 @@
+using System.Linq;
+using System.Net;
+using Rhino.DistributedHashTable.Internal;
+using Xunit;
+
namespace Rhino.DistributedHashTable.Tests
{
	// Behavior of the master when a node reports that it finished
	// replicating a segment assigned to it.
	public class MasterCaughtUpBehavior
	{
		public class OnCaughtUp
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
			private readonly NodeEndpoint endPoint = NodeEndpoint.ForTest(9);

			[Fact]
			public void WillRaiseTopologyChangedEvent()
			{
				var assignedSegments = master.Join(endPoint);

				var wasCalled = false;
				master.TopologyChanged += () => wasCalled = true;

				master.CaughtUp(endPoint, assignedSegments.First().Index);

				Assert.True(wasCalled);
			}
		}
	}
}
36 Rhino.DistributedHashTable.Tests/MasterGaveUpBehavior.cs
@@ -0,0 +1,36 @@
+using System.Linq;
+using Rhino.DistributedHashTable.Internal;
+using Xunit;
+
namespace Rhino.DistributedHashTable.Tests
{
	// Behavior of the master when a joining node gives up on a segment that
	// was in the process of moving to it.
	public class MasterGaveUpBehavior
	{
		public class OnGaveUp
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
			private readonly NodeEndpoint endPoint = NodeEndpoint.ForTest(5);

			public OnGaveUp()
			{
				// Seed the master with one fully caught-up node so that the
				// next join leaves segments pending a move.
				var existing = NodeEndpoint.ForTest(9);
				var segments = master.Join(existing);
				master.CaughtUp(existing, segments.Select(x => x.Index).ToArray());
			}

			[Fact]
			public void WillRemoveThePendingMoveFromTheSegment()
			{
				var segment = master.Join(endPoint).First();
				Assert.NotNull(segment.InProcessOfMovingToEndpoint);

				master.GaveUp(endPoint, segment.Index);

				Assert.Null(segment.InProcessOfMovingToEndpoint);
			}
		}
	}
}
138 Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs
@@ -0,0 +1,138 @@
+using System.Linq;
+using System.Net;
+using Rhino.DistributedHashTable.Internal;
+using Xunit;
+
namespace Rhino.DistributedHashTable.Tests
{
	// Behavior of the master when endpoints join: the empty master, repeated
	// joins, and joins while other nodes are still in the process of moving.
	public class MasterJoinBehavior
	{
		public class OnEmptyMaster
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
			private readonly NodeEndpoint endPoint = NodeEndpoint.ForTest(0);

			[Fact]
			public void AllSegmentsAreDirectlyAllocatedToEndpoint()
			{
				master.Join(endPoint);

				// The first node owns everything outright.
				Assert.True(master.Segments.All(segment => segment.AssignedEndpoint == endPoint));
			}
		}

		public class JoiningTwice
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
			private readonly NodeEndpoint endPoint = NodeEndpoint.ForTest(0);

			[Fact]
			public void IsNoOpp()
			{
				var firstJoin = master.Join(endPoint);
				var secondJoin = master.Join(endPoint);

				// Re-joining yields the same set of segments.
				Assert.Equal(firstJoin, secondJoin);
			}
		}

		public class NewEndpointJoiningNonEmptyMaster
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
			private readonly NodeEndpoint existingNode = NodeEndpoint.ForTest(0);
			private readonly NodeEndpoint newEndpoint = NodeEndpoint.ForTest(1);

			public NewEndpointJoiningNonEmptyMaster()
			{
				master.Join(existingNode);
				master.Join(newEndpoint);
			}

			[Fact]
			public void SegmentAssignmentsWillNotChange()
			{
				// The new node has not caught up yet, so nothing is reassigned.
				Assert.True(master.Segments.All(segment => segment.AssignedEndpoint == existingNode));
			}

			[Fact]
			public void WillNotChangeTotalNumberOfSegments()
			{
				Assert.Equal(8192, master.Segments.Count());
			}

			[Fact]
			public void HalfOfTheSegmentsWillBeInTheProcessOfAssigningToNewEndpoint()
			{
				Assert.Equal(master.Segments.Count() / 2,
				             master.Segments.Count(segment => segment.InProcessOfMovingToEndpoint == newEndpoint));
			}
		}

		public class NewEndpointJoiningMasterWhenAnotherJoinIsInTheProcessOfJoining
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
			private readonly NodeEndpoint existingNode = NodeEndpoint.ForTest(0);
			private readonly NodeEndpoint newEndpoint = NodeEndpoint.ForTest(1);
			private readonly NodeEndpoint anotherNodeInTheProcessOfJoining = NodeEndpoint.ForTest(2);

			public NewEndpointJoiningMasterWhenAnotherJoinIsInTheProcessOfJoining()
			{
				master.Join(existingNode);
				master.Join(anotherNodeInTheProcessOfJoining);
				master.Join(newEndpoint);
			}

			[Fact]
			public void SegmentAssignmentsWillNotChange()
			{
				Assert.True(master.Segments.All(segment => segment.AssignedEndpoint == existingNode));
			}

			[Fact]
			public void ThirdOfTheAvailableSegmentsWillBeReservedForTheNewNode()
			{
				// 4096 segments are not already moving; a third of those
				// (integer division: 4096 / 3 = 1365) go to the new node.
				Assert.Equal(1365, master.Segments.Count(segment => segment.InProcessOfMovingToEndpoint == newEndpoint));
			}

			[Fact]
			public void WillNotAffectJoiningOfExistingNode()
			{
				// The earlier joiner keeps its half (8192 / 2 = 4096) pending.
				Assert.Equal(4096, master.Segments.Count(segment => segment.InProcessOfMovingToEndpoint == anotherNodeInTheProcessOfJoining));
			}
		}

		public class NewEndpointJoiningMasterWithTwoNodes
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
			private readonly NodeEndpoint existingNode = NodeEndpoint.ForTest(0);
			private readonly NodeEndpoint newEndpoint = NodeEndpoint.ForTest(1);
			private readonly NodeEndpoint anotherNodeInTheMaster = NodeEndpoint.ForTest(2);

			public NewEndpointJoiningMasterWithTwoNodes()
			{
				master.Join(existingNode);
				var segments = master.Join(anotherNodeInTheMaster);
				master.CaughtUp(anotherNodeInTheMaster, segments.Select(x => x.Index).ToArray());
				master.Join(newEndpoint);
			}

			[Fact]
			public void WillNotChangeTotalNumberOfSegments()
			{
				Assert.Equal(8192, master.Segments.Count());
			}

			[Fact]
			public void SegmentAssignmentsWillNotChange()
			{
				Assert.False(master.Segments.Any(segment => segment.AssignedEndpoint == newEndpoint));
			}

			[Fact]
			public void ThirdOfTheAvailableSegmentsWillBeAssignedToNewNode()
			{
				// With three participants, the newcomer is owed a third of all
				// segments (integer division: 8192 / 3 = 2730).
				Assert.Equal(2730, master.Segments.Count(segment => segment.InProcessOfMovingToEndpoint == newEndpoint));
			}
		}
	}
}
125 Rhino.DistributedHashTable.Tests/NodeReplicationBehavior.cs
@@ -0,0 +1,125 @@
+using System.Net;
+using Rhino.DistributedHashTable.Internal;
+using Rhino.DistributedHashTable.Parameters;
+using Rhino.DistributedHashTable.Remote;
+using Rhino.Mocks;
+using Rhino.Queues;
+using Xunit;
+
namespace Rhino.DistributedHashTable.Tests
{
	// Behavior of a node when replicating requests to the segment owner and
	// to the other backup nodes, and when it finishes replicating segments.
	public class NodeReplicationBehavior
	{
		public class WhenFinishedReplicatingSegment
		{
			private readonly DistributedHashTableNode node;
			private readonly IDistributedHashTableMaster master;
			private readonly IExecuter executer;
			private readonly NodeEndpoint endPoint;

			public WhenFinishedReplicatingSegment()
			{
				master = MockRepository.GenerateStub<IDistributedHashTableMaster>();
				executer = MockRepository.GenerateStub<IExecuter>();
				endPoint = NodeEndpoint.ForTest(1);
				// Join hands out no segments; the node only reports segment 0
				// as done via DoneReplicatingSegments in each test.
				master.Stub(x => x.Join(Arg.Is(endPoint)))
					.Return(new Segment[0]);
				node = new DistributedHashTableNode(master, executer, new BinaryMessageSerializer(), endPoint,
				                                    MockRepository.GenerateStub<IQueueManager>(),
				                                    MockRepository.GenerateStub<IDistributedHashTableNodeReplicationFactory>());
			}

			[Fact]
			public void StateWillBeStarted()
			{
				node.DoneReplicatingSegments(new[] { 0 });

				Assert.Equal(NodeState.Started, node.State);
			}

			[Fact]
			public void WillLetMasterKnowItCaughtUp()
			{
				// (Removed an unused Segment local the original test created.)
				node.DoneReplicatingSegments(new[] { 0 });

				master.AssertWasCalled(x => x.CaughtUp(node.Endpoint, 0));
			}
		}

		public class WhenReplicatingRequestToOwner
		{
			private readonly DistributedHashTableNode node;
			private readonly IDistributedHashTableMaster master;
			private readonly IExecuter executer;
			private readonly NodeEndpoint endPoint;
			private readonly IQueueManager queueManager;
			private readonly Topology topology;
			private readonly NodeEndpoint backup1;
			private readonly NodeEndpoint backup2;

			public WhenReplicatingRequestToOwner()
			{
				master = MockRepository.GenerateStub<IDistributedHashTableMaster>();
				endPoint = NodeEndpoint.ForTest(1);
				backup1 = NodeEndpoint.ForTest(2);
				backup2 = NodeEndpoint.ForTest(3);
				// Segment 0: owned by this node, backed up by backup1 & backup2.
				// Segment 1: owned by backup1, backed up by this node & backup2.
				topology = new Topology(new[]
				{
					new Segment
					{
						Index = 0,
						AssignedEndpoint = endPoint,
						Backups =
						{
							backup1,
							backup2,
						}
					},
					new Segment
					{
						Index = 1,
						AssignedEndpoint = backup1,
						Backups =
						{
							endPoint,
							backup2,
						}
					},
				});
				master.Stub(x => x.GetTopology()).Return(topology);
				executer = MockRepository.GenerateStub<IExecuter>();
				master.Stub(x => x.Join(Arg.Is(endPoint)))
					.Return(new Segment[0]);
				queueManager = MockRepository.GenerateStub<IQueueManager>();

				node = new DistributedHashTableNode(master, executer, new BinaryMessageSerializer(), endPoint,
				                                    queueManager, MockRepository.GenerateStub<IDistributedHashTableNodeReplicationFactory>());
				node.Start();
			}

			[Fact]
			public void WhenSendingToOwnerWillSendItToOwnerUri()
			{
				var request = new ExtendedPutRequest();
				node.SendToOwner(0, new[] { request });

				// The original test invoked Send directly on the stub instead
				// of asserting that the node made the call, so it verified
				// nothing; AssertWasCalled is the actual verification.
				queueManager.AssertWasCalled(x => x.Send(
					Arg.Is(endPoint.Async),
					Arg<MessagePayload>.Is.TypeOf));
			}

			[Fact]
			public void WhenSendingToOtherBackupsFromOwner()
			{
				var request = new ExtendedPutRequest();
				node.SendToAllOtherBackups(0, new[] { request });

				queueManager.AssertWasCalled(x => x.Send(
					Arg.Is(backup1.Async),
					Arg<MessagePayload>.Is.TypeOf));
				queueManager.AssertWasCalled(x => x.Send(
					Arg.Is(backup2.Async),
					Arg<MessagePayload>.Is.TypeOf));
			}

			[Fact]
			public void WhenSendingToOtherBackupsFromBackupNode()
			{
				var request = new ExtendedPutRequest();
				node.SendToAllOtherBackups(1, new[] { request });

				// For segment 1 the sends go to the segment's backup replicas
				// (endPoint and backup2), not to its owner (backup1).
				queueManager.AssertWasCalled(x => x.Send(
					Arg.Is(endPoint.Async),
					Arg<MessagePayload>.Is.TypeOf));
				queueManager.AssertWasCalled(x => x.Send(
					Arg.Is(backup2.Async),
					Arg<MessagePayload>.Is.TypeOf));
			}
		}
	}
}
104 Rhino.DistributedHashTable.Tests/NodeStartupBehavior.cs
@@ -0,0 +1,104 @@
+using System.Net;
+using Rhino.DistributedHashTable.Commands;
+using Rhino.DistributedHashTable.Internal;
+using Rhino.DistributedHashTable.Remote;
+using Rhino.Mocks;
+using Rhino.Queues;
+using Xunit;
+
namespace Rhino.DistributedHashTable.Tests
{
	// Behavior of a node at startup: joining the master and reacting to the
	// segments the master hands back.
	public class NodeStartupBehavior
	{
		public class JoiningMaster
		{
			private readonly DistributedHashTableNode node;
			private readonly IDistributedHashTableMaster master;
			private readonly IExecuter executer;
			private readonly NodeEndpoint endPoint;

			public JoiningMaster()
			{
				master = MockRepository.GenerateStub<IDistributedHashTableMaster>();
				executer = MockRepository.GenerateStub<IExecuter>();
				endPoint = NodeEndpoint.ForTest(1);
				master.Stub(x => x.Join(Arg.Is(endPoint)))
					.Return(new Segment[0]);
				node = new DistributedHashTableNode(master, executer, new BinaryMessageSerializer(), endPoint,
				                                    MockRepository.GenerateStub<IQueueManager>(),
				                                    MockRepository.GenerateStub<IDistributedHashTableNodeReplicationFactory>());
			}

			[Fact]
			public void ShouldJoinMasterOnStart()
			{
				node.Start();

				master.AssertWasCalled(x => x.Join(node.Endpoint));
			}

			[Fact]
			public void BeforeStartWouldBeInStateNotStarted()
			{
				Assert.Equal(NodeState.NotStarted, node.State);
			}
		}

		public class HandlingAssignedSegments
		{
			private readonly DistributedHashTableNode node;
			private readonly IDistributedHashTableMaster master;
			private readonly IExecuter executer;
			private readonly NodeEndpoint endPoint;

			public HandlingAssignedSegments()
			{
				master = MockRepository.GenerateStub<IDistributedHashTableMaster>();
				executer = MockRepository.GenerateStub<IExecuter>();
				endPoint = NodeEndpoint.ForTest(1);
				node = new DistributedHashTableNode(master, executer, new BinaryMessageSerializer(), endPoint,
				                                    MockRepository.GenerateStub<IQueueManager>(),
				                                    MockRepository.GenerateStub<IDistributedHashTableNodeReplicationFactory>());
			}

			// Stubs the master so that joining returns exactly these segments.
			private void StubJoinToReturn(params Segment[] segments)
			{
				master.Stub(x => x.Join(Arg.Is(endPoint)))
					.Return(segments);
			}

			[Fact]
			public void WhenSegmentsAssignedToNodeStateWillBeStarted()
			{
				StubJoinToReturn(new Segment { AssignedEndpoint = endPoint });

				node.Start();

				Assert.Equal(NodeState.Started, node.State);
			}

			[Fact]
			public void WhenNoSegmentIsAssignedToNodeStateWillBeStarting()
			{
				// Segments owned by another endpoint must be replicated first.
				StubJoinToReturn(new Segment { AssignedEndpoint = NodeEndpoint.ForTest(9) });

				node.Start();

				Assert.Equal(NodeState.Starting, node.State);
			}

			[Fact]
			public void WillRegisterSegmentsNotAssignedToMeForReplication()
			{
				StubJoinToReturn(new Segment { AssignedEndpoint = NodeEndpoint.ForTest(9) });

				node.Start();

				executer.AssertWasCalled(x => x.RegisterForExecution(Arg<OnlineSegmentReplicationCommand>.Is.TypeOf));
			}
		}
	}
}
10 Rhino.DistributedHashTable.Tests/Program.cs
@@ -0,0 +1,10 @@
namespace Rhino.DistributedHashTable.Tests
{
	// Console entry point for running a single test case directly (e.g.
	// under a debugger) without going through the xUnit runner.
	public class Program
	{
		private static void Main(string[] args)
		{
			var fixture = new MasterJoinBehavior.NewEndpointJoiningMasterWithTwoNodes();
			fixture.SegmentAssignmentsWillNotChange();
		}
	}
}
36 Rhino.DistributedHashTable.Tests/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Rhino.DistributedHashTable.Tests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyProduct("Rhino.DistributedHashTable.Tests")]
+[assembly: AssemblyCopyright("Copyright © Microsoft 2009")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("ebcd123a-9329-443b-b42a-bacc4ba96620")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
21 Rhino.DistributedHashTable.Tests/RangesBehavior.cs
@@ -0,0 +1,21 @@
+using System.Net;
+using Rhino.DistributedHashTable.Internal;
+using Xunit;
+using System.Linq;
+
namespace Rhino.DistributedHashTable.Tests
{
	// Behavior of the segment space created by a brand new master.
	public class SegmentsBehavior
	{
		public class WhenMasterCreatesSegment
		{
			private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();

			[Fact]
			public void ThereShouldBe8192Segments()
			{
				// The master always carves the key space into 8192 (2^13)
				// segments, matching the count asserted in MasterJoinBehavior.
				Assert.Equal(8192, master.Segments.Count());
			}
		}
	}
}
90 Rhino.DistributedHashTable.Tests/Rhino.DistributedHashTable.Tests.csproj
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.30729</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{C44B2EA9-28FD-4F05-B43B-A45231EE0FDC}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Rhino.DistributedHashTable.Tests</RootNamespace>
+ <AssemblyName>Rhino.DistributedHashTable.Tests</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <StartupObject>
+ </StartupObject>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Rhino.Mocks, Version=3.5.0.1337, Culture=neutral, PublicKeyToken=0b3305902db7183f, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\OSS\rhino-tools\build\net-3.5\debug\Rhino.Mocks.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ <Reference Include="xunit, Version=1.1.0.1323, Culture=neutral, PublicKeyToken=8d05b1bb7a6fdb6c, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\OSS\rhino-tools\SharedLibs\xUnit\xunit.dll</HintPath>
+ </Reference>
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="BackCopiesBehavior.cs" />
+ <Compile Include="MasterCaughtUpBehavior.cs" />
+ <Compile Include="MasterGaveUpBehavior.cs" />
+ <Compile Include="MasterJoinBehavior.cs" />
+ <Compile Include="NodeReplicationBehavior.cs" />
+ <Compile Include="NodeStartupBehavior.cs" />
+ <Compile Include="Program.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="RangesBehavior.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\pht\Rhino.PersistentHashTable\Rhino.PersistentHashTable.csproj">
+ <Project>{F30B2D63-CED5-4C8A-908F-0B5503D984A9}</Project>
+ <Name>Rhino.PersistentHashTable</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\..\queues\Rhino.Queues\Rhino.Queues.csproj">
+ <Project>{398BF580-41F5-418E-A017-19D19B289A97}</Project>
+ <Name>Rhino.Queues</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Rhino.DistributedHashTable\Rhino.DistributedHashTable.csproj">
+ <Project>{4E8D44D2-505D-488C-B92C-51147748B104}</Project>
+ <Name>Rhino.DistributedHashTable</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
56 Rhino.DistributedHashTable.sln
@@ -0,0 +1,56 @@
+
+Microsoft Visual Studio Solution File, Format Version 10.00
+# Visual Studio 2008
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.DistributedHashTable", "Rhino.DistributedHashTable\Rhino.DistributedHashTable.csproj", "{4E8D44D2-505D-488C-B92C-51147748B104}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.DistributedHashTable.Tests", "Rhino.DistributedHashTable.Tests\Rhino.DistributedHashTable.Tests.csproj", "{C44B2EA9-28FD-4F05-B43B-A45231EE0FDC}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.DistributedHashTable.IntegrationTests", "Rhino.DistributedHashTable.IntegrationTests\Rhino.DistributedHashTable.IntegrationTests.csproj", "{9DE3DFC2-9F51-4339-9192-483C2B62CE01}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.PersistentHashTable", "..\pht\Rhino.PersistentHashTable\Rhino.PersistentHashTable.csproj", "{F30B2D63-CED5-4C8A-908F-0B5503D984A9}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.PersistentHashTable.Tests", "..\pht\Rhino.PersistentHashTable.Tests\Rhino.PersistentHashTable.Tests.csproj", "{3BC9E44C-6E3B-4E3C-B7E1-4850C3DB5251}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.Queues", "..\queues\Rhino.Queues\Rhino.Queues.csproj", "{398BF580-41F5-418E-A017-19D19B289A97}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.Queues.Tests", "..\queues\Rhino.Queues.Tests\Rhino.Queues.Tests.csproj", "{98D394CA-9FEF-49AD-8DAB-FE039152E001}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {4E8D44D2-505D-488C-B92C-51147748B104}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {4E8D44D2-505D-488C-B92C-51147748B104}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {4E8D44D2-505D-488C-B92C-51147748B104}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {4E8D44D2-505D-488C-B92C-51147748B104}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C44B2EA9-28FD-4F05-B43B-A45231EE0FDC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C44B2EA9-28FD-4F05-B43B-A45231EE0FDC}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C44B2EA9-28FD-4F05-B43B-A45231EE0FDC}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C44B2EA9-28FD-4F05-B43B-A45231EE0FDC}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9DE3DFC2-9F51-4339-9192-483C2B62CE01}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9DE3DFC2-9F51-4339-9192-483C2B62CE01}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9DE3DFC2-9F51-4339-9192-483C2B62CE01}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9DE3DFC2-9F51-4339-9192-483C2B62CE01}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F30B2D63-CED5-4C8A-908F-0B5503D984A9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F30B2D63-CED5-4C8A-908F-0B5503D984A9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F30B2D63-CED5-4C8A-908F-0B5503D984A9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F30B2D63-CED5-4C8A-908F-0B5503D984A9}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3BC9E44C-6E3B-4E3C-B7E1-4850C3DB5251}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3BC9E44C-6E3B-4E3C-B7E1-4850C3DB5251}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3BC9E44C-6E3B-4E3C-B7E1-4850C3DB5251}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3BC9E44C-6E3B-4E3C-B7E1-4850C3DB5251}.Release|Any CPU.Build.0 = Release|Any CPU
+ {398BF580-41F5-418E-A017-19D19B289A97}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {398BF580-41F5-418E-A017-19D19B289A97}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {398BF580-41F5-418E-A017-19D19B289A97}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {398BF580-41F5-418E-A017-19D19B289A97}.Release|Any CPU.Build.0 = Release|Any CPU
+ {98D394CA-9FEF-49AD-8DAB-FE039152E001}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {98D394CA-9FEF-49AD-8DAB-FE039152E001}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {98D394CA-9FEF-49AD-8DAB-FE039152E001}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {98D394CA-9FEF-49AD-8DAB-FE039152E001}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+EndGlobal
3 Rhino.DistributedHashTable/Assumptions.txt
@@ -0,0 +1,3 @@
+* Replication & failures
+ ======================
+ How do we handle online replication errors?
7 Rhino.DistributedHashTable/Commands/ICommand.cs
@@ -0,0 +1,7 @@
namespace Rhino.DistributedHashTable.Commands
{
	/// <summary>
	/// A unit of background work that can be scheduled for execution
	/// (e.g. <c>OnlineSegmentReplicationCommand</c>, registered with an
	/// <c>IExecuter</c> when a node starts up).
	/// </summary>
	public interface ICommand
	{
		/// <summary>
		/// Runs the command.
		/// </summary>
		/// <returns>
		/// true when the command completed its work successfully;
		/// false when it failed or only partially completed.
		/// </returns>
		bool Execute();
	}
}
158 Rhino.DistributedHashTable/Commands/OnlineRangeReplicationCommand.cs
@@ -0,0 +1,158 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using log4net;
+using Rhino.DistributedHashTable.Internal;
+using Rhino.DistributedHashTable.Remote;
+using Rhino.DistributedHashTable.Util;
+
namespace Rhino.DistributedHashTable.Commands
{
	/// <summary>
	/// Replicates a set of segments from another node onto the local node.
	/// Empty segments are taken over in bulk first; segments that hold data
	/// are then pulled page by page. Any segment that cannot be replicated
	/// is reported back to the node via GivingUpOn.
	/// </summary>
	public class OnlineSegmentReplicationCommand : ICommand
	{
		private readonly ILog log = LogManager.GetLogger(typeof(OnlineSegmentReplicationCommand));

		private readonly IDistributedHashTableNode node;
		private readonly IDistributedHashTableNodeReplication otherNode;
		// Kept only as a display string (the source's sync endpoint) for logging.
		private readonly string endpoint;
		private readonly Segment[] ranges;

		/// <summary>
		/// Creates the command.
		/// </summary>
		/// <param name="endpoint">The endpoint we replicate from (used for log messages).</param>
		/// <param name="ranges">The segments to replicate onto the local node.</param>
		/// <param name="node">The local node receiving the data.</param>
		/// <param name="otherNode">Replication interface of the node that currently holds the data.</param>
		public OnlineSegmentReplicationCommand(
			NodeEndpoint endpoint,
			Segment[] ranges,
			IDistributedHashTableNode node,
			IDistributedHashTableNodeReplication otherNode)
		{
			this.endpoint = endpoint.Sync.ToString();
			this.ranges = ranges;
			this.node = node;
			this.otherNode = otherNode;
		}

		/// <summary>
		/// Executes the replication.
		/// </summary>
		/// <returns>
		/// true when every segment was replicated successfully; false when
		/// any segment failed or an unexpected error occurred.
		/// </returns>
		public bool Execute()
		{
			if (log.IsDebugEnabled)
			{
				var sb = new StringBuilder("Replicating from ")
					.Append(endpoint)
					.AppendLine(" the following ranges:");

				foreach (var range in ranges)
				{
					sb.Append("\t").Append(range).AppendLine();
				}
				log.Debug(sb);
			}
			// Indices of segments that were handled (successfully OR explicitly
			// given up on); anything missing from this list at the end is
			// reported via GivingUpOn in the finally block below.
			var processedSegments = new List<int>();

			try
			{
				var rangesToLoad = AssignAllEmptySegmentsFromEndpoint();

				// ProcessSegmentsWithData returns true when *some* segment
				// failed, so success is its negation.
				return ProcessSegmentsWithData(rangesToLoad, processedSegments) == false;
			}
			catch (Exception e)
			{
				log.Warn("Could not replicate ranges", e);
				return false;
			}
			finally
			{
				// Whatever was not processed (e.g. after the failure cutoff in
				// ProcessSegmentsWithData, or after an exception) is given up on
				// so the master can reassign it.
				if (processedSegments.Count != ranges.Length)
				{
					try
					{
						node.GivingUpOn(ranges.Select(x => x.Index).Except(processedSegments).ToArray());
					}
					catch (Exception e)
					{
						log.Error("Could not tell node that we are giving up on values", e);
					}
				}
			}
		}

		/// <summary>
		/// Replicates each non-empty segment, recording every segment that was
		/// handled in <paramref name="processedSegments"/>.
		/// </summary>
		/// <returns>true when at least one segment failed to replicate.</returns>
		private bool ProcessSegmentsWithData(IEnumerable<Segment> rangesToLoad,
			ICollection<int> processedSegments)
		{
			var someFailed = false;
			int numberOfFailures = 0;
			foreach (var range in rangesToLoad)
			{
				try
				{
					ReplicateSegment(range);
					processedSegments.Add(range.Index);
				}
				catch (Exception e)
				{
					log.Error("Failed to replicate range " + range, e);
					numberOfFailures += 1;
					// After more than 5 failures, stop trying; the remaining
					// (unprocessed) segments are given up on by Execute's
					// finally block rather than here.
					if (numberOfFailures > 5)
					{
						log.WarnFormat("Failed to replicate {0} times, giving up on all additional ranges",
						               numberOfFailures);
						break;
					}
					// Give up on just this segment and keep going with the rest.
					node.GivingUpOn(range.Index);
					processedSegments.Add(range.Index);
					someFailed |= true;
				}
			}
			return someFailed;
		}

		/// <summary>
		/// Pulls one segment's data from the other node page by page, applying
		/// puts and removals locally, until the other node reports it is done.
		/// </summary>
		private void ReplicateSegment(Segment range)
		{
			while (true)
			{
				log.DebugFormat("Starting replication of range [{0}] from {1}",
				                range,
				                endpoint);

				var result = otherNode.ReplicateNextPage(node.Endpoint, range.Index);
				log.DebugFormat("Replication of range [{0}] from {1} got {2} puts & {3} removals",
				                range,
				                endpoint,
				                result.PutRequests.Length,
				                result.RemoveRequests.Length);

				if (result.PutRequests.Length != 0)
					node.Storage.Put(node.GetTopologyVersion(), result.PutRequests);

				if (result.RemoveRequests.Length != 0)
					node.Storage.Remove(node.GetTopologyVersion(), result.RemoveRequests);

				if (result.Done)
					break;
			}
			node.DoneReplicatingSegments(range.Index);
		}

		/// <summary>
		/// Asks the other node (in pages of 500) to hand over every segment
		/// that holds no data; those are marked done immediately.
		/// </summary>
		/// <returns>The segments that actually contain data and still need page-by-page replication.</returns>
		private List<Segment> AssignAllEmptySegmentsFromEndpoint()
		{
			var remainingSegments = new List<Segment>();
			foreach (var pagedSegment in ranges.Page(500))
			{
				var assignedSegments = otherNode.AssignAllEmptySegments(
					node.Endpoint,
					pagedSegment.Select(x=>x.Index).ToArray());
				node.DoneReplicatingSegments(assignedSegments);
				if (log.IsDebugEnabled)
				{
					var sb = new StringBuilder("The following empty ranges has been assigned from ")
						.Append(endpoint).AppendLine(":")
						.Append(" [")
						.Append(string.Join(", ", assignedSegments.Select(x => x.ToString()).ToArray()))
						.Append("]");
					log.Debug(sb);
				}
				remainingSegments.AddRange(
					pagedSegment.Where(x => assignedSegments.Contains(x.Index) == false)
					);
			}
			return remainingSegments;
		}
	}
}
87 Rhino.DistributedHashTable/Commands/RearrangeBackups.cs
@@ -0,0 +1,87 @@
+using System.Collections.Generic;
+using System.Linq;
+using Rhino.DistributedHashTable.Internal;
+
+namespace Rhino.DistributedHashTable.Commands
+{
+ public class RearrangeBackups
+ {
+ private readonly IEnumerable<NodeEndpoint> endPoints;
+ private readonly int fairDistribution;
+ private readonly int numberOfBackCopiesToKeep;
+ private readonly IEnumerable<Segment> ranges;
+ private readonly List<BackupCount> currentDistribution;
+
+ public ICollection<BackUpAdded> Changed = new List<BackUpAdded>();
+
+ public RearrangeBackups(IEnumerable<Internal.Segment> ranges,
+ IEnumerable<NodeEndpoint> endPoints,
+ int numberOfBackCopiesToKeep)
+ {
+ this.ranges = ranges;
+ this.endPoints = endPoints;
+ this.numberOfBackCopiesToKeep = numberOfBackCopiesToKeep;
+ fairDistribution = (ranges.Count() * numberOfBackCopiesToKeep) / endPoints.Count() + 1;
+ currentDistribution = PrepareDistributions();
+ }
+
+ public bool Rearranging()
+ {
+ foreach (var range in ranges.Where(x => x.Backups.Count < numberOfBackCopiesToKeep))
+ {
+ var endPointsToAddToBackups = currentDistribution
+ .Where(
+ x => range.AssignedEndpoint != x.Endpoint &&
+ range.InProcessOfMovingToEndpoint != x.Endpoint &&
+ x.Count < fairDistribution &&
+ range.Backups.Contains(x.Endpoint) == false
+ )
+ .Take(numberOfBackCopiesToKeep - range.Backups.Count);
+
+ foreach (var endPointToAddToBackups in endPointsToAddToBackups)
+ {
+ Changed.Add(new BackUpAdded
+ {
+ Endpoint = endPointToAddToBackups.Endpoint,
+ Segment = range
+ });
+ range.Backups.Add(endPointToAddToBackups.Endpoint);
+ endPointToAddToBackups.Count += 1;
+ }
+ }
+ return Changed.Count > 0;
+ }
+
+ private List<BackupCount> PrepareDistributions()
+ {
+ var currentDistribution = (
+ from backup in ranges.SelectMany(x => x.Backups)
+ group backup by backup
+ into g
+ select new BackupCount { Endpoint = g.Key, Count = g.Count() }
+ ).ToList();
+
+ foreach (var endPointThatHasNoBackups in endPoints.Except(currentDistribution.Select(x => x.Endpoint)))
+ {
+ currentDistribution.Add(new BackupCount
+ {
+ Count = 0,
+ Endpoint = endPointThatHasNoBackups
+ });
+ }
+ return currentDistribution;
+ }
+
+ private class BackupCount
+ {
+ public int Count;
+ public NodeEndpoint Endpoint;
+ }
+
+ public class BackUpAdded
+ {
+ public Segment Segment;
+ public NodeEndpoint Endpoint;
+ }
+ }
+}
38 Rhino.DistributedHashTable/Exceptions/SeeOtherException.cs
@@ -0,0 +1,38 @@
+using System;
+using System.Runtime.Serialization;
+using Rhino.DistributedHashTable.Internal;
+
+namespace Rhino.DistributedHashTable.Exceptions
+{
+ [Serializable]
+ public class SeeOtherException : Exception
+ {
+ //
+ // For guidelines regarding the creation of new exception types, see
+ // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpgenref/html/cpconerrorraisinghandlingguidelines.asp
+ // and
+ // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dncscol/html/csharp07192001.asp
+ //
+
+ public Internal.NodeEndpoint Endpoint { get; set; }
+
+ public SeeOtherException()
+ {
+ }
+
+ public SeeOtherException(string message) : base(message)
+ {
+ }
+
+ public SeeOtherException(string message,
+ Exception inner) : base(message, inner)
+ {
+ }
+
+ protected SeeOtherException(
+ SerializationInfo info,
+ StreamingContext context) : base(info, context)
+ {
+ }
+ }
+}
35 Rhino.DistributedHashTable/Exceptions/TopologyVersionDoesNotMatchException.cs
@@ -0,0 +1,35 @@
+using System;
+using System.Runtime.Serialization;
+
+namespace Rhino.DistributedHashTable.Exceptions
+{
+ [Serializable]
+ public class TopologyVersionDoesNotMatchException : Exception
+ {
+ //
+ // For guidelines regarding the creation of new exception types, see
+ // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpgenref/html/cpconerrorraisinghandlingguidelines.asp
+ // and
+ // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dncscol/html/csharp07192001.asp
+ //
+
+ public TopologyVersionDoesNotMatchException()
+ {
+ }
+
+ public TopologyVersionDoesNotMatchException(string message) : base(message)
+ {
+ }
+
+ public TopologyVersionDoesNotMatchException(string message,
+ Exception inner) : base(message, inner)
+ {
+ }
+
+ protected TopologyVersionDoesNotMatchException(
+ SerializationInfo info,
+ StreamingContext context) : base(info, context)
+ {
+ }
+ }
+}
148 Rhino.DistributedHashTable/Hosting/DistributedHashTableMasterHost.cs
@@ -0,0 +1,148 @@
using System;
using System.Net;
using System.Net.Sockets;
using Google.ProtocolBuffers;
using Rhino.DistributedHashTable.Internal;
using Rhino.DistributedHashTable.Protocol;
using NodeEndpoint=Rhino.DistributedHashTable.Internal.NodeEndpoint;
using Segment=Rhino.DistributedHashTable.Protocol.Segment;
+
+namespace Rhino.DistributedHashTable.Hosting
+{
+ public class DistributedHashTableMasterHost : IDisposable
+ {
+ private readonly TcpListener listener;
+ private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
+
+ public DistributedHashTableMasterHost()
+ : this("master", 2200)
+ {
+ }
+
+ public DistributedHashTableMasterHost(string name,
+ int port)
+ {
+ }
+
+ public void Dispose()
+ {
+ listener.Stop();
+ }
+
+ public void Start()
+ {
+ listener.Start();
+ listener.BeginAcceptTcpClient(OnAcceptTcpClient, null);
+ }
+
+ private void OnAcceptTcpClient(IAsyncResult result)
+ {
+ TcpClient client;
+ try
+ {
+ client = listener.EndAcceptTcpClient(result);
+ }
+ catch (ObjectDisposedException)
+ {
+ return;
+ }
+
+ //this is done intentionally in a single threaded fashion
+ //the master is not a hot spot and it drastically simplify our life
+ //to avoid having to do multi threaded stuff here
+ //all calls to the master are also very short
+
+ try
+ {
+ using (client)
+ using (var stream = client.GetStream())
+ {
+ var writer = new MessageStreamWriter<MessageWrapper>(stream);
+ foreach (var wrapper in MessageStreamIterator<MessageWrapper>.FromStreamProvider(() => stream))
+ {
+ switch (wrapper.Type)
+ {
+ case MessageType.GetTopologyRequest:
+ HandleGetToplogy(stream, writer);
+ break;
+ case MessageType.JoinRequest:
+ HandleJoin(wrapper, writer);
+ break;
+ default:
+ throw new ArgumentOutOfRangeException();
+ }
+ }
+ writer.Flush();
+ stream.Flush();
+ }
+ }
+ finally
+ {
+ listener.BeginAcceptTcpClient(OnAcceptTcpClient, null);
+ }
+ }
+
+ private void HandleJoin(MessageWrapper wrapper,
+ MessageStreamWriter<MessageWrapper> writer)
+ {
+ var endpoint = wrapper.JoinRequest.EndpointJoining;
+ var segments = master.Join(new NodeEndpoint
+ {
+ Async = new Uri(endpoint.Async),
+ Sync = new Uri(endpoint.Sync)
+ });
+ var joinResponse = new JoinResponseMessage.Builder();
+ foreach (var segment in segments)
+ {
+ joinResponse.SegmentsList.Add(ConverToProtocolSegment(segment));
+ }
+ writer.Write(new MessageWrapper.Builder
+ {
+ Type = MessageType.JoinResult,
+ JoinResponse = joinResponse.Build()
+ }.Build());
+ }
+
+ private void HandleGetToplogy(NetworkStream stream,
+ MessageStreamWriter<MessageWrapper> writer)
+ {
+ var topology = master.GetTopology();
+ var topologyResultMessage = new TopologyResultMessage.Builder
+ {
+ Version = ByteString.CopyFrom(topology.Version.ToByteArray()),
+ TimestampAsDouble = topology.Timestamp.ToOADate(),
+ };
+ foreach (var segment in topology.Segments)
+ {
+ topologyResultMessage.SegmentsList.Add(ConverToProtocolSegment(segment));
+ }
+ writer.Write(new MessageWrapper.Builder
+ {
+ Type = MessageType.GetTopologyResult,
+ Topology = topologyResultMessage.Build()
+ }.Build());
+ }
+
+ private static Segment ConverToProtocolSegment(Internal.Segment segment)
+ {
+ return new Segment.Builder
+ {
+ Index = segment.Index,
+ AssignedEndpoint = new Protocol.NodeEndpoint.Builder
+ {
+ Async = segment.AssignedEndpoint.Async.ToString(),
+ Sync = segment.AssignedEndpoint.Sync.ToString()
+ }.Build(),
+ InProcessOfMovingToEndpoint = segment.InProcessOfMovingToEndpoint == null
+ ?
+ Protocol.NodeEndpoint.DefaultInstance
+ :
+ new Protocol.NodeEndpoint.Builder
+ {
+ Async = segment.InProcessOfMovingToEndpoint.Async.ToString(),
+ Sync = segment.InProcessOfMovingToEndpoint.Sync.ToString(),
+ }.Build(),
+ Version = ByteString.CopyFrom(segment.Version.ToByteArray()),
+ }.Build();
+ }
+ }
+}
75 Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs
@@ -0,0 +1,75 @@
+//using System;
+//using System.Net;
+//using System.ServiceModel;
+//using Rhino.DistributedHashTable.Internal;
+//using Rhino.DistributedHashTable.Remote;
+//using Rhino.Queues;
+
+//namespace Rhino.DistributedHashTable.Hosting
+//{
+// public class DistributedHashTableStorageHost : IDisposable
+// {
+// private readonly IDistributedHashTableNode node;
+// private readonly QueueManager queueManager;
+// private readonly IDistributedHashTableMaster masterChannel;
+// private readonly IDistributedHashTableStorage storage;
+// private readonly ServiceHost serviceHost;
+
+// public DistributedHashTableStorageHost(Uri master)
+// : this(master, "node", 2201)
+// {
+// }
+
+// public DistributedHashTableStorageHost(
+// Uri master,
+// string name,
+// int port)
+// {
+// Endpoint = new NodeEndpoint
+// {
+// Sync = new UriBuilder("net.tcp://" + Environment.MachineName + "/" + name)
+// {
+// Port = port
+// }.Uri,
+// Async = new UriBuilder("rhino.queues://" + Environment.MachineName + "/" + name)
+// {
+// Port = port + 1
+// }.Uri,
+// };
+// queueManager = new QueueManager(new IPEndPoint(IPAddress.Any, port + 1), name + ".queue.esent");
+
+// masterChannel = new ChannelFactory<IDistributedHashTableMaster>(Binding.DhtDefault, master.ToString())
+// .CreateChannel();
+
+// node = new DistributedHashTableNode(
+// masterChannel,
+// new ThreadPoolExecuter(),
+// new NetDataMessageSerializer(),
+// Endpoint,
+// queueManager,
+// new DistributedHashTableNodeReplicationFactory()
+// );
+// storage = new DistributedHashTableStorage(name + ".data.esent", node);
+
+// serviceHost = new ServiceHost(storage);
+// serviceHost.AddServiceEndpoint(typeof (IDistributedHashTableStorage),
+// Binding.DhtDefault, node.Endpoint.Sync);
+// }
+
+// public NodeEndpoint Endpoint { get; private set; }
+
+// public void Start()
+// {
+// node.Start();
+// serviceHost.Open();
+// }
+
+// public void Dispose()
+// {
+// serviceHost.Close(TimeSpan.Zero);
+// storage.Dispose();
+// queueManager.Dispose();
+// ((IClientChannel)masterChannel).Close(TimeSpan.Zero);
+// }
+// }
+//}
17 Rhino.DistributedHashTable/IDistributedHashTable.cs
@@ -0,0 +1,17 @@
+using System.Collections.Generic;
+
+namespace Rhino.DistributedHashTable
+{
+ using PersistentHashTable;
+
	/// <summary>
	/// Client-facing contract for the distributed hash table: batched
	/// key/value put/get/remove plus item-collection operations.
	/// </summary>
	public interface IDistributedHashTable
	{
		// Batched operations - one result element per request element.
		PutResult[] Put(params PutRequest[] valuesToAdd);
		Value[][] Get(params GetRequest[] valuesToGet);
		bool[] Remove(params RemoveRequest[] valuesToRemove);

		int[] AddItems(params AddItemRequest[] itemsToAdd);
		// NOTE(review): "RemoteItems" looks like a typo for "RemoveItems"
		// (it takes RemoveItemRequest); renaming would break implementors,
		// so flagging only.
		void RemoteItems(params RemoveItemRequest[] itemsToRemove);
		KeyValuePair<int, byte[]>[] GetItems(GetItemsRequest request);
	}
+}
8 Rhino.DistributedHashTable/Internal/BackupState.cs
@@ -0,0 +1,8 @@
+namespace Rhino.DistributedHashTable.Internal
+{
	/// <summary>
	/// The kind of change that occurred to a segment's backup set
	/// (used by the master's BackupChanged notification).
	/// </summary>
	public enum BackupState
	{
		Added,
		Removed
	}
+}
27 Rhino.DistributedHashTable/Internal/BinaryMessageSerializer.cs
@@ -0,0 +1,27 @@
+using System;
+using System.IO;
+using System.Runtime.Serialization;
+using Rhino.DistributedHashTable.Parameters;
+
+namespace Rhino.DistributedHashTable.Internal
+{
+ public class BinaryMessageSerializer : IMessageSerializer
+ {
+ public byte[] Serialize(IExtendedRequest[] requests)
+ {
+ using (var stream = new MemoryStream())
+ {
+ new NetDataContractSerializer().Serialize(stream,requests);
+ return stream.ToArray();
+ }
+ }
+
+ public IExtendedRequest[] Deserialize(byte[] data)
+ {
+ using (var stream = new MemoryStream(data))
+ {
+ return (IExtendedRequest[])new NetDataContractSerializer().Deserialize(stream);
+ }
+ }
+ }
+}
8 Rhino.DistributedHashTable/Internal/Constants.cs
@@ -0,0 +1,8 @@
+namespace Rhino.DistributedHashTable.Internal
+{
	/// <summary>
	/// Well-known marker strings stored inside the hash table itself.
	/// </summary>
	public static class Constants
	{
		// Prefix identifying values that carry DHT metadata rather than user data.
		public const string RhinoDhtStartToken = "@rdht://";
		// Marker written for data that has moved to another segment owner.
		public const string MovedSegment = RhinoDhtStartToken + "Segment/Moved/";
	}
+}
232 Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs
@@ -0,0 +1,232 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using log4net;
+using Rhino.DistributedHashTable.Commands;
+using Rhino.DistributedHashTable.Util;
+
+namespace Rhino.DistributedHashTable.Internal
+{
+ /// <summary>
+	/// The master is a SINGLE THREADED service that manages all
+ /// operations in the cluster.
+ /// </summary>
+ public class DistributedHashTableMaster : IDistributedHashTableMaster
+ {
		// Raised when a segment backup is added or removed on an endpoint.
		public event Action<BackupState, NodeEndpoint, Segment> BackupChanged = delegate { };
		// Raised whenever the cluster topology changes.
		public event Action TopologyChanged = delegate { };

		private readonly ILog log = LogManager.GetLogger(typeof(DistributedHashTableMaster));

		// The current cluster topology (segments and their assignments).
		public Topology Topology { get; set; }

		// Endpoints known to have joined the cluster.
		private readonly HashSet<NodeEndpoint> endpoints = new HashSet<NodeEndpoint>();

		// Convenience read-only view over the topology's segments.
		public IEnumerable<Segment> Segments
		{
			get { return Topology.Segments; }
		}

		// Number of backup copies to maintain per segment
		// (set to 2 by the default constructor).
		public int NumberOfBackCopiesToKeep
		{
			get;
			set;
		}
+
+ public DistributedHashTableMaster()
+ {
+ NumberOfBackCopiesToKeep = 2;
+ Topology = new Topology(CreateDefaultSegments().ToArray());
+ }
+
+ private static IEnumerable<Segment> CreateDefaultSegments()
+ {
+ for (int i = 0; i < 8192; i++)
+ {
+ var range = new Segment
+ {
+ Index = i
+ };
+ yield return range;
+ }
+ }
+
+ /// <summary>
+ /// This method is called when a new node wants to join the cluster.
+ /// The result is the ranges that this node is responsible for, if it is an
+ /// existing one, or the list of ranges that it needs to pull from the currently
+ /// assigned node.
+ /// Note:
+	/// that if it needs to pull data from the currently assigned node, it will
+ /// also need to call the <see cref