Master topology is now persisted

Will not try to update topology if there is already a pending update operation
Can remove a value without a specific version


git-svn-id: https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/experiments/dht/dht@2197 079b0acf-d9fa-0310-9935-e5ade295c882
commit 4bbbbb816bb84b7c9c3ccde12609f40e2cb96f5f 1 parent c950dd9
ayenderahien authored
Showing with 353 additions and 77 deletions.
  1. +1 −1  Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs
  2. +58 −0 Rhino.DistributedHashTable.ClusterTests/DistributedHashTableMasterHostTest.cs
  3. +4 −12 Rhino.DistributedHashTable.ClusterTests/FullIntegrationTest.cs
  4. +2 −0  Rhino.DistributedHashTable.ClusterTests/Rhino.DistributedHashTable.ClusterTests.csproj
  5. +18 −0 Rhino.DistributedHashTable.ClusterTests/WithDebug.cs
  6. +25 −4 Rhino.DistributedHashTable.Tests/BackCopiesBehavior.cs
  7. +7 −1 Rhino.DistributedHashTable.Tests/MasterCaughtUpBehavior.cs
  8. +7 −1 Rhino.DistributedHashTable.Tests/MasterGaveUpBehavior.cs
  9. +44 −6 Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs
  10. +10 −0 Rhino.DistributedHashTable.Tests/MasterTestBase.cs
  11. +7 −1 Rhino.DistributedHashTable.Tests/RangesBehavior.cs
  12. +1 −0  Rhino.DistributedHashTable.Tests/Rhino.DistributedHashTable.Tests.csproj
  13. +6 −0 Rhino.DistributedHashTable/Commands/UpdateTopologyCommand.cs
  14. +58 −3 Rhino.DistributedHashTable/Hosting/DistributedHashTableMasterHost.cs
  15. +1 −2  Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs
  16. +1 −0  Rhino.DistributedHashTable/Internal/Constants.cs
  17. +42 −7 Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs
  18. +26 −26 Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs
  19. +35 −13 Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs
2  Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs
@@ -266,7 +266,7 @@ public static void RepeatWhileThereAreTopologyChangedErrors(Action action)
}
catch (TopologyVersionDoesNotMatchException)
{
-
+ Thread.Sleep(250);
}
}
}
58 Rhino.DistributedHashTable.ClusterTests/DistributedHashTableMasterHostTest.cs
@@ -0,0 +1,58 @@
+using System;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using Rhino.DistributedHashTable.Client;
+using Rhino.DistributedHashTable.Hosting;
+using Xunit;
+
+namespace Rhino.DistributedHashTable.ClusterTests
+{
+ public class DistributedHashTableMasterHostTest
+ {
+ public class AfterRestart : FullIntegrationTest , IDisposable
+ {
+ readonly DistributedHashTableMasterHost host;
+ private readonly Uri masterUri = new Uri("rhino.dht://" + Environment.MachineName + ":2200/master");
+
+ public AfterRestart()
+ {
+ host = new DistributedHashTableMasterHost();
+
+ host.Start();
+
+ using (var storageHost = new DistributedHashTableStorageHost(masterUri))
+ {
+ storageHost.Start();
+
+ var masterProxy = new DistributedHashTableMasterClient(masterUri);
+ while(true)
+ {
+ var topology = masterProxy.GetTopology();
+ if (topology.Segments.All(x => x.AssignedEndpoint != null))
+ break;
+ Thread.Sleep(100);
+ }
+ }
+
+ //restart
+ host.Dispose();
+ host = new DistributedHashTableMasterHost();
+ host.Start();
+ }
+
+ [Fact]
+ public void ShouldRetainPreviousTopology()
+ {
+ var masterProxy = new DistributedHashTableMasterClient(masterUri);
+ var topology = masterProxy.GetTopology();
+ Assert.True(topology.Segments.All(x => x.AssignedEndpoint != null));
+ }
+
+ public void Dispose()
+ {
+ host.Dispose();
+ }
+ }
+ }
+}
16 Rhino.DistributedHashTable.ClusterTests/FullIntegrationTest.cs
@@ -1,20 +1,9 @@
using System.IO;
-using log4net.Appender;
-using log4net.Config;
-using log4net.Layout;
namespace Rhino.DistributedHashTable.ClusterTests
{
- public class FullIntegrationTest
+ public class FullIntegrationTest : WithDebug
{
- static FullIntegrationTest()
- {
- BasicConfigurator.Configure(new DebugAppender
- {
- Layout = new SimpleLayout()
- });
- }
-
public FullIntegrationTest()
{
if(Directory.Exists("node.queue.esent"))
@@ -28,6 +17,9 @@ public FullIntegrationTest()
if (Directory.Exists("nodeB.data.esent"))
Directory.Delete("nodeB.data.esent", true);
+
+ if (Directory.Exists("master.esent"))
+ Directory.Delete("master.esent", true);
}
}
}
2  Rhino.DistributedHashTable.ClusterTests/Rhino.DistributedHashTable.ClusterTests.csproj
@@ -58,8 +58,10 @@
<Compile Include="ClusterTests.cs" />
<Compile Include="FullIntegrationTest.cs" />
<Compile Include="MasterOverTheNetwork.cs" />
+ <Compile Include="DistributedHashTableMasterHostTest.cs" />
<Compile Include="NodeOverTheNetwork.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="WithDebug.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\pht\Rhino.PersistentHashTable\Rhino.PersistentHashTable.csproj">
18 Rhino.DistributedHashTable.ClusterTests/WithDebug.cs
@@ -0,0 +1,18 @@
+using log4net.Appender;
+using log4net.Config;
+using log4net.Layout;
+
+namespace Rhino.DistributedHashTable.ClusterTests
+{
+ public class WithDebug
+ {
+ static WithDebug()
+ {
+ BasicConfigurator.Configure(new DebugAppender
+ {
+ Layout = new SimpleLayout()
+ });
+ }
+
+ }
+}
29 Rhino.DistributedHashTable.Tests/BackCopiesBehavior.cs
@@ -1,3 +1,4 @@
+using System;
using System.Net;
using Rhino.DistributedHashTable.Internal;
using Xunit;
@@ -7,7 +8,7 @@ namespace Rhino.DistributedHashTable.Tests
{
public class BackCopiesBehavior
{
- public class OnEmptyMaster
+ public class OnEmptyMaster : MasterTestBase
{
private readonly DistributedHashTableMaster master;
private readonly NodeEndpoint endPoint;
@@ -25,9 +26,14 @@ public void AddingNewNodeResultInAllSegmentsHavingNoBackupCopies()
Assert.True(master.Segments.All(x => x.PendingBackups.Count == 0));
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
- public class OnMasterWithOneExistingNode
+ public class OnMasterWithOneExistingNode : MasterTestBase
{
private readonly DistributedHashTableMaster master;
private readonly NodeEndpoint endPoint;
@@ -60,9 +66,14 @@ public void AddingNewNodeWillRaiseBackupChangedEvent()
Assert.True(wasChanged);
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
- public class OnMasterWithTwoNodes
+ public class OnMasterWithTwoNodes : MasterTestBase
{
private readonly DistributedHashTableMaster master;
private readonly NodeEndpoint endPoint;
@@ -87,9 +98,14 @@ public void AddingNewNodeResultInAllSegmentsHavingTwoBackupCopy()
master.CaughtUp(endPoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
Assert.True(master.Segments.All(x => x.PendingBackups.Count == 2));
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
- public class OnMasterWithThreeNodes
+ public class OnMasterWithThreeNodes : MasterTestBase
{
private readonly DistributedHashTableMaster master;
private readonly NodeEndpoint endPoint;
@@ -117,6 +133,11 @@ public void AddingNewNodeResultInAllSegmentsHavingAtLeastTwoBackupCopy()
master.CaughtUp(yetAnotherEndPoint, ReplicationType.Ownership, ranges.Select(x => x.Index).ToArray());
Assert.True(master.Segments.All(x => x.PendingBackups.Count >= 2));
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
}
}
8 Rhino.DistributedHashTable.Tests/MasterCaughtUpBehavior.cs
@@ -1,3 +1,4 @@
+using System;
using System.Linq;
using System.Net;
using Rhino.DistributedHashTable.Internal;
@@ -7,7 +8,7 @@ namespace Rhino.DistributedHashTable.Tests
{
public class MasterCaughtUpBehavior
{
- public class OnCaughtUp
+ public class OnCaughtUp : MasterTestBase
{
private readonly DistributedHashTableMaster master;
private readonly NodeEndpoint endPoint;
@@ -49,6 +50,11 @@ public void WhenCatchingUpOnBackupsWillMoveFromPendingBackupsToBackups()
Assert.Empty(segment.PendingBackups);
Assert.Equal(1, segment.Backups.Count);
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
}
}
8 Rhino.DistributedHashTable.Tests/MasterGaveUpBehavior.cs
@@ -1,3 +1,4 @@
+using System;
using System.Linq;
using Rhino.DistributedHashTable.Internal;
using Xunit;
@@ -6,7 +7,7 @@ namespace Rhino.DistributedHashTable.Tests
{
public class MasterGaveUpBehavior
{
- public class OnGaveUp
+ public class OnGaveUp : MasterTestBase
{
private readonly DistributedHashTableMaster master;
private readonly NodeEndpoint endPoint;
@@ -49,6 +50,11 @@ public void WhenGivingUpOnBackupsWillRemoveFromPendingBackups()
Assert.Empty(segment.PendingBackups);
Assert.Equal(0, segment.Backups.Count);
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
}
}
50 Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs
@@ -1,3 +1,4 @@
+using System;
using System.Linq;
using System.Net;
using Rhino.DistributedHashTable.Internal;
@@ -7,7 +8,7 @@ namespace Rhino.DistributedHashTable.Tests
{
public class MasterJoinBehavior
{
- public class OnEmptyMaster
+ public class OnEmptyMaster : MasterTestBase
{
private readonly NodeEndpoint endPoint = NodeEndpoint.ForTest(0);
private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
@@ -19,9 +20,26 @@ public void AllSegmentsAreDirectlyAllocatedToEndpoint()
Assert.True(master.Segments.All(x => x.AssignedEndpoint == endPoint));
}
+
+ [Fact]
+ public void DirectlyModifyingTopologyAndThenCallingRefreshEndpointsShouldShowAllEndpoints()
+ {
+ Assert.False(master.Endpoints.Any(x=>x == NodeEndpoint.ForTest(1)));
+
+ master.Topology.Segments[0].AssignedEndpoint = NodeEndpoint.ForTest(1);
+ master.RefreshEndpoints();
+
+ Assert.True(master.Endpoints.Any(x => x == NodeEndpoint.ForTest(1)));
+
+ }
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
- public class JoiningTwice
+ public class JoiningTwice : MasterTestBase
{
private readonly NodeEndpoint endPoint = NodeEndpoint.ForTest(0);
private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
@@ -34,10 +52,15 @@ public void IsNoOpp()
Assert.Equal(ranges1, ranges2);
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
- public class NewEndpointJoiningNonEmptyMaster
- {
+ public class NewEndpointJoiningNonEmptyMaster : MasterTestBase
+ {
private readonly NodeEndpoint endPoint = NodeEndpoint.ForTest(0);
private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
private readonly NodeEndpoint newEndpoint = NodeEndpoint.ForTest(1);
@@ -66,9 +89,14 @@ public void HalfOfTheSegmentsWillBeInTheProcessOfAssigningToNewEndpoint()
Assert.Equal(master.Segments.Count()/2,
master.Segments.Count(x => x.InProcessOfMovingToEndpoint == newEndpoint));
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
- public class NewEndpointJoiningMasterWhenAnotherJoinIsInTheProcessOfJoining
+ public class NewEndpointJoiningMasterWhenAnotherJoinIsInTheProcessOfJoining : MasterTestBase
{
private readonly NodeEndpoint endPoint = NodeEndpoint.ForTest(0);
private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
@@ -99,9 +127,14 @@ public void WillNotAffectJoiningOfExistingNode()
{
Assert.Equal(4096, master.Segments.Count(x => x.InProcessOfMovingToEndpoint == anotherNodeInTheProcessOfJoining));
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
- public class NewEndpointJoiningMasterWithTwoNodes
+ public class NewEndpointJoiningMasterWithTwoNodes : MasterTestBase
{
private readonly NodeEndpoint endPoint = NodeEndpoint.ForTest(0);
private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
@@ -133,6 +166,11 @@ public void ThirdOfTheAvailableSegmentsWillBeAssignedToNewNode()
{
Assert.Equal(2730, master.Segments.Count(x => x.InProcessOfMovingToEndpoint == newEndpoint));
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
}
}
10 Rhino.DistributedHashTable.Tests/MasterTestBase.cs
@@ -0,0 +1,10 @@
+using System;
+using System.IO;
+
+namespace Rhino.DistributedHashTable.Tests
+{
+ public abstract class MasterTestBase : IDisposable
+ {
+ public abstract void Dispose();
+ }
+}
8 Rhino.DistributedHashTable.Tests/RangesBehavior.cs
@@ -1,3 +1,4 @@
+using System;
using System.Net;
using Rhino.DistributedHashTable.Internal;
using Xunit;
@@ -7,7 +8,7 @@ namespace Rhino.DistributedHashTable.Tests
{
public class SegmentsBehavior
{
- public class WhenMasterCreatesSegment
+ public class WhenMasterCreatesSegment : MasterTestBase, IDisposable
{
private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
@@ -16,6 +17,11 @@ public void ThereShouldBe8192Segments()
{
Assert.Equal(8192, master.Segments.Count());
}
+
+ public override void Dispose()
+ {
+ master.Dispose();
+ }
}
}
}
1  Rhino.DistributedHashTable.Tests/Rhino.DistributedHashTable.Tests.csproj
@@ -59,6 +59,7 @@
<Compile Include="MasterCaughtUpBehavior.cs" />
<Compile Include="MasterGaveUpBehavior.cs" />
<Compile Include="MasterJoinBehavior.cs" />
+ <Compile Include="MasterTestBase.cs" />
<Compile Include="NodeReplicationBehavior.cs" />
<Compile Include="NodeStartupBehavior.cs" />
<Compile Include="Program.cs" />
6 Rhino.DistributedHashTable/Commands/UpdateTopologyCommand.cs
@@ -11,6 +11,8 @@ public class UpdateTopologyCommand : ICommand
private readonly IDistributedHashTableMaster master;
private readonly DistributedHashTableNode node;
+ public event Action Completed = delegate { };
+
public UpdateTopologyCommand(IDistributedHashTableMaster master,
DistributedHashTableNode node)
{
@@ -38,6 +40,10 @@ public bool Execute()
log.Warn("Unable to update topology, we are probably running on incosistent topology", e);
return false;
}
+ finally
+ {
+ Completed();
+ }
}
}
}
61 Rhino.DistributedHashTable/Hosting/DistributedHashTableMasterHost.cs
@@ -1,4 +1,5 @@
using System;
+using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
@@ -9,6 +10,7 @@
using Rhino.DistributedHashTable.Protocol;
using Rhino.DistributedHashTable.Remote;
using Rhino.DistributedHashTable.Util;
+using Rhino.PersistentHashTable;
using NodeEndpoint = Rhino.DistributedHashTable.Internal.NodeEndpoint;
using ReplicationType=Rhino.DistributedHashTable.Protocol.ReplicationType;
@@ -21,18 +23,20 @@ public class DistributedHashTableMasterHost : IDisposable
private readonly TcpListener listener;
private readonly DistributedHashTableMaster master;
+ private readonly PersistentHashTable.PersistentHashTable hashTable;
public DistributedHashTableMasterHost()
- : this(new ThreadPoolExecuter(), 2200)
+ : this("master.esent", new ThreadPoolExecuter(), 2200)
{
}
- public DistributedHashTableMasterHost(IExecuter executer, int port)
+ public DistributedHashTableMasterHost(string name, IExecuter executer, int port)
{
this.executer = executer;
master = new DistributedHashTableMaster();
master.TopologyChanged += OnTopologyChanged;
listener = new TcpListener(IPAddress.Any, port);
+ hashTable = new PersistentHashTable.PersistentHashTable(name);
}
private void OnTopologyChanged()
@@ -41,17 +45,68 @@ private void OnTopologyChanged()
executer.RegisterForExecution(new NotifyEndpointsAboutTopologyChange(
master.Endpoints.ToArray(),
new NonPooledDistributedHashTableNodeFactory()
- ));
+ ));
+
+ PersistTopology();
+ }
+
+ private void PersistTopology()
+ {
+ byte[] buffer;
+ var topology = master.Topology.GetTopology();
+ using (var stream = new MemoryStream())
+ {
+ var writer = new MessageStreamWriter<TopologyResultMessage>(stream);
+ writer.Write(topology);
+ writer.Flush();
+ buffer = stream.ToArray();
+ }
+
+ hashTable.Batch(actions =>
+ {
+ var values = actions.Get(new GetRequest
+ {
+ Key = Constants.Topology
+ });
+ actions.Put(new PutRequest
+ {
+ Key = Constants.Topology,
+ ParentVersions = values.Select(x => x.Version).ToArray(),
+ Bytes = buffer,
+ IsReadOnly = true
+ });
+
+ actions.Commit();
+ });
}
public void Dispose()
{
listener.Stop();
executer.Dispose();
+ master.Dispose();
+ hashTable.Dispose();
}
public void Start()
{
+ hashTable.Initialize();
+ hashTable.Batch(actions =>
+ {
+ var value = actions.Get(new GetRequest {Key = Constants.Topology}).LastOrDefault();
+
+ if (value != null)
+ {
+ var topology = MessageStreamIterator<TopologyResultMessage>
+ .FromStreamProvider(() => new MemoryStream(value.Data))
+ .First();
+
+ master.Topology = topology.GetTopology();
+ master.RefreshEndpoints();
+ }
+ actions.Commit();
+ });
+
listener.Start();
listener.BeginAcceptTcpClient(OnAcceptTcpClient, null);
OnTopologyChanged();
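
Note: PersistTopology and Start above form a symmetric pair — the topology is written as a length-prefixed TopologyResultMessage into a byte buffer, stored under Constants.Topology, and re-read on startup. A minimal sketch of just that round trip, assuming the same MessageStreamWriter / MessageStreamIterator API the host already uses (the RoundTripTopology helper and its standalone use are illustrative, not part of the commit):

	// requires: using System.IO; using System.Linq;
	private static TopologyResultMessage RoundTripTopology(TopologyResultMessage topology)
	{
		byte[] buffer;
		using (var stream = new MemoryStream())
		{
			// serialize exactly as PersistTopology does
			var writer = new MessageStreamWriter<TopologyResultMessage>(stream);
			writer.Write(topology);
			writer.Flush();
			buffer = stream.ToArray();
		}

		// deserialize exactly as Start does with the value stored under Constants.Topology
		return MessageStreamIterator<TopologyResultMessage>
			.FromStreamProvider(() => new MemoryStream(buffer))
			.First();
	}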
3  Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs
@@ -285,9 +285,8 @@ private void HandleTopologyUpdate(MessageStreamWriter<StorageMessageUnion> write
public void Dispose()
{
listener.Stop();
- queueManager.Dispose();
-
node.Dispose();
+ queueManager.Dispose();
storage.Dispose();
}
}
1  Rhino.DistributedHashTable/Internal/Constants.cs
@@ -5,5 +5,6 @@ public static class Constants
public const int NumberOfSegments = 8192;
public const string RhinoDhtStartToken = "@rdht://";
public const string MovedSegment = RhinoDhtStartToken + "Segment/Moved/";
+ public const string Topology = RhinoDhtStartToken + "Topology/";
}
}
49 Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs
@@ -12,7 +12,7 @@ namespace Rhino.DistributedHashTable.Internal
/// The master is a SIGNLE THREADED service that manages all
/// operations in the cluster.
/// </summary>
- public class DistributedHashTableMaster : IDistributedHashTableMaster
+ public class DistributedHashTableMaster : IDistributedHashTableMaster, IDisposable
{
private readonly HashSet<NodeEndpoint> endpoints = new HashSet<NodeEndpoint>();
private readonly ILog log = LogManager.GetLogger(typeof(DistributedHashTableMaster));
@@ -274,14 +274,25 @@ private Segment[] JoinInternal(NodeEndpoint endpoint)
return Segments.Where(x => x.BelongsTo(endpoint)).ToArray();
}
- var segmentsThatHadNoOwner = Segments
- .Where(x => x.AssignedEndpoint == null)
- .Apply(x => x.AssignedEndpoint = endpoint)
- .ToArray();
- if (segmentsThatHadNoOwner.Length > 0)
+ var thereAreSegementsWithNoowners = Segments
+ .Any(x => x.AssignedEndpoint == null);
+ if (thereAreSegementsWithNoowners)
{
+ Topology = new Topology(Segments
+ .Where(x => x.AssignedEndpoint == null)
+ .Select(x => new Segment
+ {
+ AssignedEndpoint = endpoint,
+ Backups = x.Backups,
+ Index = x.Index,
+ InProcessOfMovingToEndpoint = x.InProcessOfMovingToEndpoint,
+ PendingBackups = x.PendingBackups
+ }).ToArray(),
+ Topology.Version + 1
+ );
+ TopologyChanged();
log.DebugFormat("Endpoint {0} was assigned all segments without owners", endpoint.Sync);
- return segmentsThatHadNoOwner;
+ return Segments.Where(x => x.AssignedEndpoint == endpoint).ToArray();
}
log.DebugFormat("New endpoint {0}, allocating segments for it", endpoint.Sync);
@@ -346,5 +357,29 @@ class NodeEndpointStats
public int BackupCount;
public int TentativeBackupCount;
}
+
+ public void Dispose()
+ {
+
+ }
+
+ public void RefreshEndpoints()
+ {
+ foreach (var segment in Segments)
+ {
+ if (segment.AssignedEndpoint != null)
+ endpoints.Add(segment.AssignedEndpoint);
+ if (segment.InProcessOfMovingToEndpoint != null)
+ endpoints.Add(segment.InProcessOfMovingToEndpoint);
+ foreach (var backup in segment.Backups)
+ {
+ endpoints.Add(backup);
+ }
+ foreach (var backup in segment.PendingBackups)
+ {
+ endpoints.Add(backup);
+ }
+ }
+ }
}
}
52 Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs
@@ -24,6 +24,8 @@ public class DistributedHashTableNode : IDistributedHashTableNode
private readonly Thread backgroundReplication;
private readonly ILog log = LogManager.GetLogger(typeof(DistributedHashTableNode));
+ private int pendingUpdating;
+
public DistributedHashTableNode(IDistributedHashTableMaster master,
IExecuter executer,
IMessageSerializer messageSerializer,
@@ -102,7 +104,12 @@ public NodeEndpoint Endpoint
public void UpdateTopology()
{
- executer.RegisterForExecution(new UpdateTopologyCommand(master, this));
+ if (Interlocked.CompareExchange(ref pendingUpdating, 0, 0) != 0)
+ return;
+ Interlocked.Increment(ref pendingUpdating);
+ var command = new UpdateTopologyCommand(master, this);
+ command.Completed += () => Interlocked.Decrement(ref pendingUpdating);
+ executer.RegisterForExecution(command);
}
public int GetTopologyVersion()
@@ -231,45 +238,38 @@ public void Dispose()
{
disposed = true;
executer.Dispose();
+ queueManager.Dispose();
backgroundReplication.Join();
}
public void SetTopology(Topology topology)
{
- RemoveMoveMarkerForSegmentsThatWeAReNoLongerResponsibleFor(topology);
+ RemoveMoveMarkerForSegmentsThatWeAreNoLongerResponsibleFor(topology);
Topology = topology;
StartPendingBackupsForCurrentNode(topology);
}
- private void RemoveMoveMarkerForSegmentsThatWeAReNoLongerResponsibleFor(Topology topology)
+ private void RemoveMoveMarkerForSegmentsThatWeAreNoLongerResponsibleFor(Topology topology)
{
if (Topology == null || topology == null)
return;
- var segmentsThatWereMovedFromNode =
- from prev in Topology.Segments
- join current in topology.Segments on prev.Index equals current.Index
- where prev.AssignedEndpoint == endpoint && current.AssignedEndpoint != endpoint
- select new ExtendedGetRequest
- {
- Key = Constants.MovedSegment + prev.Index,
- Segment = prev.Index,
- IsLocal = true
- };
- var movedMarkers = Storage.Get(GetTopologyVersion(),
- segmentsThatWereMovedFromNode.ToArray());
- var requests = movedMarkers
- .Where(x => x.Length == 1)
- .Select(values => values[0])
- .Select(value => new ExtendedRemoveRequest
- {
- Key = value.Key,
- SpecificVersion = value.Version,
- IsLocal = true
- }).ToArray();
- if (requests.Length == 0)
+ var removeSegmentsThatWereMovedFromNode =
+ (
+ from prev in Topology.Segments
+ join current in topology.Segments on prev.Index equals current.Index
+ where prev.AssignedEndpoint == endpoint && current.AssignedEndpoint != endpoint
+ select new ExtendedRemoveRequest
+ {
+ Key = Constants.MovedSegment + prev.Index,
+ Segment = prev.Index,
+ IsLocal = true
+ }
+ ).ToArray();
+
+ if (removeSegmentsThatWereMovedFromNode.Length == 0)
return;
- Storage.Remove(GetTopologyVersion(), requests);
+ Storage.Remove(GetTopologyVersion(), removeSegmentsThatWereMovedFromNode);
}
}
}
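
Note: UpdateTopology above guards against concurrent updates by checking a pendingUpdating counter and decrementing it from the command's Completed event. A standalone sketch of the same single-flight idea, with the check-then-increment from the diff collapsed into a single compare-and-swap (the SingleFlightUpdater type is hypothetical, not part of the repository):

	using System;
	using System.Threading;

	public class SingleFlightUpdater
	{
		// 0 = idle, 1 = an update is already queued or running
		private int pending;

		public void RequestUpdate(Action runUpdate)
		{
			// atomically move 0 -> 1; if another update is pending, drop this request
			if (Interlocked.CompareExchange(ref pending, 1, 0) != 0)
				return;
			try
			{
				runUpdate();
			}
			finally
			{
				// equivalent to the Completed handler decrementing pendingUpdating
				Interlocked.Exchange(ref pending, 0);
			}
		}
	}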
48 Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs
@@ -127,6 +127,9 @@ public PutResult[] Put(int topologyVersion, params ExtendedPutRequest[] valuesTo
private void AssertMatchingTopologyVersion(int topologyVersionFromClient)
{
+ if (TopologyVersion == topologyVersionFromClient)
+ return;
+
//client thinks that the version is newer
if(topologyVersionFromClient > TopologyVersion)
{
@@ -135,13 +138,14 @@ private void AssertMatchingTopologyVersion(int topologyVersionFromClient)
TopologyVersion);
distributedHashTableNode.UpdateTopology();
}
- if(TopologyVersion != topologyVersionFromClient)
+ else
{
log.InfoFormat("Got request for topology {0} but current local version is {1}",
- topologyVersionFromClient,
- TopologyVersion);
- throw new TopologyVersionDoesNotMatchException("Topology Version doesn't match, you need to refresh the topology from the master");
+ topologyVersionFromClient,
+ TopologyVersion);
}
+ throw new TopologyVersionDoesNotMatchException(
+ "Topology Version doesn't match, you need to refresh the topology from the master");
}
public bool[] Remove(int topologyVersion, params ExtendedRemoveRequest[] valuesToRemove)
@@ -157,19 +161,23 @@ public bool[] Remove(int topologyVersion, params ExtendedRemoveRequest[] valuesT
if (request.IsReplicationRequest == false && request.IsLocal == false)
AssertSegmentNotMoved(actions, request.Segment);
- if (request.SpecificVersion == null)
- throw new ArgumentException("Could not accept request with no SpecificVersion");
-
if (request.Key.StartsWith(Constants.RhinoDhtStartToken) && request.IsLocal == false)
throw new ArgumentException(Constants.RhinoDhtStartToken + " is a reserved key prefix");
- foreach (var hash in actions.GetReplicationHashes(request.Key, request.SpecificVersion))
+ if(request.SpecificVersion!=null)
+ {
+ RegisterRemovalForReplication(request, actions, request.SpecificVersion);
+ }
+ else// all versions
{
- actions.AddReplicationRemovalInfo(
- request.Key,
- request.SpecificVersion,
- hash
- );
+ var values = actions.Get(new GetRequest
+ {
+ Key = request.Key
+ });
+ foreach (var value in values)
+ {
+ RegisterRemovalForReplication(request, actions, value.Version);
+ }
}
var remove = actions.Remove(request);
@@ -186,6 +194,20 @@ public bool[] Remove(int topologyVersion, params ExtendedRemoveRequest[] valuesT
return results.ToArray();
}
+ private static void RegisterRemovalForReplication(RemoveRequest request,
+ PersistentHashTableActions actions,
+ ValueVersion version)
+ {
+ foreach (var hash in actions.GetReplicationHashes(request.Key, version))
+ {
+ actions.AddReplicationRemovalInfo(
+ request.Key,
+ version,
+ hash
+ );
+ }
+ }
+
private void HandleReplication(
IExtendedRequest[] valuesToSend)
{
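
Note: the removal path now branches on SpecificVersion — when it is supplied, only that version is registered for replication removal; when it is null, every version returned by a Get for the key is registered before the remove. A hedged caller-side sketch (the key, segment number, storage instance, topologyVersion, and versionFromEarlierGet variables are placeholders, not taken from the commit):

	// remove only one known version (previous behaviour, still supported)
	storage.Remove(topologyVersion, new ExtendedRemoveRequest
	{
		Key = "users/42",
		Segment = 17,
		SpecificVersion = versionFromEarlierGet
	});

	// remove the value regardless of version (new in this commit):
	// SpecificVersion is left null, so every stored version of the key
	// is registered for replication removal and then deleted
	storage.Remove(topologyVersion, new ExtendedRemoveRequest
	{
		Key = "users/42",
		Segment = 17
	});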