Permalink
Browse files

Change version to be numeric

Now when we move a node from ownership position on a node, it will remove it from the forward listing.

git-svn-id: https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/experiments/dht/dht@2192 079b0acf-d9fa-0310-9935-e5ade295c882
  • Loading branch information...
ayenderahien
ayenderahien committed Jun 6, 2009
1 parent d10c47e commit a803da47a687ff107db29ff93928880b38d6876e
Showing with 378 additions and 334 deletions.
  1. +62 −0 Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs
  2. +1 −1 Rhino.DistributedHashTable.ClusterTests/MasterOverTheNetwork.cs
  3. +11 −11 Rhino.DistributedHashTable.IntegrationTests/DistributedHashTableReplicationTest.cs
  4. +22 −22 Rhino.DistributedHashTable.IntegrationTests/DistributedHashTableStorageTest.cs
  5. +5 −5 Rhino.DistributedHashTable.IntegrationTests/OnlineRangeReplicationCommandTest.cs
  6. +15 −8 Rhino.DistributedHashTable.Tests/NodeReplicationBehavior.cs
  7. +6 −6 Rhino.DistributedHashTable/Client/DistributedHashTableStorageClient.cs
  8. +8 −8 Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs
  9. +27 −25 Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs
  10. +36 −4 Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs
  11. +12 −12 Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs
  12. +1 −1 Rhino.DistributedHashTable/Internal/IDistributedHashTableNode.cs
  13. +5 −5 Rhino.DistributedHashTable/Internal/IDistributedHashTableStorage.cs
  14. +1 −8 Rhino.DistributedHashTable/Internal/Segment.cs
  15. +2 −10 Rhino.DistributedHashTable/Internal/Topology.cs
  16. +1 −0 Rhino.DistributedHashTable/Parameters/ExtendedGetRequest.cs
  17. +1 −0 Rhino.DistributedHashTable/Parameters/ExtendedPutRequest.cs
  18. +1 −0 Rhino.DistributedHashTable/Parameters/ExtendedRemoveRequest.cs
  19. +1 −0 Rhino.DistributedHashTable/Parameters/IExtendedRequest.cs
  20. +143 −187 Rhino.DistributedHashTable/Protocol/ProtocolDef.cs
  21. +6 −7 Rhino.DistributedHashTable/Protocol/ProtocolDef.proto
  22. +9 −10 Rhino.DistributedHashTable/Protocol/ProtocolDef.proto.bin
  23. +2 −4 Rhino.DistributedHashTable/Util/PrtoBufConverter.cs
