Starting to work on queued replication

Commit 75b1d0933ff0a210809fb4ccb3903efa283786ca (1 parent: bd30ebc), committed by ayenderahien on Jun 5, 2009
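
In outline, the commit points each node's Async endpoint at a Rhino Queues queue named "replication" and adds a background consumer on the node that drains it. A condensed sketch of that consumer, restating the code added to DistributedHashTableNode.cs below (poison-message bookkeeping trimmed; the names are the ones used in the diff):

    // Runs on a dedicated thread until the node is disposed.
    while (disposed == false)
    {
        using (var tx = new TransactionScope())
        {
            var message = queueManager.Receive("replication");          // blocks until a replication message arrives
            var requests = messageSerializer.Deserialize(message.Data); // batched put/remove requests
            var puts = requests.OfType<ExtendedPutRequest>().ToArray();
            var removes = requests.OfType<ExtendedRemoveRequest>().ToArray();
            if (puts.Length > 0)
                Storage.Put(GetTopologyVersion(), puts);
            if (removes.Length > 0)
                Storage.Remove(GetTopologyVersion(), removes);
            tx.Complete(); // receive and apply commit together
        }
    }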
Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs (10 changed lines)
@@ -6,7 +6,7 @@
using Rhino.DistributedHashTable.Parameters;
using Xunit;
-namespace Rhino.DistributedHashTable.IntegrationTests
+namespace Rhino.DistributedHashTable.ClusterTests
{
public class ClusterTests
{
@@ -52,13 +52,13 @@ public void TwoNodesCanJoinToTheCluster()
results.TryGetValue(storageHostA.Endpoint, out countOfSegmentsInA);
results.TryGetValue(storageHostB.Endpoint, out countOfSegmentsInB);
if (countOfSegmentsInA == countOfSegmentsInB &&
- countOfSegmentsInB == 4096)
+ countOfSegmentsInB == 4096)
return;
Thread.Sleep(500);
}
Assert.True(false,
- "Should have found two nodes sharing responsability for the geometry: " + countOfSegmentsInA + " - " +
- countOfSegmentsInB);
+ "Should have found two nodes sharing responsability for the geometry: " + countOfSegmentsInA + " - " +
+ countOfSegmentsInB);
}
[Fact]
@@ -82,7 +82,7 @@ public void WillReplicateValuesToSecondJoin()
{
var topology = masterProxy.GetTopology();
if(topology.Segments[1].AssignedEndpoint ==
- storageHostB.Endpoint)
+ storageHostB.Endpoint)
break;
Thread.Sleep(500);
}
Rhino.DistributedHashTable.ClusterTests/FullIntegrationTest.cs (2 changed lines)
@@ -3,7 +3,7 @@
using log4net.Config;
using log4net.Layout;
-namespace Rhino.DistributedHashTable.IntegrationTests
+namespace Rhino.DistributedHashTable.ClusterTests
{
public class FullIntegrationTest
{
Rhino.DistributedHashTable.ClusterTests/MasterOverTheNetwork.cs (12 changed lines)
@@ -5,7 +5,7 @@
using Xunit;
using System.Linq;
-namespace Rhino.DistributedHashTable.IntegrationTests
+namespace Rhino.DistributedHashTable.ClusterTests
{
public class MasterOverTheNetwork
{
@@ -39,7 +39,7 @@ public void CanJoinToMaster()
{
var endpoint = new NodeEndpoint
{
- Async = new Uri("rhino.queues://localhost:2202"),
+ Async = new Uri("rhino.queues://localhost:2202/replication"),
Sync = new Uri("rhino.dht://localhost:2201")
};
var segments = masterProxy.Join(endpoint);
@@ -52,12 +52,12 @@ public void CanCatchUpOnSegment()
{
var endpoint = new NodeEndpoint
{
- Async = new Uri("rhino.queues://localhost:2202"),
+ Async = new Uri("rhino.queues://localhost:2202/replication"),
Sync = new Uri("rhino.dht://localhost:2201")
};
masterProxy.Join(new NodeEndpoint
{
- Async = new Uri("rhino.queues://other:2202"),
+ Async = new Uri("rhino.queues://other:2202/replication"),
Sync = new Uri("rhino.dht://other:2201")
});
var segments = masterProxy.Join(endpoint);
@@ -77,14 +77,14 @@ public void CanGiveUpOnSegment()
{
var existingEndpoint = new NodeEndpoint
{
- Async = new Uri("rhino.queues://other:2202"),
+ Async = new Uri("rhino.queues://other:2202/replication"),
Sync = new Uri("rhino.dht://other:2201")
};
masterProxy.Join(existingEndpoint);
var newEndpoint = new NodeEndpoint
{
- Async = new Uri("rhino.queues://localhost:2202"),
+ Async = new Uri("rhino.queues://localhost:2202/replication"),
Sync = new Uri("rhino.dht://localhost:2201")
};
Rhino.DistributedHashTable.ClusterTests/NodeOverTheNetwork.cs (6 changed lines)
@@ -6,7 +6,7 @@
using Xunit;
using System.Linq;
-namespace Rhino.DistributedHashTable.IntegrationTests
+namespace Rhino.DistributedHashTable.ClusterTests
{
public class NodeOverTheNetwork : FullIntegrationTest, IDisposable
{
@@ -132,7 +132,7 @@ public void CanReplicateSegmentWithDataWhileRemovingItems()
{
Key = "test",
Segment = 1,
- SpecificVersion = result.PutRequests[0].ReplicationVersion
+ SpecificVersion = result.PutRequests[0].ReplicationVersion
});
result = storageProxy.ReplicateNextPage(NodeEndpoint.ForTest(13), 1);
@@ -179,7 +179,7 @@ public void CanPutItem()
var values = storageProxy.Get(topology.Version, new ExtendedGetRequest
{
Key = "test",
- Segment = 1
+ Segment = 1
});
Assert.Equal(1, values[0].Length);
Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs (5 changed lines)
@@ -36,7 +36,7 @@ public DistributedHashTableStorageHost(Uri master)
Endpoint = new NodeEndpoint
{
Sync = new Uri("rhino.dht://" + Environment.MachineName + ":" + port + "/"),
- Async = new Uri("rhino.queues://" + Environment.MachineName + ":" + (port + 1) + "/")
+ Async = new Uri("rhino.queues://" + Environment.MachineName + ":" + (port + 1) + "/replication")
};
queueManager = new QueueManager(new IPEndPoint(IPAddress.Any, port + 1), name + ".queue.esent");
@@ -274,9 +274,10 @@ private void HandleTopologyUpdate(MessageStreamWriter<StorageMessageUnion> write
public void Dispose()
{
listener.Stop();
+ queueManager.Dispose();
+
node.Dispose();
storage.Dispose();
- queueManager.Dispose();
}
}
}
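
For a sense of what the host change above produces: with an illustrative machine name of STORAGE-1 and a port of 2200 (both hypothetical), the endpoint registered with the master would now look roughly like this, with the QueueManager bound to port + 1:

    Endpoint = new NodeEndpoint
    {
        Sync  = new Uri("rhino.dht://STORAGE-1:2200/"),
        Async = new Uri("rhino.queues://STORAGE-1:2201/replication") // served by the QueueManager listening on 2201
    };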
Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs (57 changed lines)
@@ -1,12 +1,18 @@
using System;
using System.Collections.Generic;
+using System.IO;
using System.Linq;
using System.Threading;
+using System.Transactions;
+using Google.ProtocolBuffers;
+using log4net;
using Rhino.DistributedHashTable.Commands;
using Rhino.DistributedHashTable.Parameters;
+using Rhino.DistributedHashTable.Protocol;
using Rhino.DistributedHashTable.Remote;
using Rhino.DistributedHashTable.Util;
using Rhino.Queues;
+using Rhino.Queues.Model;
namespace Rhino.DistributedHashTable.Internal
{
@@ -18,6 +24,8 @@ public class DistributedHashTableNode : IDistributedHashTableNode
private readonly IMessageSerializer messageSerializer;
private readonly IQueueManager queueManager;
private readonly IDistributedHashTableNodeReplicationFactory replicationFactory;
+ private readonly Thread backgroundReplication;
+ private readonly ILog log = LogManager.GetLogger(typeof(DistributedHashTableNode));
public DistributedHashTableNode(IDistributedHashTableMaster master,
IExecuter executer,
@@ -33,6 +41,52 @@ public class DistributedHashTableNode : IDistributedHashTableNode
this.queueManager = queueManager;
this.replicationFactory = replicationFactory;
State = NodeState.NotStarted;
+ backgroundReplication = new Thread(BackgroundReplication);
+ }
+
+ private void BackgroundReplication()
+ {
+ var errors = new Dictionary<MessageId, int>();
+ while (disposed == false)
+ {
+ int numOfErrors = 0;
+ MessageId id = null;
+ try
+ {
+ using (var tx = new TransactionScope())
+ {
+ var message = queueManager.Receive("replication");
+ id = message.Id;
+ if (errors.TryGetValue(id, out numOfErrors) == false)
+ numOfErrors = 0;
+ if (numOfErrors > 5)
+ {
+ log.ErrorFormat("Could not process message {0}, failed too many times, discarding", id);
+ continue;
+ }
+
+ var requests = messageSerializer.Deserialize(message.Data);
+ var puts = requests.OfType<ExtendedPutRequest>().ToArray();
+ var removes = requests.OfType<ExtendedRemoveRequest>().ToArray();
+ if (puts.Length > 0)
+ Storage.Put(GetTopologyVersion(), puts);
+ if (removes.Length > 0)
+ Storage.Remove(GetTopologyVersion(), removes);
+ tx.Complete();
+ }
+ errors.Remove(id);
+ }
+ catch (ObjectDisposedException)
+ {
+
+ }
+ catch (Exception e)
+ {
+ log.Error("Could not process message, will retry again", e);
+ if (id != null)
+ errors[id] = numOfErrors + 1;
+ }
+ }
}
public NodeState State { get; set; }
@@ -161,10 +215,13 @@ private void StartPendingBackupsForCurrentNode(Topology topology)
}
protected int currentlyReplicatingBackups;
+ private volatile bool disposed;
public void Dispose()
{
+ disposed = true;
executer.Dispose();
+ backgroundReplication.Join();
}
public void SetTopology(Topology topology)
Rhino.DistributedHashTable/Internal/NodeEndpoint.cs (14 changed lines)
@@ -11,13 +11,23 @@ public class NodeEndpoint
private byte[] serializedEndpoint;
private byte[] hash;
public Uri Sync { get; set; }
- public Uri Async { get; set; }
+ private Uri async;
+ public Uri Async
+ {
+ get { return async; }
+ set
+ {
+ if (value.AbsolutePath.EndsWith("replication")==false)
+ throw new ArgumentException("Async uri must ends with 'replication'", "value");
+ async = value;
+ }
+ }
public static NodeEndpoint ForTest(int port)
{
return new NodeEndpoint
{
- Async = new Uri("rhino.queues://test:" + port),
+ Async = new Uri("rhino.queues://test:" + port +"/replication"),
Sync = new Uri("tcp://test:" + port)
};
}
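
The Async setter above turns a mis-addressed replication endpoint into a construction-time error instead of a silently unreachable queue. A minimal usage sketch under that rule (host names and ports are illustrative):

    // Accepted: the URI path ends with "replication".
    var endpoint = new NodeEndpoint
    {
        Async = new Uri("rhino.queues://localhost:2202/replication"),
        Sync  = new Uri("rhino.dht://localhost:2201")
    };

    // Rejected: throws ArgumentException because the path does not end with "replication".
    var invalid = new NodeEndpoint
    {
        Async = new Uri("rhino.queues://localhost:2202"),
        Sync  = new Uri("rhino.dht://localhost:2201")
    };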
