
Can now push topology updates to all nodes.

Added tests showing a new node joining the cluster and how we replicate values to it.

git-svn-id: https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/experiments/dht/dht@2182 079b0acf-d9fa-0310-9935-e5ade295c882
ayenderahien
ayenderahien committed Jun 5, 2009
1 parent 716cc94 commit 32a179925606c39f770551b62228a1f332da8768
Showing with 3,086 additions and 557 deletions.
  1. +102 −0 Rhino.DistributedHashTable.IntegrationTests/ClusterTests.cs
  2. +16 −0 Rhino.DistributedHashTable.IntegrationTests/FullIntegrationTest.cs
  3. +132 −0 Rhino.DistributedHashTable.IntegrationTests/NodeOverTheNetwork.cs
  4. +5 −0 Rhino.DistributedHashTable.IntegrationTests/Rhino.DistributedHashTable.IntegrationTests.csproj
  5. +0 −3 Rhino.DistributedHashTable/Assumptions.txt
  6. +3 −36 Rhino.DistributedHashTable/Client/DistributedHashTableMasterClient.cs
  7. +86 −42 Rhino.DistributedHashTable/Client/DistributedHashTableStorageClient.cs
  8. +1 −0 Rhino.DistributedHashTable/Commands/ICommand.cs
  9. +55 −0 Rhino.DistributedHashTable/Commands/NotifyEndpointsAboutTopologyChange.cs
  10. +33 −31 Rhino.DistributedHashTable/Commands/OnlineRangeReplicationCommand.cs
  11. +43 −0 Rhino.DistributedHashTable/Commands/UpdateTopologyCommand.cs
  12. +25 −40 Rhino.DistributedHashTable/Hosting/DistributedHashTableMasterHost.cs
  13. +110 −96 Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs
  14. +23 −12 Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs
  15. +11 −1 Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs
  16. +8 −0 Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs
  17. +3 −1 Rhino.DistributedHashTable/Internal/IDistributedHashTableNode.cs
  18. +2 −1 Rhino.DistributedHashTable/Internal/IExecuter.cs
  19. +45 −1 Rhino.DistributedHashTable/Internal/ThreadPoolExecuter.cs
  20. +1,998 −274 Rhino.DistributedHashTable/Protocol/ProtocolDef.cs
  21. +43 −1 Rhino.DistributedHashTable/Protocol/ProtocolDef.proto
  22. +35 −6 Rhino.DistributedHashTable/Protocol/ProtocolDef.proto.bin
  23. +7 −7 Rhino.DistributedHashTable/Remote/DistributedHashTableNodeReplication.cs
  24. +2 −2 Rhino.DistributedHashTable/Remote/IDistributedHashTableNodeReplication.cs
  25. +10 −0 Rhino.DistributedHashTable/Remote/IDistributedHashTableRemoteNode.cs
  26. +9 −0 Rhino.DistributedHashTable/Remote/IDistributedHashTableRemoteNodeFactory.cs
  27. +21 −0 Rhino.DistributedHashTable/Remote/NonPooledDistributedHashTableNodeFactory.cs
  28. +6 −1 Rhino.DistributedHashTable/Rhino.DistributedHashTable.csproj
  29. +7 −2 Rhino.DistributedHashTable/Todo.txt
  30. +245 −0 Rhino.DistributedHashTable/Util/PrtoBufConverter.cs
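The bodies of the two new commands (items 9 and 11 above) are not among the hunks that follow, but the names and the surrounding files suggest the master enumerates the endpoints it knows about and tells each node to refresh its topology. A rough sketch of that shape, with every member name assumed rather than taken from the commit:

	using Rhino.DistributedHashTable.Internal;
	using Rhino.DistributedHashTable.Remote;

	namespace Rhino.DistributedHashTable.Commands
	{
		// Hypothetical sketch only; the real NotifyEndpointsAboutTopologyChange
		// in this commit may differ. It assumes ICommand exposes Execute(),
		// that the factory (item 26/27 above) has a Create method, and that
		// IDistributedHashTableRemoteNode (item 25) gained an UpdateTopology call.
		public class NotifyEndpointsAboutTopologyChange : ICommand
		{
			private readonly NodeEndpoint[] endpoints;
			private readonly IDistributedHashTableRemoteNodeFactory factory;

			public NotifyEndpointsAboutTopologyChange(NodeEndpoint[] endpoints,
				IDistributedHashTableRemoteNodeFactory factory)
			{
				this.endpoints = endpoints;
				this.factory = factory;
			}

			public void Execute()
			{
				foreach (var endpoint in endpoints)
				{
					var node = factory.Create(endpoint); // factory method name assumed
					node.UpdateTopology();               // node-side counterpart of UpdateTopologyCommand
				}
			}
		}
	}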
@@ -0,0 +1,102 @@
+using System;
+using System.Linq;
+using System.Threading;
+using Rhino.DistributedHashTable.Client;
+using Rhino.DistributedHashTable.Hosting;
+using Rhino.DistributedHashTable.Parameters;
+using Xunit;
+
+namespace Rhino.DistributedHashTable.IntegrationTests
+{
+ public class ClusterTests
+ {
+ public class JoiningToCluster : FullIntegrationTest, IDisposable
+ {
+ private readonly DistributedHashTableMasterHost masterHost;
+ private readonly Uri masterUri = new Uri("rhino.dht://" + Environment.MachineName + ":2200/master");
+ private readonly DistributedHashTableStorageHost storageHostA;
+ private readonly DistributedHashTableStorageHost storageHostB;
+
+ public JoiningToCluster()
+ {
+ masterHost = new DistributedHashTableMasterHost();
+ storageHostA = new DistributedHashTableStorageHost(masterUri);
+ storageHostB = new DistributedHashTableStorageHost(masterUri, "nodeB", 2203);
+ masterHost.Start();
+ storageHostA.Start();
+ }
+
+ public void Dispose()
+ {
+ storageHostB.Dispose();
+ storageHostA.Dispose();
+ masterHost.Dispose();
+ }
+
+ [Fact]
+ public void TwoNodesCanJoinToTheCluster()
+ {
+ storageHostB.Start();
+
+ var countOfSegmentsInA = 0;
+ var countOfSegmentsInB = 0;
+ var masterProxy = new DistributedHashTableMasterClient(masterUri);
+
+ for (var i = 0; i < 50; i++)
+ {
+ var topology = masterProxy.GetTopology();
+ var results = topology.Segments.GroupBy(x => x.AssignedEndpoint)
+ .Select(x => new { x.Key, Count = x.Count() })
+ .ToDictionary(x => x.Key, x => x.Count);
+
+ results.TryGetValue(storageHostA.Endpoint, out countOfSegmentsInA);
+ results.TryGetValue(storageHostB.Endpoint, out countOfSegmentsInB);
+ if (countOfSegmentsInA == countOfSegmentsInB &&
+ countOfSegmentsInB == 4096)
+ return;
+ Thread.Sleep(500);
+ }
+ Assert.True(false,
+ "Should have found two nodes sharing responsability for the geometry: " + countOfSegmentsInA + " - " +
+ countOfSegmentsInB);
+ }
+
+ [Fact]
+ public void WillReplicateValuesToSecondJoin()
+ {
+ var masterProxy = new DistributedHashTableMasterClient(masterUri);
+ using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint))
+ {
+ var topology = masterProxy.GetTopology();
+ nodeA.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] {2, 2, 0, 0},
+ Key = "abc",
+ Segment = 1
+ });
+ }
+
+ storageHostB.Start(); //will replicate all odd segments here now
+
+ for (var i = 0; i < 500; i++)
+ {
+ var topology = masterProxy.GetTopology();
+ if(topology.Segments[1].AssignedEndpoint ==
+ storageHostB.Endpoint)
+ break;
+ Thread.Sleep(500);
+ }
+ using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint))
+ {
+ var topology = masterProxy.GetTopology();
+ var values = nodeB.Get(topology.Version, new ExtendedGetRequest
+ {
+ Key = "abc",
+ Segment = 1
+ });
+ Assert.Equal(new byte[] {2, 2, 0, 0}, values[0][0].Data);
+ }
+ }
+ }
+ }
+}
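Note the shape of both tests: rebalancing and replication happen asynchronously after storageHostB.Start(), so each test polls GetTopology() in a sleep loop instead of asserting immediately. The even 4,096/4,096 split the first test waits for implies the master partitions the key space into 8,192 segments in total.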
@@ -1,9 +1,20 @@
using System.IO;
+using log4net.Appender;
+using log4net.Config;
+using log4net.Layout;
namespace Rhino.DistributedHashTable.IntegrationTests
{
public class FullIntegrationTest
{
+ static FullIntegrationTest()
+ {
+ BasicConfigurator.Configure(new DebugAppender
+ {
+ Layout = new SimpleLayout()
+ });
+ }
+
public FullIntegrationTest()
{
if(Directory.Exists("node.queue.esent"))
@@ -12,6 +23,11 @@ public FullIntegrationTest()
if (Directory.Exists("node.data.esent"))
Directory.Delete("node.data.esent", true);
+ if (Directory.Exists("nodeB.queue.esent"))
+ Directory.Delete("nodeB.queue.esent", true);
+
+ if (Directory.Exists("nodeB.data.esent"))
+ Directory.Delete("nodeB.data.esent", true);
}
}
}
@@ -1,6 +1,7 @@
using System;
using Rhino.DistributedHashTable.Client;
using Rhino.DistributedHashTable.Hosting;
+using Rhino.DistributedHashTable.Internal;
using Rhino.DistributedHashTable.Parameters;
using Xunit;
using System.Linq;
@@ -29,6 +30,137 @@ public void NodeHaveJoinedMasterAutomatically()
Assert.True(topology.Segments.All(x => x.AssignedEndpoint == storageHost.Endpoint));
}
+ [Fact]
+ public void CanReplicateEmptySegments()
+ {
+ using (var storageProxy = new DistributedHashTableStorageClient(storageHost.Endpoint))
+ {
+ var segments = new[]{1,2,3};
+
+ var assignedSegments = storageProxy.AssignAllEmptySegments(NodeEndpoint.ForTest(13), segments);
+
+ Assert.Equal(segments, assignedSegments);
+ }
+ }
+
+ [Fact]
+ public void WhenReplicatingEmptySegmentsWillNotReplicateSegmentsThatHasValues()
+ {
+ using (var storageProxy = new DistributedHashTableStorageClient(storageHost.Endpoint))
+ {
+ var topology = new DistributedHashTableMasterClient(masterUri).GetTopology();
+ storageProxy.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] {1, 2, 3},
+ Key = "test",
+ Segment = 1,
+ });
+
+ var segments = new[] { 1, 2, 3 };
+
+ var assignedSegments = storageProxy.AssignAllEmptySegments(NodeEndpoint.ForTest(13), segments);
+
+ Assert.Equal(new[]{2,3}, assignedSegments);
+ }
+ }
+
+ [Fact]
+ public void CanReplicateSegmentWithData()
+ {
+ using (var storageProxy = new DistributedHashTableStorageClient(storageHost.Endpoint))
+ {
+ var topology = new DistributedHashTableMasterClient(masterUri).GetTopology();
+ storageProxy.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] { 1, 2, 3 },
+ Key = "test",
+ Segment = 1,
+ });
+
+ var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+
+ Assert.False(result.Done);
+ Assert.Equal("test", result.PutRequests[0].Key);
+ }
+ }
+
+ [Fact]
+ public void CanReplicateSegmentWithDataWhileStillServingRequestForSegment()
+ {
+ using (var storageProxy = new DistributedHashTableStorageClient(storageHost.Endpoint))
+ {
+ var topology = new DistributedHashTableMasterClient(masterUri).GetTopology();
+ storageProxy.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] { 1, 2, 3 },
+ Key = "test",
+ Segment = 1,
+ });
+
+ var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ Assert.Equal("test", result.PutRequests[0].Key);
+
+ storageProxy.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] { 1, 2, 3 },
+ Key = "test2",
+ Segment = 1,
+ });
+
+ result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ Assert.Equal("test2", result.PutRequests[0].Key);
+ }
+ }
+
+ [Fact]
+ public void CanReplicateSegmentWithDataWhileRemovingItems()
+ {
+ using (var storageProxy = new DistributedHashTableStorageClient(storageHost.Endpoint))
+ {
+ var topology = new DistributedHashTableMasterClient(masterUri).GetTopology();
+ storageProxy.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] { 1, 2, 3 },
+ Key = "test",
+ Segment = 1,
+ });
+
+ var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ Assert.Equal("test", result.PutRequests[0].Key);
+
+ storageProxy.Remove(topology.Version, new ExtendedRemoveRequest()
+ {
+ Key = "test",
+ Segment = 1,
+ SpecificVersion = result.PutRequests[0].ReplicationVersion
+ });
+
+ result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ Assert.Equal("test", result.RemoveRequests[0].Key);
+ }
+ }
+
+ [Fact]
+ public void WhenFinishedReplicatingWillTellTheReplicatorSo()
+ {
+ using (var storageProxy = new DistributedHashTableStorageClient(storageHost.Endpoint))
+ {
+ var topology = new DistributedHashTableMasterClient(masterUri).GetTopology();
+ storageProxy.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] { 1, 2, 3 },
+ Key = "test",
+ Segment = 1,
+ });
+
+ var result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ Assert.Equal("test", result.PutRequests[0].Key);
+
+ result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
+ Assert.True(result.Done);
+ }
+ }
+
[Fact]
public void CanPutItem()
{
@@ -31,6 +31,10 @@
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
+ <Reference Include="log4net, Version=1.2.10.0, Culture=neutral, PublicKeyToken=1b44e1d426115821, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Rhino.DistributedHashTable\bin\Debug\log4net.dll</HintPath>
+ </Reference>
<Reference Include="Rhino.Mocks, Version=3.5.0.1337, Culture=neutral, PublicKeyToken=0b3305902db7183f, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\..\OSS\rhino-tools\build\net-3.5\debug\Rhino.Mocks.dll</HintPath>
@@ -56,6 +60,7 @@
</Reference>
</ItemGroup>
<ItemGroup>
+ <Compile Include="ClusterTests.cs" />
<Compile Include="FullIntegrationTest.cs" />
<Compile Include="MasterOverTheNetwork.cs" />
<Compile Include="Mini\DistributedHashTableReplicationTest.cs" />
@@ -1,3 +0,0 @@
-* Replication & failures
- ======================
- How do we handle online replication errors
@@ -65,11 +65,11 @@ public Segment[] Join(NodeEndpoint endpoint)
var response = union.JoinResponse;
- return response.SegmentsList.Select(x => ConvertSegment(x)).ToArray();
+ return response.SegmentsList.Select(x => x.GetSegment()).ToArray();
});
}
- private MasterMessageUnion ReadReply(MasterMessageType responses, Stream stream)
+ private static MasterMessageUnion ReadReply(MasterMessageType responses, Stream stream)
{
var iterator = MessageStreamIterator<MasterMessageUnion>.FromStreamProvider(() => new UndisposableStream(stream));
var union = iterator.First();
@@ -121,43 +121,10 @@ public Topology GetTopology()
var union = ReadReply(MasterMessageType.GetTopologyResult, stream);
- var topology = union.Topology;
- var segments = topology.SegmentsList.Select(x => ConvertSegment(x));
- return new Topology(segments.ToArray(), new Guid(topology.Version.ToByteArray()))
- {
- Timestamp = DateTime.FromOADate(topology.TimestampAsDouble)
- };
+ return union.Topology.GetTopology();
});
}
- private static Segment ConvertSegment(Protocol.Segment x)
- {
- return new Segment
- {
- Version = new Guid(x.Version.ToByteArray()),
- AssignedEndpoint = x.AssignedEndpoint != Protocol.NodeEndpoint.DefaultInstance
- ? new NodeEndpoint
- {
- Async = new Uri(x.AssignedEndpoint.Async),
- Sync = new Uri(x.AssignedEndpoint.Sync)
- }
- : null,
- InProcessOfMovingToEndpoint = x.InProcessOfMovingToEndpoint != Protocol.NodeEndpoint.DefaultInstance
- ? new NodeEndpoint
- {
- Async = new Uri(x.InProcessOfMovingToEndpoint.Async),
- Sync = new Uri(x.InProcessOfMovingToEndpoint.Sync)
- }
- : null,
- Index = x.Index,
- Backups = x.BackupsList.Select(b => new NodeEndpoint
- {
- Async = new Uri(b.Async),
- Sync = new Uri(b.Sync)
- }).ToSet(),
- };
- }
-
public void GaveUp(NodeEndpoint endpoint,
params int[] rangesGivingUpOn)
{
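The deleted ConvertSegment helper was evidently folded into the new Util/PrtoBufConverter.cs (item 30 above, whose hunk is not shown). Judging by the new call site x.GetSegment(), it likely became an extension method carrying essentially the deleted body; a sketch reconstructed from the removed code, with the null-endpoint check factored into a local helper for readability:

	using System;
	using System.Linq;
	using Rhino.DistributedHashTable.Internal;

	namespace Rhino.DistributedHashTable.Util
	{
		// Reconstructed from the ConvertSegment body removed above; the actual
		// PrtoBufConverter.cs (245 added lines) presumably also covers the other
		// protocol messages, e.g. the GetTopology() conversion used earlier.
		public static class PrtoBufConverter
		{
			public static Segment GetSegment(this Protocol.Segment x)
			{
				return new Segment
				{
					Version = new Guid(x.Version.ToByteArray()),
					AssignedEndpoint = GetEndpoint(x.AssignedEndpoint),
					InProcessOfMovingToEndpoint = GetEndpoint(x.InProcessOfMovingToEndpoint),
					Index = x.Index,
					Backups = x.BackupsList.Select(b => new NodeEndpoint
					{
						Async = new Uri(b.Async),
						Sync = new Uri(b.Sync)
					}).ToSet(),
				};
			}

			private static NodeEndpoint GetEndpoint(Protocol.NodeEndpoint endpoint)
			{
				// The protocol buffer has no null; a default instance marks "unset".
				return endpoint != Protocol.NodeEndpoint.DefaultInstance
					? new NodeEndpoint
					{
						Async = new Uri(endpoint.Async),
						Sync = new Uri(endpoint.Sync)
					}
					: null;
			}
		}
	}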