diff --git a/Rhino.DistributedHashTable.Client/DistributedHashTable.cs b/Rhino.DistributedHashTable.Client/DistributedHashTable.cs new file mode 100644 index 0000000..fddbf97 --- /dev/null +++ b/Rhino.DistributedHashTable.Client/DistributedHashTable.cs @@ -0,0 +1,288 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Rhino.DistributedHashTable.Client.Exceptions; +using Rhino.DistributedHashTable.Client.Pooling; +using Rhino.DistributedHashTable.Client.Util; +using Rhino.DistributedHashTable.Exceptions; +using Rhino.DistributedHashTable.Internal; +using Rhino.DistributedHashTable.Parameters; +using Rhino.PersistentHashTable; + +namespace Rhino.DistributedHashTable.Client +{ + public class DistributedHashTable : IDistributedHashTable + { + private readonly IDistributedHashTableMaster master; + private readonly IConnectionPool pool; + private Topology topology; + + public DistributedHashTable(IDistributedHashTableMaster master, IConnectionPool pool) + { + this.master = master; + this.pool = pool; + topology = master.GetTopology(); + } + + public PutResult[] Put(params PutRequest[] valuesToAdd) + { + return PutInternal(valuesToAdd, 0); + } + + private PutResult[] PutInternal(PutRequest[] valuesToAdd, int backupIndex) + { + var results = new PutResult[valuesToAdd.Length]; + + var groupedByEndpoint = from req in valuesToAdd + let er = new + { + OriginalIndex = Array.IndexOf(valuesToAdd, req), + Put = new ExtendedPutRequest + { + Bytes = req.Bytes, + ExpiresAt = req.ExpiresAt, + IsReadOnly = req.IsReadOnly, + Key = req.Key, + OptimisticConcurrency = req.OptimisticConcurrency, + ParentVersions = req.ParentVersions, + Segment = GetSegmentFromKey(req.Key), + } + } + group er by GetEndpointByBackupIndex(topology.Segments[er.Put.Segment], backupIndex) into g + select g; + + foreach (var endpoint in groupedByEndpoint) + { + if (endpoint.Key == null) + throw new NoMoreBackupsException(); + + var requests = endpoint.ToArray(); + var putRequests = requests.Select(x => x.Put).ToArray(); + + var putsResults = GetPutsResults(endpoint.Key, putRequests, backupIndex); + for (var i = 0; i < putsResults.Length; i++) + { + results[requests[i].OriginalIndex] = putsResults[i]; + } + } + return results; + } + + private static NodeEndpoint GetEndpointByBackupIndex(Segment segment, int backupIndex) + { + if (backupIndex == 0) + return segment.AssignedEndpoint; + return segment.Backups.ElementAtOrDefault(backupIndex - 1); + } + + private PutResult[] GetPutsResults(NodeEndpoint endpoint, + ExtendedPutRequest[] putRequests, + int backupIndex) + { + try + { + using (var client = pool.Create(endpoint)) + { + return client.Put(topology.Version, putRequests); + } + } + catch (SeeOtherException soe) + { + return GetPutsResults(soe.Endpoint, putRequests, backupIndex); + } + catch (TopologyVersionDoesNotMatchException) + { + RefreshTopology(); + return PutInternal(putRequests, backupIndex); + } + catch (Exception) + { + try + { + return PutInternal(putRequests, backupIndex + 1); + } + catch (NoMoreBackupsException) + { + } + throw; + } + } + + private void RefreshTopology() + { + topology = master.GetTopology(); + } + + private static int GetSegmentFromKey(string key) + { + var crc32 = (int)Crc32.Compute(Encoding.Unicode.GetBytes(key)); + return Math.Abs(crc32 % Constants.NumberOfSegments); + } + + public Value[][] Get(params GetRequest[] valuesToGet) + { + return GetInternal(valuesToGet, 0); + + } + + private Value[][] GetInternal(GetRequest[] valuesToGet, + int backupIndex) + { + var results = new Value[valuesToGet.Length][]; + + var groupedByEndpoint = from req in valuesToGet + let er = new + { + OriginalIndex = Array.IndexOf(valuesToGet, req), + Get = new ExtendedGetRequest + { + Key = req.Key, + SpecifiedVersion = req.SpecifiedVersion, + Segment = GetSegmentFromKey(req.Key), + } + } + group er by GetEndpointByBackupIndex(topology.Segments[er.Get.Segment], backupIndex) into g + select g; + foreach (var endpoint in groupedByEndpoint) + { + if (endpoint.Key == null) + throw new NoMoreBackupsException(); + + var requests = endpoint.ToArray(); + var getRequests = requests.Select(x => x.Get).ToArray(); + + var putsResults = GetGetsResults(endpoint.Key, getRequests, backupIndex); + for (var i = 0; i < putsResults.Length; i++) + { + results[requests[i].OriginalIndex] = putsResults[i]; + } + + } + + return results; + } + + private Value[][] GetGetsResults(NodeEndpoint endpoint, + ExtendedGetRequest[] getRequests, + int backupIndex) + { + try + { + using (var client = pool.Create(endpoint)) + { + return client.Get(topology.Version, getRequests); + } + } + catch (SeeOtherException soe) + { + return GetGetsResults(soe.Endpoint, getRequests, backupIndex); + } + catch (TopologyVersionDoesNotMatchException) + { + RefreshTopology(); + return GetInternal(getRequests, backupIndex); + } + catch (Exception) + { + try + { + return GetInternal(getRequests, backupIndex + 1); + } + catch (NoMoreBackupsException) + { + } + throw; + } + } + + public bool[] Remove(params RemoveRequest[] valuesToRemove) + { + return RemoveInternal(valuesToRemove, 0); + } + + private bool[] RemoveInternal(RemoveRequest[] valuesToRemove, + int backupIndex) + { + var results = new bool[valuesToRemove.Length]; + + var groupedByEndpoint = from req in valuesToRemove + let er = new + { + OriginalIndex = Array.IndexOf(valuesToRemove, req), + Remove = new ExtendedRemoveRequest + { + Key = req.Key, + SpecificVersion = req.SpecificVersion, + Segment = GetSegmentFromKey(req.Key), + } + } + group er by GetEndpointByBackupIndex(topology.Segments[er.Remove.Segment], backupIndex) into g + select g; + + foreach (var endpoint in groupedByEndpoint) + { + if (endpoint.Key == null) + throw new NoMoreBackupsException(); + + var requests = endpoint.ToArray(); + var removeRequests = requests.Select(x => x.Remove).ToArray(); + + var removesResults = GetRemovesResults(endpoint.Key, removeRequests, backupIndex); + for (var i = 0; i < removesResults.Length; i++) + { + results[requests[i].OriginalIndex] = removesResults[i]; + } + } + return results; + } + + private bool[] GetRemovesResults(NodeEndpoint endpoint, + ExtendedRemoveRequest[] removeRequests, + int backupIndex) + { + try + { + using (var client = pool.Create(endpoint)) + { + return client.Remove(topology.Version, removeRequests); + } + } + catch (SeeOtherException soe) + { + return GetRemovesResults(soe.Endpoint, removeRequests, backupIndex); + } + catch (TopologyVersionDoesNotMatchException) + { + RefreshTopology(); + return RemoveInternal(removeRequests, backupIndex); + } + catch (Exception) + { + try + { + return RemoveInternal(removeRequests, backupIndex + 1); + } + catch (NoMoreBackupsException) + { + } + throw; + } + } + + public int[] AddItems(params AddItemRequest[] itemsToAdd) + { + throw new NotImplementedException(); + } + + public void RemoteItems(params RemoveItemRequest[] itemsToRemove) + { + throw new NotImplementedException(); + } + + public KeyValuePair[] GetItems(GetItemsRequest request) + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/Rhino.DistributedHashTable.Client/Exceptions/NoMoreBackupsException.cs b/Rhino.DistributedHashTable.Client/Exceptions/NoMoreBackupsException.cs new file mode 100644 index 0000000..02c2b9c --- /dev/null +++ b/Rhino.DistributedHashTable.Client/Exceptions/NoMoreBackupsException.cs @@ -0,0 +1,35 @@ +using System; +using System.Runtime.Serialization; + +namespace Rhino.DistributedHashTable.Client.Exceptions +{ + [Serializable] + public class NoMoreBackupsException : Exception + { + // + // For guidelines regarding the creation of new exception types, see + // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpgenref/html/cpconerrorraisinghandlingguidelines.asp + // and + // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dncscol/html/csharp07192001.asp + // + + public NoMoreBackupsException() + { + } + + public NoMoreBackupsException(string message) : base(message) + { + } + + public NoMoreBackupsException(string message, + Exception inner) : base(message, inner) + { + } + + protected NoMoreBackupsException( + SerializationInfo info, + StreamingContext context) : base(info, context) + { + } + } +} \ No newline at end of file diff --git a/Rhino.DistributedHashTable/IDistributedHashTable.cs b/Rhino.DistributedHashTable.Client/IDistributedHashTable.cs similarity index 81% rename from Rhino.DistributedHashTable/IDistributedHashTable.cs rename to Rhino.DistributedHashTable.Client/IDistributedHashTable.cs index d472b09..cbd1f0b 100644 --- a/Rhino.DistributedHashTable/IDistributedHashTable.cs +++ b/Rhino.DistributedHashTable.Client/IDistributedHashTable.cs @@ -1,9 +1,8 @@ using System.Collections.Generic; +using Rhino.PersistentHashTable; -namespace Rhino.DistributedHashTable +namespace Rhino.DistributedHashTable.Client { - using PersistentHashTable; - public interface IDistributedHashTable { PutResult[] Put(params PutRequest[] valuesToAdd); diff --git a/Rhino.DistributedHashTable.Client/Pooling/DefaultConnectionPool.cs b/Rhino.DistributedHashTable.Client/Pooling/DefaultConnectionPool.cs new file mode 100644 index 0000000..84723bc --- /dev/null +++ b/Rhino.DistributedHashTable.Client/Pooling/DefaultConnectionPool.cs @@ -0,0 +1,101 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using log4net; +using Rhino.DistributedHashTable.Internal; + +namespace Rhino.DistributedHashTable.Client.Pooling +{ + public class DefaultConnectionPool : IConnectionPool + { + private static readonly ILog log = LogManager.GetLogger(typeof (DefaultConnectionPool)); + readonly object locker = new object(); + + private readonly Dictionary> pooledConnections = + new Dictionary>(); + + public IDistributedHashTableStorage Create(NodeEndpoint endpoint) + { + PooledDistributedHashTableStorageClientConnection storage = null; + lock (locker) + { + LinkedList value; + if (pooledConnections.TryGetValue(endpoint, out value) && value.Count > 0) + { + storage = value.First.Value; + value.RemoveFirst(); + } + } + if (storage != null) + { + if (storage.Connected == false) + { + log.DebugFormat("Found unconnected connection in the pool for {0}", endpoint.Sync); + try + { + storage.Dispose(); + } + catch (Exception e) + { + log.Debug("Error when disposing unconnected connection in the pool", e); + } + } + else + { + return storage; + } + } + log.DebugFormat("Creating new connection in the pool to {0}", endpoint.Sync); + return new PooledDistributedHashTableStorageClientConnection(this, endpoint); + } + + private void PutMeBack(PooledDistributedHashTableStorageClientConnection connection) + { + lock (locker) + { + LinkedList value; + if (pooledConnections.TryGetValue(connection.Endpoint, out value) == false) + { + pooledConnections[connection.Endpoint] = value = new LinkedList(); + } + value.AddLast(connection); + } + log.DebugFormat("Put connection for {0} back in the pool", connection.Endpoint.Sync); + } + + class PooledDistributedHashTableStorageClientConnection : DistributedHashTableStorageClient + { + private readonly DefaultConnectionPool pool; + + public PooledDistributedHashTableStorageClientConnection( + DefaultConnectionPool pool, + NodeEndpoint endpoint) : base(endpoint) + { + this.pool = pool; + } + + public bool Connected + { + get { return client.Connected; } + } + + public override void Dispose() + { + if(Marshal.GetExceptionCode() != 0)//we are here because of some sort of error + { + log.Debug("There was an error during the usage of pooled client connection, will not return it to the pool (may be poisioned)"); + base.Dispose(); + } + else if(Connected == false) + { + log.Debug("The connection was disconnected, will not return connection to the pool"); + base.Dispose(); + } + else + { + pool.PutMeBack(this); + } + } + } + } +} \ No newline at end of file diff --git a/Rhino.DistributedHashTable.Client/Pooling/IConnectionPool.cs b/Rhino.DistributedHashTable.Client/Pooling/IConnectionPool.cs new file mode 100644 index 0000000..1f299ff --- /dev/null +++ b/Rhino.DistributedHashTable.Client/Pooling/IConnectionPool.cs @@ -0,0 +1,9 @@ +using Rhino.DistributedHashTable.Internal; + +namespace Rhino.DistributedHashTable.Client.Pooling +{ + public interface IConnectionPool + { + IDistributedHashTableStorage Create(NodeEndpoint endpoint); + } +} \ No newline at end of file diff --git a/Rhino.DistributedHashTable.Client/Properties/AssemblyInfo.cs b/Rhino.DistributedHashTable.Client/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..6be29e0 --- /dev/null +++ b/Rhino.DistributedHashTable.Client/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Rhino.DistributedHashTable.Client")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Microsoft")] +[assembly: AssemblyProduct("Rhino.DistributedHashTable.Client")] +[assembly: AssemblyCopyright("Copyright © Microsoft 2009")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("dcd8d96b-2ce9-4356-8b62-dab30519f277")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Rhino.DistributedHashTable.Client/Rhino.DistributedHashTable.Client.csproj b/Rhino.DistributedHashTable.Client/Rhino.DistributedHashTable.Client.csproj new file mode 100644 index 0000000..6dace70 --- /dev/null +++ b/Rhino.DistributedHashTable.Client/Rhino.DistributedHashTable.Client.csproj @@ -0,0 +1,82 @@ + + + + Debug + AnyCPU + 9.0.30729 + 2.0 + {D910183F-1578-43AE-BCD2-F5A9E19079FC} + Library + Properties + Rhino.DistributedHashTable.Client + Rhino.DistributedHashTable.Client + v3.5 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + False + ..\..\SharedLibs\Google\Google.ProtocolBuffers.dll + + + False + ..\Rhino.DistributedHashTable\bin\Debug\log4net.dll + + + + 3.5 + + + 3.5 + + + 3.5 + + + + + + + + + + + + + + + + {F30B2D63-CED5-4C8A-908F-0B5503D984A9} + Rhino.PersistentHashTable + + + {4E8D44D2-505D-488C-B92C-51147748B104} + Rhino.DistributedHashTable + + + + + \ No newline at end of file diff --git a/Rhino.DistributedHashTable.Client/Util/Crc32.cs b/Rhino.DistributedHashTable.Client/Util/Crc32.cs new file mode 100644 index 0000000..3546d1f --- /dev/null +++ b/Rhino.DistributedHashTable.Client/Util/Crc32.cs @@ -0,0 +1,126 @@ +using System; +using System.Security.Cryptography; + +namespace Rhino.DistributedHashTable.Client.Util +{ + /// + /// Taken from + /// http://damieng.com/blog/2006/08/08/calculating_crc32_in_c_and_net + /// + public class Crc32 : HashAlgorithm + { + public const UInt32 DefaultPolynomial = 0xedb88320; + public const UInt32 DefaultSeed = 0xffffffff; + private static UInt32[] defaultTable; + + private UInt32 hash; + private readonly UInt32 seed; + private readonly UInt32[] table; + + public Crc32() + { + table = InitializeTable(DefaultPolynomial); + seed = DefaultSeed; + Initialize(); + } + + public Crc32(UInt32 polynomial, + UInt32 seed) + { + table = InitializeTable(polynomial); + this.seed = seed; + Initialize(); + } + + public override int HashSize + { + get { return 32; } + } + + public override void Initialize() + { + hash = seed; + } + + protected override void HashCore(byte[] buffer, + int start, + int length) + { + hash = CalculateHash(table, hash, buffer, start, length); + } + + protected override byte[] HashFinal() + { + var hashBuffer = UInt32ToBigEndianBytes(~hash); + HashValue = hashBuffer; + return hashBuffer; + } + + public static UInt32 Compute(byte[] buffer) + { + return ~CalculateHash(InitializeTable(DefaultPolynomial), DefaultSeed, buffer, 0, buffer.Length); + } + + public static UInt32 Compute(UInt32 seed, + byte[] buffer) + { + return ~CalculateHash(InitializeTable(DefaultPolynomial), seed, buffer, 0, buffer.Length); + } + + public static UInt32 Compute(UInt32 polynomial, + UInt32 seed, + byte[] buffer) + { + return ~CalculateHash(InitializeTable(polynomial), seed, buffer, 0, buffer.Length); + } + + private static UInt32[] InitializeTable(UInt32 polynomial) + { + if (polynomial == DefaultPolynomial && defaultTable != null) + return defaultTable; + + var createTable = new UInt32[256]; + for (var i = 0; i < 256; i++) + { + var entry = (UInt32) i; + for (var j = 0; j < 8; j++) + if ((entry & 1) == 1) + entry = (entry >> 1) ^ polynomial; + else + entry = entry >> 1; + createTable[i] = entry; + } + + if (polynomial == DefaultPolynomial) + defaultTable = createTable; + + return createTable; + } + + private static UInt32 CalculateHash(UInt32[] table, + UInt32 seed, + byte[] buffer, + int start, + int size) + { + var crc = seed; + for (var i = start; i < size; i++) + unchecked + { + crc = (crc >> 8) ^ table[buffer[i] ^ crc & 0xff]; + } + return crc; + } + + private static byte[] UInt32ToBigEndianBytes(UInt32 x) + { + return new byte[] + { + (byte) ((x >> 24) & 0xff), + (byte) ((x >> 16) & 0xff), + (byte) ((x >> 8) & 0xff), + (byte) (x & 0xff) + }; + } + } +} \ No newline at end of file diff --git a/Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs b/Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs index cf649da..53715ff 100644 --- a/Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs +++ b/Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs @@ -2,6 +2,7 @@ using System.Linq; using System.Threading; using Rhino.DistributedHashTable.Client; +using Rhino.DistributedHashTable.Exceptions; using Rhino.DistributedHashTable.Hosting; using Rhino.DistributedHashTable.Internal; using Rhino.DistributedHashTable.Parameters; @@ -57,33 +58,39 @@ public void AfterBothNodesJoinedWillAutomaticallyReplicateToBackupNode() topology = masterProxy.GetTopology(); var segment = topology.Segments.First(x => x.AssignedEndpoint == storageHostA.Endpoint).Index; - using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint)) - { - nodeA.Put(topology.Version, new ExtendedPutRequest - { - Bytes = new byte[] { 2, 2, 0, 0 }, - Key = "abc", - Segment = segment - }); - } - - using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint)) + RepeatWhileThereAreTopologyChangedErrors(() => { - topology = masterProxy.GetTopology(); - Value[][] values = null; - for (var i = 0; i < 100; i++) + using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint)) { - values = nodeB.Get(topology.Version, new ExtendedGetRequest + nodeA.Put(topology.Version, new ExtendedPutRequest { + Bytes = new byte[] { 2, 2, 0, 0 }, Key = "abc", Segment = segment }); - if (values[0].Length != 0) - break; - Thread.Sleep(250); } - Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data); - } + }); + + RepeatWhileThereAreTopologyChangedErrors(() => + { + using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint)) + { + topology = masterProxy.GetTopology(); + Value[][] values = null; + for (var i = 0; i < 100; i++) + { + values = nodeB.Get(topology.Version, new ExtendedGetRequest + { + Key = "abc", + Segment = segment + }); + if (values[0].Length != 0) + break; + Thread.Sleep(250); + } + Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data); + } + }); } [Fact] @@ -106,35 +113,43 @@ public void CanReadValueFromBackupNodeThatUsedToBeTheSegmentOwner() Thread.Sleep(500); } - topology = masterProxy.GetTopology(); - var segment = topology.Segments.First(x => x.AssignedEndpoint == storageHostA.Endpoint).Index; - using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint)) - { - nodeA.Put(topology.Version, new ExtendedPutRequest - { - Bytes = new byte[] { 2, 2, 0, 0 }, - Key = "abc", - Segment = segment - }); - } - - using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint)) + int segment = 0; + + RepeatWhileThereAreTopologyChangedErrors(() => { topology = masterProxy.GetTopology(); - Value[][] values = null; - for (var i = 0; i < 100; i++) + segment = topology.Segments.First(x => x.AssignedEndpoint == storageHostA.Endpoint).Index; + using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint)) { - values = nodeB.Get(topology.Version, new ExtendedGetRequest + nodeA.Put(topology.Version, new ExtendedPutRequest { + Bytes = new byte[] {2, 2, 0, 0}, Key = "abc", Segment = segment }); - if (values[0].Length != 0) - break; - Thread.Sleep(250); } - Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data); - } + }); + + RepeatWhileThereAreTopologyChangedErrors(() => + { + using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint)) + { + topology = masterProxy.GetTopology(); + Value[][] values = null; + for (var i = 0; i < 100; i++) + { + values = nodeB.Get(topology.Version, new ExtendedGetRequest + { + Key = "abc", + Segment = segment + }); + if (values[0].Length != 0) + break; + Thread.Sleep(250); + } + Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data); + } + }); using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint)) { @@ -237,5 +252,23 @@ public void WillReplicateValuesToSecondJoin() } } } + + // we have to do this ugliness because the cluster is in a state of flux right now + // with segments moving & topology changes + public static void RepeatWhileThereAreTopologyChangedErrors(Action action) + { + while(true) + { + try + { + action(); + break; + } + catch (TopologyVersionDoesNotMatchException) + { + + } + } + } } } \ No newline at end of file diff --git a/Rhino.DistributedHashTable.ClusterTests/MasterOverTheNetwork.cs b/Rhino.DistributedHashTable.ClusterTests/MasterOverTheNetwork.cs index fb40376..c6da55a 100644 --- a/Rhino.DistributedHashTable.ClusterTests/MasterOverTheNetwork.cs +++ b/Rhino.DistributedHashTable.ClusterTests/MasterOverTheNetwork.cs @@ -30,7 +30,7 @@ public void CanGetTopologyWhenThereAreNoNodes() Assert.NotNull(topology); Assert.NotEqual(0, topology.Version); Assert.NotEqual(DateTime.MinValue, topology.Timestamp); - Assert.Equal(8192, topology.Segments.Length); + Assert.Equal(Constants.NumberOfSegments, topology.Segments.Length); Assert.True(topology.Segments.All(x => x.AssignedEndpoint == null)); } @@ -43,7 +43,7 @@ public void CanJoinToMaster() Sync = new Uri("rhino.dht://localhost:2201") }; var segments = masterProxy.Join(endpoint); - Assert.Equal(8192, segments.Length); + Assert.Equal(Constants.NumberOfSegments, segments.Length); Assert.True(segments.All(x => x.AssignedEndpoint.Equals(endpoint))); } diff --git a/Rhino.DistributedHashTable.IntegrationTests/OnlineRangeReplicationCommandTest.cs b/Rhino.DistributedHashTable.IntegrationTests/OnlineRangeReplicationCommandTest.cs index 395d054..dd4af6a 100644 --- a/Rhino.DistributedHashTable.IntegrationTests/OnlineRangeReplicationCommandTest.cs +++ b/Rhino.DistributedHashTable.IntegrationTests/OnlineRangeReplicationCommandTest.cs @@ -27,12 +27,14 @@ public OnlineSegmentReplicationCommandTest() storage = MockRepository.GenerateStub(); node.Storage = storage; node.Stub(x => x.GetTopologyVersion()).Return(topologyVersion); + var factory = MockRepository.GenerateStub(); + factory.Stub(x => x.Create(null)).IgnoreArguments().Return(replication); command = new OnlineSegmentReplicationCommand( endpoint, new[] { new Segment { Index = 0 }, new Segment { Index = 1 }, }, ReplicationType.Ownership, node, - replication); + factory); } [Fact] diff --git a/Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs b/Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs index 8646338..aba5488 100644 --- a/Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs +++ b/Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs @@ -57,7 +57,7 @@ public void SegmentAssignmentsWillNotChange() [Fact] public void WillNotChangeTotalNumberOfSegments() { - Assert.Equal(8192, master.Segments.Count()); + Assert.Equal(Constants.NumberOfSegments, master.Segments.Count()); } [Fact] @@ -119,7 +119,7 @@ public NewEndpointJoiningMasterWithTwoNodes() [Fact] public void WillNotChangeTotalNumberOfSegments() { - Assert.Equal(8192, master.Segments.Count()); + Assert.Equal(Constants.NumberOfSegments, master.Segments.Count()); } [Fact] diff --git a/Rhino.DistributedHashTable.sln b/Rhino.DistributedHashTable.sln index bf36c00..ddf5667 100644 --- a/Rhino.DistributedHashTable.sln +++ b/Rhino.DistributedHashTable.sln @@ -17,6 +17,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.Queues.Tests", "..\qu EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.DistributedHashTable.ClusterTests", "Rhino.DistributedHashTable.ClusterTests\Rhino.DistributedHashTable.ClusterTests.csproj", "{D9377684-AC14-47F6-A3E8-E84AD0FC2BAA}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.DistributedHashTable.Client", "Rhino.DistributedHashTable.Client\Rhino.DistributedHashTable.Client.csproj", "{D910183F-1578-43AE-BCD2-F5A9E19079FC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -55,6 +57,10 @@ Global {D9377684-AC14-47F6-A3E8-E84AD0FC2BAA}.Debug|Any CPU.Build.0 = Debug|Any CPU {D9377684-AC14-47F6-A3E8-E84AD0FC2BAA}.Release|Any CPU.ActiveCfg = Release|Any CPU {D9377684-AC14-47F6-A3E8-E84AD0FC2BAA}.Release|Any CPU.Build.0 = Release|Any CPU + {D910183F-1578-43AE-BCD2-F5A9E19079FC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D910183F-1578-43AE-BCD2-F5A9E19079FC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D910183F-1578-43AE-BCD2-F5A9E19079FC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D910183F-1578-43AE-BCD2-F5A9E19079FC}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Rhino.DistributedHashTable/Client/DistributedHashTableStorageClient.cs b/Rhino.DistributedHashTable/Client/DistributedHashTableStorageClient.cs index d34f564..5ce25e3 100644 --- a/Rhino.DistributedHashTable/Client/DistributedHashTableStorageClient.cs +++ b/Rhino.DistributedHashTable/Client/DistributedHashTableStorageClient.cs @@ -26,7 +26,7 @@ public class DistributedHashTableStorageClient : IDistributedHashTableRemoteNode { private readonly NodeEndpoint endpoint; - private readonly TcpClient client; + protected readonly TcpClient client; private readonly NetworkStream stream; private readonly MessageStreamWriter writer; @@ -43,7 +43,7 @@ public NodeEndpoint Endpoint get { return endpoint; } } - public void Dispose() + public virtual void Dispose() { stream.Dispose(); client.Close(); diff --git a/Rhino.DistributedHashTable/Commands/OnlineRangeReplicationCommand.cs b/Rhino.DistributedHashTable/Commands/OnlineRangeReplicationCommand.cs index 570e14b..21a10d6 100644 --- a/Rhino.DistributedHashTable/Commands/OnlineRangeReplicationCommand.cs +++ b/Rhino.DistributedHashTable/Commands/OnlineRangeReplicationCommand.cs @@ -13,9 +13,10 @@ public class OnlineSegmentReplicationCommand : ICommand private readonly ILog log = LogManager.GetLogger(typeof(OnlineSegmentReplicationCommand)); private readonly IDistributedHashTableNode node; - private readonly IDistributedHashTableNodeReplication otherNode; - private readonly string endpoint; + private readonly IDistributedHashTableNodeReplicationFactory factory; + private IDistributedHashTableNodeReplication otherNode; private readonly Segment[] segments; + private readonly NodeEndpoint endpoint; private readonly ReplicationType type; private bool continueWorking = true; @@ -26,13 +27,13 @@ public class OnlineSegmentReplicationCommand : ICommand Segment[] segments, ReplicationType type, IDistributedHashTableNode node, - IDistributedHashTableNodeReplication otherNode) + IDistributedHashTableNodeReplicationFactory factory) { - this.endpoint = endpoint.Sync.ToString(); + this.endpoint = endpoint; this.segments = segments; this.type = type; this.node = node; - this.otherNode = otherNode; + this.factory = factory; } public void AbortExecution() @@ -42,13 +43,14 @@ public void AbortExecution() public bool Execute() { - log.DebugFormat("Replication from {0} of {1} segments for {2}", endpoint, segments.Length, type); + log.DebugFormat("Replication from {0} of {1} segments for {2}", endpoint.Sync, segments.Length, type); var processedSegments = new List(); if (continueWorking == false) return false; try { + otherNode = factory.Create(endpoint); var segmentsToLoad = AssignAllEmptySegmentsFromEndpoint(processedSegments); if (continueWorking == false) return false; @@ -56,7 +58,7 @@ public bool Execute() } catch (Exception e) { - log.Warn("Could not replicate segments", e); + log.Warn("Could not replicate segments for " + type, e); return false; } finally @@ -70,8 +72,9 @@ public bool Execute() { if (log.IsWarnEnabled) { - log.WarnFormat("Giving up replicating the following segments: [{0}]", - string.Join(", ", array.Select(x => x.ToString()).ToArray())); + log.WarnFormat("Giving up replicating for {0} the {0} segments", + type, + array.Length); } node.GivingUpOn(type, array); } @@ -123,12 +126,12 @@ private void ReplicateSegment(Segment segment) { log.DebugFormat("Starting replication of segment [{0}] from {1}", segment, - endpoint); + endpoint.Sync); var result = otherNode.ReplicateNextPage(node.Endpoint, type, segment.Index); log.DebugFormat("Replication of segment [{0}] from {1} got {2} puts & {3} removals", segment, - endpoint, + endpoint.Sync, result.PutRequests.Length, result.RemoveRequests.Length); @@ -155,7 +158,7 @@ private List AssignAllEmptySegmentsFromEndpoint(List processedSegm processedSegments.AddRange(assignedSegments); node.DoneReplicatingSegments(type, assignedSegments); - log.DebugFormat("{0} empty segments assigned from {1}", assignedSegments.Length, endpoint); + log.DebugFormat("{0} empty segments assigned from {1}", assignedSegments.Length, endpoint.Sync); remainingSegments.AddRange( segments.Where(x => assignedSegments.Contains(x.Index) == false) ); diff --git a/Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs b/Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs index d82f4a2..0ace7b4 100644 --- a/Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs +++ b/Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs @@ -24,6 +24,7 @@ public class DistributedHashTableStorageHost : IDisposable private readonly IDistributedHashTableNode node; private readonly QueueManager queueManager; private readonly IDistributedHashTableStorage storage; + private IDistributedHashTableNodeReplication replication; public DistributedHashTableStorageHost(Uri master) : this(master, "node", 2201) @@ -50,7 +51,9 @@ public DistributedHashTableStorageHost(Uri master) queueManager, new NonPooledDistributedHashTableNodeFactory() ); - storage = new DistributedHashTableStorage(name + ".data.esent", node); + var dhtStorage = new DistributedHashTableStorage(name + ".data.esent", node); + replication = dhtStorage.Replication; + storage = dhtStorage; listener = new TcpListener( Socket.OSSupportsIPv6 ? IPAddress.IPv6Any : IPAddress.Any, @@ -180,7 +183,7 @@ private void HandleTopologyUpdate(MessageStreamWriter write private void HandleReplicateNextPage(StorageMessageUnion wrapper, MessageStreamWriter writer) { - var replicationResult = storage.Replication.ReplicateNextPage( + var replicationResult = replication.ReplicateNextPage( wrapper.ReplicateNextPageRequest.ReplicationEndpoint.GetNodeEndpoint(), wrapper.ReplicateNextPageRequest.Type == ReplicationType.Backup ? Internal.ReplicationType.Backup : Internal.ReplicationType.Ownership, wrapper.ReplicateNextPageRequest.Segment @@ -206,7 +209,7 @@ private void HandleTopologyUpdate(MessageStreamWriter write private void HandleAssignEmpty(StorageMessageUnion wrapper, MessageStreamWriter writer) { - var segments = storage.Replication.AssignAllEmptySegments( + var segments = replication.AssignAllEmptySegments( wrapper.AssignAllEmptySegmentsRequest.ReplicationEndpoint.GetNodeEndpoint(), wrapper.AssignAllEmptySegmentsRequest.Type == ReplicationType.Backup ? Internal.ReplicationType.Backup : Internal.ReplicationType.Ownership, wrapper.AssignAllEmptySegmentsRequest.SegmentsList.ToArray() diff --git a/Rhino.DistributedHashTable/Internal/Constants.cs b/Rhino.DistributedHashTable/Internal/Constants.cs index d8fc641..60c9bbd 100644 --- a/Rhino.DistributedHashTable/Internal/Constants.cs +++ b/Rhino.DistributedHashTable/Internal/Constants.cs @@ -2,6 +2,7 @@ namespace Rhino.DistributedHashTable.Internal { public static class Constants { + public const int NumberOfSegments = 8192; public const string RhinoDhtStartToken = "@rdht://"; public const string MovedSegment = RhinoDhtStartToken + "Segment/Moved/"; } diff --git a/Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs b/Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs index 24faed5..4be9310 100644 --- a/Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs +++ b/Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs @@ -163,7 +163,7 @@ public Topology GetTopology() private static IEnumerable CreateDefaultSegments() { - for (var i = 0; i < 8192; i++) + for (var i = 0; i < Constants.NumberOfSegments; i++) { var segment = new Segment { diff --git a/Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs b/Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs index efd7a4a..a261cd1 100644 --- a/Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs +++ b/Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs @@ -183,7 +183,7 @@ public void Start() segmentToReplicate.ToArray(), ReplicationType.Ownership, this, - replicationFactory.Create(segmentToReplicate.Key)) + replicationFactory) ); } @@ -214,7 +214,7 @@ private void StartPendingBackupsForCurrentNode(Topology topology) segmentToReplicate.ToArray(), ReplicationType.Backup, this, - replicationFactory.Create(segmentToReplicate.Key)); + replicationFactory); Interlocked.Increment(ref currentlyReplicatingBackups); @@ -257,7 +257,7 @@ private void RemoveMoveMarkerForSegmentsThatWeAReNoLongerResponsibleFor(Topology IsLocal = true }; var movedMarkers = Storage.Get(GetTopologyVersion(), - segmentsThatWereMovedFromNode.ToArray()); + segmentsThatWereMovedFromNode.ToArray()); var requests = movedMarkers .Where(x => x.Length == 1) .Select(values => values[0]) diff --git a/Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs b/Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs index a059c50..1447336 100644 --- a/Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs +++ b/Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs @@ -125,13 +125,21 @@ public PutResult[] Put(int topologyVersion, params ExtendedPutRequest[] valuesTo } } - private void AssertMatchingTopologyVersion(int topologyVersion) + private void AssertMatchingTopologyVersion(int topologyVersionFromClient) { - if(TopologyVersion != topologyVersion) + //client thinks that the version is newer + if(topologyVersionFromClient > TopologyVersion) + { + log.InfoFormat("Got request for topology {0} but current local version is {1}, forcing topology update, request will still fail", + topologyVersionFromClient, + TopologyVersion); + distributedHashTableNode.UpdateTopology(); + } + if(TopologyVersion != topologyVersionFromClient) { log.InfoFormat("Got request for topology {0} but current local version is {1}", - TopologyVersion, - topologyVersion); + topologyVersionFromClient, + TopologyVersion); throw new TopologyVersionDoesNotMatchException("Topology Version doesn't match, you need to refresh the topology from the master"); } } diff --git a/Rhino.DistributedHashTable/Internal/IDistributedHashTableStorage.cs b/Rhino.DistributedHashTable/Internal/IDistributedHashTableStorage.cs index 0cb0e45..52659e5 100644 --- a/Rhino.DistributedHashTable/Internal/IDistributedHashTableStorage.cs +++ b/Rhino.DistributedHashTable/Internal/IDistributedHashTableStorage.cs @@ -1,6 +1,5 @@ using System; using Rhino.DistributedHashTable.Parameters; -using Rhino.DistributedHashTable.Remote; using Rhino.PersistentHashTable; namespace Rhino.DistributedHashTable.Internal @@ -12,7 +11,5 @@ public interface IDistributedHashTableStorage : IDisposable bool[] Remove(int topologyVersion, params ExtendedRemoveRequest[] valuesToRemove); Value[][] Get(int topologyVersion, params ExtendedGetRequest[] valuesToGet); - - IDistributedHashTableNodeReplication Replication { get; } } } \ No newline at end of file diff --git a/Rhino.DistributedHashTable/Rhino.DistributedHashTable.csproj b/Rhino.DistributedHashTable/Rhino.DistributedHashTable.csproj index bf92d01..01b7b27 100644 --- a/Rhino.DistributedHashTable/Rhino.DistributedHashTable.csproj +++ b/Rhino.DistributedHashTable/Rhino.DistributedHashTable.csproj @@ -97,7 +97,6 @@ -