@@ -86,6 +86,68 @@ public void AfterBothNodesJoinedWillAutomaticallyReplicateToBackupNode()
}
}
+ [Fact]
+ public void CanReadValueFromBackupNodeThatUsedToBeTheSegmentOwner()
+ {
+ storageHostB.Start();
+
+ var masterProxy = new DistributedHashTableMasterClient(masterUri);
+
+ Topology topology;
+ for (var i = 0; i < 50; i++)
+ {
+ topology = masterProxy.GetTopology();
+ var count = topology.Segments
+ .Where(x => x.AssignedEndpoint == storageHostA.Endpoint)
+ .Count();
+
+ if (count == 4096)
+ break;
+ Thread.Sleep(500);
+ }
+
+ topology = masterProxy.GetTopology();
+ var segment = topology.Segments.First(x => x.AssignedEndpoint == storageHostA.Endpoint).Index;
+ using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint))
+ {
+ nodeA.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] { 2, 2, 0, 0 },
+ Key = "abc",
+ Segment = segment
+ });
+ }
+
+ using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint))
+ {
+ topology = masterProxy.GetTopology();
+ Value[][] values = null;
+ for (var i = 0; i < 100; i++)
+ {
+ values = nodeB.Get(topology.Version, new ExtendedGetRequest
+ {
+ Key = "abc",
+ Segment = segment
+ });
+ if (values[0].Length != 0)
+ break;
+ Thread.Sleep(250);
+ }
+ Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data);
+ }
+
+ using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint))
+ {
+ topology = masterProxy.GetTopology();
+ var values = nodeA.Get(topology.Version, new ExtendedGetRequest
+ {
+ Key = "abc",
+ Segment = segment
+ });
+ Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data);
+ }
+ }
+
[Fact]
public void TwoNodesCanJoinToTheCluster()
@@ -28,7 +28,7 @@ public void CanGetTopologyWhenThereAreNoNodes()
{
var topology = masterProxy.GetTopology();
Assert.NotNull(topology);
- Assert.NotEqual(Guid.Empty, topology.Version);
+ Assert.NotEqual(0, topology.Version);
Assert.NotEqual(DateTime.MinValue, topology.Timestamp);
Assert.Equal(8192, topology.Segments.Length);
Assert.True(topology.Segments.All(x => x.AssignedEndpoint == null));
@@ -16,14 +16,14 @@ public class WhenThereAreNoKeysInTable: EsentTestBase, IDisposable
{
private readonly DistributedHashTableStorage distributedHashTableStorage;
private readonly IDistributedHashTableNode node;
- private readonly Guid guid;
+ private readonly int topologyVersion;
private readonly IDistributedHashTableNodeReplication replication;
public WhenThereAreNoKeysInTable()
{
node = MockRepository.GenerateStub<IDistributedHashTableNode>();
- guid = Guid.NewGuid();
- node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ topologyVersion = 7;
+ node.Stub(x => x.GetTopologyVersion()).Return(topologyVersion);
distributedHashTableStorage = new DistributedHashTableStorage("test.esent",
node);
replication = distributedHashTableStorage.Replication;
@@ -43,7 +43,7 @@ public void PuttingKeyInSegmentAsisgnedElsewhereShouldThrow()
{
var ranges = Enumerable.Range(0, 500).ToArray();
replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ReplicationType.Ownership, ranges);
- var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Put(guid, new ExtendedPutRequest()
+ var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Put(topologyVersion, new ExtendedPutRequest()
{
Key = "test",
Segment = 0,
@@ -57,7 +57,7 @@ public void GettingKeyInSegmentAsisgnedElsewhereShouldThrow()
{
var ranges = Enumerable.Range(0, 500).ToArray();
replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ReplicationType.Ownership, ranges);
- var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Get(guid, new ExtendedGetRequest()
+ var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Get(topologyVersion, new ExtendedGetRequest()
{
Key = "test",
Segment = 0,
@@ -71,7 +71,7 @@ public void RemovingKeyInSegmentAsisgnedElsewhereShouldThrow()
{
var ranges = Enumerable.Range(0, 500).ToArray();
replication.AssignAllEmptySegments(NodeEndpoint.ForTest(1), ReplicationType.Ownership, ranges);
- var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Remove(guid, new ExtendedRemoveRequest
+ var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Remove(topologyVersion, new ExtendedRemoveRequest
{
Key = "test",
Segment = 0,
@@ -89,19 +89,19 @@ public class WhenThereAreKeysInTable : EsentTestBase, IDisposable
{
private readonly DistributedHashTableStorage distributedHashTableStorage;
private readonly IDistributedHashTableNode node;
- private readonly Guid guid;
+ private readonly int topologyVersion;
private readonly IDistributedHashTableNodeReplication replication;
private readonly PutResult putResult;
public WhenThereAreKeysInTable()
{
node = MockRepository.GenerateStub<IDistributedHashTableNode>();
- guid = Guid.NewGuid();
- node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ topologyVersion = 9;
+ node.Stub(x => x.GetTopologyVersion()).Return(topologyVersion);
distributedHashTableStorage = new DistributedHashTableStorage("test.esent",
node);
replication = distributedHashTableStorage.Replication;
- putResult = distributedHashTableStorage.Put(guid, new ExtendedPutRequest
+ putResult = distributedHashTableStorage.Put(topologyVersion, new ExtendedPutRequest
{
Tag = 0,
Bytes = new byte[] {1},
@@ -161,7 +161,7 @@ public void WhenDoneGettingAllKeysWillAssignSegmentToEndpoint()
result = replication.ReplicateNextPage(NodeEndpoint.ForTest(1), ReplicationType.Ownership, 0);
Assert.Equal(0, result.PutRequests.Length);
- var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Remove(guid, new ExtendedRemoveRequest
+ var exception = Assert.Throws<SeeOtherException>(() => node.Storage.Remove(topologyVersion, new ExtendedRemoveRequest
{
Key = "test",
Segment = 0,
@@ -13,17 +13,17 @@ public class ReadingValues : EsentTestBase, IDisposable
{
private readonly DistributedHashTableStorage distributedHashTableStorage;
private readonly IDistributedHashTableNode node;
- private readonly Guid guid;
+ private readonly int topologyVersion;
public ReadingValues()
{
node = MockRepository.GenerateStub<IDistributedHashTableNode>();
- guid = Guid.NewGuid();
- node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ topologyVersion = 1;
+ node.Stub(x => x.GetTopologyVersion()).Return(topologyVersion);
distributedHashTableStorage = new DistributedHashTableStorage("test.esent",
node);
- distributedHashTableStorage.Put(guid, new ExtendedPutRequest
+ distributedHashTableStorage.Put(topologyVersion, new ExtendedPutRequest
{
Key = "test",
Bytes = new byte[]{1,2,4},
@@ -34,7 +34,7 @@ public ReadingValues()
[Fact]
public void WillReturnNullForMissingValue()
{
- var values = distributedHashTableStorage.Get(guid, new ExtendedGetRequest
+ var values = distributedHashTableStorage.Get(topologyVersion, new ExtendedGetRequest
{
Key = "missing-value",
Segment = 0
@@ -45,7 +45,7 @@ public void WillReturnNullForMissingValue()
[Fact]
public void WillReturnValueForExistingValue()
{
- var values = distributedHashTableStorage.Get(guid, new ExtendedGetRequest
+ var values = distributedHashTableStorage.Get(topologyVersion, new ExtendedGetRequest
{
Key = "test",
Segment = 0
@@ -63,21 +63,21 @@ public class WritingValues : EsentTestBase, IDisposable
{
private readonly DistributedHashTableStorage distributedHashTableStorage;
private readonly IDistributedHashTableNode node;
- private readonly Guid guid;
+ private readonly int topologyVersion;
public WritingValues()
{
node = MockRepository.GenerateStub<IDistributedHashTableNode>();
- guid = Guid.NewGuid();
- node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ topologyVersion = 2;
+ node.Stub(x => x.GetTopologyVersion()).Return(topologyVersion);
distributedHashTableStorage = new DistributedHashTableStorage("test.esent",
node);
}
[Fact]
public void WillGetVersionNumberFromPut()
{
- var results = distributedHashTableStorage.Put(guid, new ExtendedPutRequest
+ var results = distributedHashTableStorage.Put(topologyVersion, new ExtendedPutRequest
{
Key = "test",
Bytes = new byte[] { 1, 2, 4 },
@@ -96,7 +96,7 @@ public void WillReplicateToOtherSystems()
Bytes = new byte[] { 1, 2, 4 },
Segment = 0,
};
- distributedHashTableStorage.Put(guid, request);
+ distributedHashTableStorage.Put(topologyVersion, request);
node.AssertWasCalled(x=>x.SendToAllOtherBackups(0, request));
}
@@ -110,7 +110,7 @@ public void WillReplicateToOwnerWhenFailedOverToAnotherNode()
Bytes = new byte[] { 1, 2, 4 },
Segment = 0,
};
- distributedHashTableStorage.Put(guid, request);
+ distributedHashTableStorage.Put(topologyVersion, request);
node.AssertWasCalled(x => x.SendToOwner(0, request));
}
@@ -124,7 +124,7 @@ public void WillReplicateToOtherBackupsWhenFailedOverToAnotherNode()
Bytes = new byte[] { 1, 2, 4 },
Segment = 0,
};
- distributedHashTableStorage.Put(guid, request);
+ distributedHashTableStorage.Put(topologyVersion, request);
node.AssertWasCalled(x => x.SendToAllOtherBackups(0, request));
}
@@ -138,18 +138,18 @@ public class RemovingValues : EsentTestBase, IDisposable
{
private readonly DistributedHashTableStorage distributedHashTableStorage;
private readonly IDistributedHashTableNode node;
- private readonly Guid guid;
+ private readonly int topologyVersion;
private ValueVersion version;
public RemovingValues()
{
node = MockRepository.GenerateStub<IDistributedHashTableNode>();
- guid = Guid.NewGuid();
- node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ topologyVersion = 1;
+ node.Stub(x => x.GetTopologyVersion()).Return(topologyVersion);
distributedHashTableStorage = new DistributedHashTableStorage("test.esent",
node);
- var results = distributedHashTableStorage.Put(guid, new ExtendedPutRequest
+ var results = distributedHashTableStorage.Put(topologyVersion, new ExtendedPutRequest
{
Key = "test",
Bytes = new byte[] { 1, 2, 4 },
@@ -161,7 +161,7 @@ public RemovingValues()
[Fact]
public void WillConfirmRemovalOfExistingValue()
{
- var results = distributedHashTableStorage.Remove(guid, new ExtendedRemoveRequest
+ var results = distributedHashTableStorage.Remove(topologyVersion, new ExtendedRemoveRequest
{
Key = "test",
SpecificVersion = version,
@@ -173,7 +173,7 @@ public void WillConfirmRemovalOfExistingValue()
[Fact]
public void WillNotConfirmRemovalOfNonExistingValue()
{
- var results = distributedHashTableStorage.Remove(guid, new ExtendedRemoveRequest
+ var results = distributedHashTableStorage.Remove(topologyVersion, new ExtendedRemoveRequest
{
Key = "test2",
SpecificVersion = version,
@@ -192,7 +192,7 @@ public void WillReplicateToOtherSystems()
SpecificVersion = version,
Segment = 0,
};
- distributedHashTableStorage.Remove(guid, request);
+ distributedHashTableStorage.Remove(topologyVersion, request);
node.AssertWasCalled(x => x.SendToAllOtherBackups(0, request));
}
@@ -206,7 +206,7 @@ public void WillReplicateToOwnerWhenFailedOverToAnotherNode()
SpecificVersion = version,
Segment = 0,
};
- distributedHashTableStorage.Remove(guid, request);
+ distributedHashTableStorage.Remove(topologyVersion, request);
node.AssertWasCalled(x => x.SendToOwner(0, request));
}
@@ -220,7 +220,7 @@ public void WillReplicateToOtherBackupsWhenFailedOverToAnotherNode()
SpecificVersion = version,
Segment = 0,
};
- distributedHashTableStorage.Remove(guid, request);
+ distributedHashTableStorage.Remove(topologyVersion, request);
node.AssertWasCalled(x => x.SendToAllOtherBackups(0, request));
}
@@ -16,7 +16,7 @@ public class OnlineSegmentReplicationCommandTest : EsentTestBase
private readonly IDistributedHashTableNodeReplication replication;
private readonly NodeEndpoint endpoint;
private readonly IDistributedHashTableStorage storage;
- private readonly Guid guid = Guid.NewGuid();
+ private readonly int topologyVersion = 4;
public OnlineSegmentReplicationCommandTest()
{
@@ -26,7 +26,7 @@ public OnlineSegmentReplicationCommandTest()
node.Stub(x => x.Endpoint).Return(NodeEndpoint.ForTest(2));
storage = MockRepository.GenerateStub<IDistributedHashTableStorage>();
node.Storage = storage;
- node.Stub(x => x.GetTopologyVersion()).Return(guid);
+ node.Stub(x => x.GetTopologyVersion()).Return(topologyVersion);
command = new OnlineSegmentReplicationCommand(
endpoint,
new[] { new Segment { Index = 0 }, new Segment { Index = 1 }, },
@@ -127,7 +127,7 @@ public void WillPutReturnedItemsIntoStorage()
var success = command.Execute();
Assert.True(success);
- storage.AssertWasCalled(x => x.Put(guid, request));
+ storage.AssertWasCalled(x => x.Put(topologyVersion, request));
}
[Fact]
@@ -149,7 +149,7 @@ public void WillRemoveReturnedRemovalFromStorage()
var success = command.Execute();
Assert.True(success);
- storage.AssertWasCalled(x => x.Remove(guid, request));
+ storage.AssertWasCalled(x => x.Remove(topologyVersion, request));
}
[Fact]
@@ -210,7 +210,7 @@ public void WillRepeatReplicationUntilGetDone()
var success = command.Execute();
Assert.True(success);
- storage.AssertWasCalled(x => x.Put(guid, request), o => o.Repeat.Times(6));
+ storage.AssertWasCalled(x => x.Put(topologyVersion, request), o => o.Repeat.Times(6));
}
}
}
Oops, something went wrong.

0 comments on commit a803da4

Please sign in to comment.