Permalink
Browse files

Updating esent to latest

Fixing some issues with backup information across the cluster

git-svn-id: https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/experiments/dht/dht@2189 079b0acf-d9fa-0310-9935-e5ade295c882
  • Loading branch information...
ayenderahien
ayenderahien committed Jun 6, 2009
1 parent d413eeb commit f1aa284f0c6d80f83ae97103dcd6abf1b4cd365f
@@ -3,7 +3,9 @@
using System.Threading;
using Rhino.DistributedHashTable.Client;
using Rhino.DistributedHashTable.Hosting;
+using Rhino.DistributedHashTable.Internal;
using Rhino.DistributedHashTable.Parameters;
+using Rhino.PersistentHashTable;
using Xunit;
namespace Rhino.DistributedHashTable.ClusterTests
@@ -33,6 +35,56 @@ public void Dispose()
masterHost.Dispose();
}
+ [Fact]
+ public void AfterBothNodesJoinedWillAutomaticallyReplicateToBackupNode()
+ {
+ storageHostB.Start();
+
+ var masterProxy = new DistributedHashTableMasterClient(masterUri);
+
+ for (var i = 0; i < 50; i++)
+ {
+ var topology = masterProxy.GetTopology();
+ var count = topology.Segments
+ .Where(x => x.AssignedEndpoint == storageHostA.Endpoint)
+ .Count();
+
+ if (count == 4096)
+ break;
+ Thread.Sleep(500);
+ }
+
+ using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint))
+ {
+ var topology = masterProxy.GetTopology();
+ nodeA.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] { 2, 2, 0, 0 },
+ Key = "abc",
+ Segment = 0
+ });
+ }
+
+ using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint))
+ {
+ var topology = masterProxy.GetTopology();
+ Value[][] values = null;
+ for (var i = 0; i < 100; i++)
+ {
+ values = nodeB.Get(topology.Version, new ExtendedGetRequest
+ {
+ Key = "abc",
+ Segment = 0
+ });
+ if (values[0].Length != 0)
+ break;
+ Thread.Sleep(250);
+ }
+ Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data);
+ }
+ }
+
+
[Fact]
public void TwoNodesCanJoinToTheCluster()
{
@@ -52,13 +104,38 @@ public void TwoNodesCanJoinToTheCluster()
results.TryGetValue(storageHostA.Endpoint, out countOfSegmentsInA);
results.TryGetValue(storageHostB.Endpoint, out countOfSegmentsInB);
if (countOfSegmentsInA == countOfSegmentsInB &&
- countOfSegmentsInB == 4096)
+ countOfSegmentsInB == 4096)
return;
Thread.Sleep(500);
}
Assert.True(false,
- "Should have found two nodes sharing responsability for the geometry: " + countOfSegmentsInA + " - " +
- countOfSegmentsInB);
+ "Should have found two nodes sharing responsability for the geometry: " + countOfSegmentsInA + " - " +
+ countOfSegmentsInB);
+ }
+
+ [Fact]
+ public void AfterTwoNodesJoinTheClusterEachSegmentHasBackup()
+ {
+ storageHostB.Start();
+
+ var masterProxy = new DistributedHashTableMasterClient(masterUri);
+
+ Topology topology;
+ for (var i = 0; i < 50; i++)
+ {
+ topology = masterProxy.GetTopology();
+ var count = topology.Segments
+ .Where(x => x.AssignedEndpoint == storageHostA.Endpoint)
+ .Count();
+
+ if (count == 4096)
+ break;
+ Thread.Sleep(500);
+ }
+ topology = masterProxy.GetTopology();
+ Assert.True(
+ topology.Segments.All(x => x.Backups.Count > 0)
+ );
}
[Fact]
@@ -70,7 +147,7 @@ public void WillReplicateValuesToSecondJoin()
var topology = masterProxy.GetTopology();
nodeA.Put(topology.Version, new ExtendedPutRequest
{
- Bytes = new byte[] {2, 2, 0, 0},
+ Bytes = new byte[] { 2, 2, 0, 0 },
Key = "abc",
Segment = 1
});
@@ -81,7 +158,7 @@ public void WillReplicateValuesToSecondJoin()
for (var i = 0; i < 500; i++)
{
var topology = masterProxy.GetTopology();
- if(topology.Segments[1].AssignedEndpoint ==
+ if (topology.Segments[1].AssignedEndpoint ==
storageHostB.Endpoint)
break;
Thread.Sleep(500);
@@ -94,7 +171,7 @@ public void WillReplicateValuesToSecondJoin()
Key = "abc",
Segment = 1
});
- Assert.Equal(new byte[] {2, 2, 0, 0}, values[0][0].Data);
+ Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data);
}
}
}
@@ -6,7 +6,7 @@
<ProductVersion>9.0.30729</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{D9377684-AC14-47F6-A3E8-E84AD0FC2BAA}</ProjectGuid>
- <OutputType>Exe</OutputType>
+ <OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Rhino.DistributedHashTable.ClusterTests</RootNamespace>
<AssemblyName>Rhino.DistributedHashTable.ClusterTests</AssemblyName>
@@ -59,7 +59,6 @@
<Compile Include="FullIntegrationTest.cs" />
<Compile Include="MasterOverTheNetwork.cs" />
<Compile Include="NodeOverTheNetwork.cs" />
- <Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
@@ -8,7 +8,7 @@
using Rhino.PersistentHashTable;
using Xunit;
-namespace Rhino.DistributedHashTable.IntegrationTests.Mini
+namespace Rhino.DistributedHashTable.IntegrationTests
{
public class DistributedHashTableReplicationTest
{
@@ -5,7 +5,7 @@
using Rhino.PersistentHashTable;
using Xunit;
-namespace Rhino.DistributedHashTable.IntegrationTests.Mini
+namespace Rhino.DistributedHashTable.IntegrationTests
{
public class DistributedHashTableStorageTest
{
@@ -1,6 +1,6 @@
using System.IO;
-namespace Rhino.DistributedHashTable.IntegrationTests.Mini
+namespace Rhino.DistributedHashTable.IntegrationTests
{
public class EsentTestBase
{
@@ -7,7 +7,7 @@
using Rhino.Mocks;
using Xunit;
-namespace Rhino.DistributedHashTable.IntegrationTests.Mini
+namespace Rhino.DistributedHashTable.IntegrationTests
{
public class OnlineSegmentReplicationCommandTest : EsentTestBase
{
@@ -147,20 +147,17 @@ private void ReplicateSegment(Segment segment)
private List<Segment> AssignAllEmptySegmentsFromEndpoint(List<int> processedSegments)
{
var remainingSegments = new List<Segment>();
- foreach (var pagedSegment in segments.Page(500))
- {
- var assignedSegments = otherNode.AssignAllEmptySegments(
- node.Endpoint,
- pagedSegment.Select(x=>x.Index).ToArray());
-
- processedSegments.AddRange(assignedSegments);
- node.DoneReplicatingSegments(type,assignedSegments);
-
- log.DebugFormat("{0} empty segments assigned from {1}", assignedSegments.Length, endpoint);
- remainingSegments.AddRange(
- pagedSegment.Where(x => assignedSegments.Contains(x.Index) == false)
- );
- }
+ var assignedSegments = otherNode.AssignAllEmptySegments(
+ node.Endpoint,
+ segments.Select(x => x.Index).ToArray());
+
+ processedSegments.AddRange(assignedSegments);
+ node.DoneReplicatingSegments(type, assignedSegments);
+
+ log.DebugFormat("{0} empty segments assigned from {1}", assignedSegments.Length, endpoint);
+ remainingSegments.AddRange(
+ segments.Where(x => assignedSegments.Contains(x.Index) == false)
+ );
return remainingSegments;
}
@@ -1,4 +1,5 @@
using System;
+using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
@@ -118,6 +119,10 @@ private void OnBeginAcceptTcpClient(IAsyncResult result)
stream.Flush();
}
}
+ catch(IOException)
+ {
+ // disconnected, so nothing else to do
+ }
catch (SeeOtherException e)
{
writer.Write(new StorageMessageUnion.Builder
@@ -1,14 +1,11 @@
using System;
using System.Collections.Generic;
-using System.IO;
using System.Linq;
using System.Threading;
using System.Transactions;
-using Google.ProtocolBuffers;
using log4net;
using Rhino.DistributedHashTable.Commands;
using Rhino.DistributedHashTable.Parameters;
-using Rhino.DistributedHashTable.Protocol;
using Rhino.DistributedHashTable.Remote;
using Rhino.DistributedHashTable.Util;
using Rhino.Queues;
@@ -62,16 +59,20 @@ private void BackgroundReplication()
if (numOfErrors > 5)
{
log.ErrorFormat("Could not process message {0}, failed too many times, discarding", id);
+ tx.Complete(); //to make sure that the message is consumed
continue;
}
var requests = messageSerializer.Deserialize(message.Data);
var puts = requests.OfType<ExtendedPutRequest>().ToArray();
var removes = requests.OfType<ExtendedRemoveRequest>().ToArray();
+ log.DebugFormat("Accepting {0} puts and {1} removes for background replication",
+ puts.Length, removes.Length);
if (puts.Length > 0)
Storage.Put(GetTopologyVersion(), puts);
if (removes.Length > 0)
Storage.Remove(GetTopologyVersion(), removes);
+
tx.Complete();
}
errors.Remove(id);
@@ -119,6 +120,9 @@ public bool IsSegmentOwned(int segment)
var ownerSegment = Topology.GetSegment(segment);
if (ownerSegment.AssignedEndpoint == null)
return;
+ log.DebugFormat("Sending {0} requests to owner {1}",
+ requests.Length,
+ ownerSegment.AssignedEndpoint.Async);
queueManager.Send(ownerSegment.AssignedEndpoint.Async,
new MessagePayload
{
@@ -138,6 +142,9 @@ public bool IsSegmentOwned(int segment)
{
if (otherBackup == null)
continue;
+ log.DebugFormat("Sending {0} requests to backup {1}",
+ requests.Length,
+ ownerSegment.AssignedEndpoint.Async);
queueManager.Send(otherBackup.Async,
new MessagePayload
{
@@ -36,14 +36,20 @@ public Segment()
public override string ToString()
{
- return string.Format("Index: {0,10}, AssignedEndpoint: {1}, InProcessOfMovingToEndpoint: {2}",
- Index, AssignedEndpoint, InProcessOfMovingToEndpoint);
+ Uri assigned = null;
+ Uri inProcess = null;
+ if(AssignedEndpoint!=null)
+ assigned = AssignedEndpoint.Sync;
+ if (InProcessOfMovingToEndpoint != null)
+ inProcess = InProcessOfMovingToEndpoint.Sync;
+ return string.Format("Index: {0,4}, Assigned: {1}, Tentative: {2}, Backups: {3}, Tentative Backups: {4}",
+ Index, assigned, inProcess, Backups.Count, PendingBackups.Count);
}
public bool BelongsTo(NodeEndpoint endpoint)
{
return endpoint.Equals(AssignedEndpoint) ||
- endpoint.Equals(InProcessOfMovingToEndpoint);
+ endpoint.Equals(InProcessOfMovingToEndpoint);
}
public bool Match(Segment other)
Oops, something went wrong.

0 comments on commit f1aa284

Please sign in to comment.