
Adding the start of the client project - note that it is not tested or even run!

Fixed a bunch of issues relating to server availability in the face of failure.
All tests are passing, including the cluster tests

git-svn-id: https://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools/experiments/dht/dht@2195 079b0acf-d9fa-0310-9935-e5ade295c882
commit c950dd9838c122b73c9506b5abbd35644056428a 1 parent a803da4
ayenderahien authored
Showing with 806 additions and 78 deletions.
  1. +288 −0 Rhino.DistributedHashTable.Client/DistributedHashTable.cs
  2. +35 −0 Rhino.DistributedHashTable.Client/Exceptions/NoMoreBackupsException.cs
  3. +2 −3 {Rhino.DistributedHashTable → Rhino.DistributedHashTable.Client}/IDistributedHashTable.cs
  4. +101 −0 Rhino.DistributedHashTable.Client/Pooling/DefaultConnectionPool.cs
  5. +9 −0 Rhino.DistributedHashTable.Client/Pooling/IConnectionPool.cs
  6. +36 −0 Rhino.DistributedHashTable.Client/Properties/AssemblyInfo.cs
  7. +82 −0 Rhino.DistributedHashTable.Client/Rhino.DistributedHashTable.Client.csproj
  8. +126 −0 Rhino.DistributedHashTable.Client/Util/Crc32.cs
  9. +74 −41 Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs
  10. +2 −2 Rhino.DistributedHashTable.ClusterTests/MasterOverTheNetwork.cs
  11. +3 −1 Rhino.DistributedHashTable.IntegrationTests/OnlineRangeReplicationCommandTest.cs
  12. +2 −2 Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs
  13. +6 −0 Rhino.DistributedHashTable.sln
  14. +2 −2 Rhino.DistributedHashTable/Client/DistributedHashTableStorageClient.cs
  15. +15 −12 Rhino.DistributedHashTable/Commands/OnlineRangeReplicationCommand.cs
  16. +6 −3 Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs
  17. +1 −0  Rhino.DistributedHashTable/Internal/Constants.cs
  18. +1 −1  Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs
  19. +3 −3 Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs
  20. +12 −4 Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs
  21. +0 −3  Rhino.DistributedHashTable/Internal/IDistributedHashTableStorage.cs
  22. +0 −1  Rhino.DistributedHashTable/Rhino.DistributedHashTable.csproj
288 Rhino.DistributedHashTable.Client/DistributedHashTable.cs
@@ -0,0 +1,288 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Rhino.DistributedHashTable.Client.Exceptions;
+using Rhino.DistributedHashTable.Client.Pooling;
+using Rhino.DistributedHashTable.Client.Util;
+using Rhino.DistributedHashTable.Exceptions;
+using Rhino.DistributedHashTable.Internal;
+using Rhino.DistributedHashTable.Parameters;
+using Rhino.PersistentHashTable;
+
+namespace Rhino.DistributedHashTable.Client
+{
+ public class DistributedHashTable : IDistributedHashTable
+ {
+ private readonly IDistributedHashTableMaster master;
+ private readonly IConnectionPool pool;
+ private Topology topology;
+
+ public DistributedHashTable(IDistributedHashTableMaster master, IConnectionPool pool)
+ {
+ this.master = master;
+ this.pool = pool;
+ topology = master.GetTopology();
+ }
+
+ public PutResult[] Put(params PutRequest[] valuesToAdd)
+ {
+ return PutInternal(valuesToAdd, 0);
+ }
+
+ private PutResult[] PutInternal(PutRequest[] valuesToAdd, int backupIndex)
+ {
+ var results = new PutResult[valuesToAdd.Length];
+
+ var groupedByEndpoint = from req in valuesToAdd
+ let er = new
+ {
+ OriginalIndex = Array.IndexOf(valuesToAdd, req),
+ Put = new ExtendedPutRequest
+ {
+ Bytes = req.Bytes,
+ ExpiresAt = req.ExpiresAt,
+ IsReadOnly = req.IsReadOnly,
+ Key = req.Key,
+ OptimisticConcurrency = req.OptimisticConcurrency,
+ ParentVersions = req.ParentVersions,
+ Segment = GetSegmentFromKey(req.Key),
+ }
+ }
+ group er by GetEndpointByBackupIndex(topology.Segments[er.Put.Segment], backupIndex) into g
+ select g;
+
+ foreach (var endpoint in groupedByEndpoint)
+ {
+ if (endpoint.Key == null)
+ throw new NoMoreBackupsException();
+
+ var requests = endpoint.ToArray();
+ var putRequests = requests.Select(x => x.Put).ToArray();
+
+ var putsResults = GetPutsResults(endpoint.Key, putRequests, backupIndex);
+ for (var i = 0; i < putsResults.Length; i++)
+ {
+ results[requests[i].OriginalIndex] = putsResults[i];
+ }
+ }
+ return results;
+ }
+
+ private static NodeEndpoint GetEndpointByBackupIndex(Segment segment, int backupIndex)
+ {
+ if (backupIndex == 0)
+ return segment.AssignedEndpoint;
+ return segment.Backups.ElementAtOrDefault(backupIndex - 1);
+ }
+
+ private PutResult[] GetPutsResults(NodeEndpoint endpoint,
+ ExtendedPutRequest[] putRequests,
+ int backupIndex)
+ {
+ try
+ {
+ using (var client = pool.Create(endpoint))
+ {
+ return client.Put(topology.Version, putRequests);
+ }
+ }
+ catch (SeeOtherException soe)
+ {
+ return GetPutsResults(soe.Endpoint, putRequests, backupIndex);
+ }
+ catch (TopologyVersionDoesNotMatchException)
+ {
+ RefreshTopology();
+ return PutInternal(putRequests, backupIndex);
+ }
+ catch (Exception)
+ {
+ try
+ {
+ return PutInternal(putRequests, backupIndex + 1);
+ }
+ catch (NoMoreBackupsException)
+ {
+ }
+ throw;
+ }
+ }
+
+ private void RefreshTopology()
+ {
+ topology = master.GetTopology();
+ }
+
+ private static int GetSegmentFromKey(string key)
+ {
+ var crc32 = (int)Crc32.Compute(Encoding.Unicode.GetBytes(key));
+ return Math.Abs(crc32 % Constants.NumberOfSegments);
+ }
+
+ public Value[][] Get(params GetRequest[] valuesToGet)
+ {
+ return GetInternal(valuesToGet, 0);
+
+ }
+
+ private Value[][] GetInternal(GetRequest[] valuesToGet,
+ int backupIndex)
+ {
+ var results = new Value[valuesToGet.Length][];
+
+ var groupedByEndpoint = from req in valuesToGet
+ let er = new
+ {
+ OriginalIndex = Array.IndexOf(valuesToGet, req),
+ Get = new ExtendedGetRequest
+ {
+ Key = req.Key,
+ SpecifiedVersion = req.SpecifiedVersion,
+ Segment = GetSegmentFromKey(req.Key),
+ }
+ }
+ group er by GetEndpointByBackupIndex(topology.Segments[er.Get.Segment], backupIndex) into g
+ select g;
+ foreach (var endpoint in groupedByEndpoint)
+ {
+ if (endpoint.Key == null)
+ throw new NoMoreBackupsException();
+
+ var requests = endpoint.ToArray();
+ var getRequests = requests.Select(x => x.Get).ToArray();
+
+ var putsResults = GetGetsResults(endpoint.Key, getRequests, backupIndex);
+ for (var i = 0; i < putsResults.Length; i++)
+ {
+ results[requests[i].OriginalIndex] = putsResults[i];
+ }
+
+ }
+
+ return results;
+ }
+
+ private Value[][] GetGetsResults(NodeEndpoint endpoint,
+ ExtendedGetRequest[] getRequests,
+ int backupIndex)
+ {
+ try
+ {
+ using (var client = pool.Create(endpoint))
+ {
+ return client.Get(topology.Version, getRequests);
+ }
+ }
+ catch (SeeOtherException soe)
+ {
+ return GetGetsResults(soe.Endpoint, getRequests, backupIndex);
+ }
+ catch (TopologyVersionDoesNotMatchException)
+ {
+ RefreshTopology();
+ return GetInternal(getRequests, backupIndex);
+ }
+ catch (Exception)
+ {
+ try
+ {
+ return GetInternal(getRequests, backupIndex + 1);
+ }
+ catch (NoMoreBackupsException)
+ {
+ }
+ throw;
+ }
+ }
+
+ public bool[] Remove(params RemoveRequest[] valuesToRemove)
+ {
+ return RemoveInternal(valuesToRemove, 0);
+ }
+
+ private bool[] RemoveInternal(RemoveRequest[] valuesToRemove,
+ int backupIndex)
+ {
+ var results = new bool[valuesToRemove.Length];
+
+ var groupedByEndpoint = from req in valuesToRemove
+ let er = new
+ {
+ OriginalIndex = Array.IndexOf(valuesToRemove, req),
+ Remove = new ExtendedRemoveRequest
+ {
+ Key = req.Key,
+ SpecificVersion = req.SpecificVersion,
+ Segment = GetSegmentFromKey(req.Key),
+ }
+ }
+ group er by GetEndpointByBackupIndex(topology.Segments[er.Remove.Segment], backupIndex) into g
+ select g;
+
+ foreach (var endpoint in groupedByEndpoint)
+ {
+ if (endpoint.Key == null)
+ throw new NoMoreBackupsException();
+
+ var requests = endpoint.ToArray();
+ var removeRequests = requests.Select(x => x.Remove).ToArray();
+
+ var removesResults = GetRemovesResults(endpoint.Key, removeRequests, backupIndex);
+ for (var i = 0; i < removesResults.Length; i++)
+ {
+ results[requests[i].OriginalIndex] = removesResults[i];
+ }
+ }
+ return results;
+ }
+
+ private bool[] GetRemovesResults(NodeEndpoint endpoint,
+ ExtendedRemoveRequest[] removeRequests,
+ int backupIndex)
+ {
+ try
+ {
+ using (var client = pool.Create(endpoint))
+ {
+ return client.Remove(topology.Version, removeRequests);
+ }
+ }
+ catch (SeeOtherException soe)
+ {
+ return GetRemovesResults(soe.Endpoint, removeRequests, backupIndex);
+ }
+ catch (TopologyVersionDoesNotMatchException)
+ {
+ RefreshTopology();
+ return RemoveInternal(removeRequests, backupIndex);
+ }
+ catch (Exception)
+ {
+ try
+ {
+ return RemoveInternal(removeRequests, backupIndex + 1);
+ }
+ catch (NoMoreBackupsException)
+ {
+ }
+ throw;
+ }
+ }
+
+ public int[] AddItems(params AddItemRequest[] itemsToAdd)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void RemoteItems(params RemoveItemRequest[] itemsToRemove)
+ {
+ throw new NotImplementedException();
+ }
+
+ public KeyValuePair<int, byte[]>[] GetItems(GetItemsRequest request)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
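
For reference, a minimal usage sketch of the new client (not part of this commit; GetMasterProxy() is a hypothetical helper, everything else appears above):

    // Illustrative only: how the master proxy is obtained is out of scope here.
    IDistributedHashTableMaster master = GetMasterProxy();
    var dht = new DistributedHashTable(master, new DefaultConnectionPool());

    // Requests are batched and routed per segment; on endpoint failure the
    // client retries against the segment's backups until it either succeeds
    // or runs out of backups and throws NoMoreBackupsException.
    PutResult[] putResults = dht.Put(new PutRequest
    {
        Key = "abc",
        Bytes = new byte[] { 2, 2, 0, 0 },
    });
    Value[][] values = dht.Get(new GetRequest { Key = "abc" });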
35 Rhino.DistributedHashTable.Client/Exceptions/NoMoreBackupsException.cs
@@ -0,0 +1,35 @@
+using System;
+using System.Runtime.Serialization;
+
+namespace Rhino.DistributedHashTable.Client.Exceptions
+{
+ [Serializable]
+ public class NoMoreBackupsException : Exception
+ {
+ //
+ // For guidelines regarding the creation of new exception types, see
+ // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpgenref/html/cpconerrorraisinghandlingguidelines.asp
+ // and
+ // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dncscol/html/csharp07192001.asp
+ //
+
+ public NoMoreBackupsException()
+ {
+ }
+
+ public NoMoreBackupsException(string message) : base(message)
+ {
+ }
+
+ public NoMoreBackupsException(string message,
+ Exception inner) : base(message, inner)
+ {
+ }
+
+ protected NoMoreBackupsException(
+ SerializationInfo info,
+ StreamingContext context) : base(info, context)
+ {
+ }
+ }
+}
5 Rhino.DistributedHashTable/IDistributedHashTable.cs → ...tributedHashTable.Client/IDistributedHashTable.cs
@@ -1,9 +1,8 @@
using System.Collections.Generic;
+using Rhino.PersistentHashTable;
-namespace Rhino.DistributedHashTable
+namespace Rhino.DistributedHashTable.Client
{
- using PersistentHashTable;
-
public interface IDistributedHashTable
{
PutResult[] Put(params PutRequest[] valuesToAdd);
101 Rhino.DistributedHashTable.Client/Pooling/DefaultConnectionPool.cs
@@ -0,0 +1,101 @@
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using log4net;
+using Rhino.DistributedHashTable.Internal;
+
+namespace Rhino.DistributedHashTable.Client.Pooling
+{
+ public class DefaultConnectionPool : IConnectionPool
+ {
+ private static readonly ILog log = LogManager.GetLogger(typeof (DefaultConnectionPool));
+ readonly object locker = new object();
+
+ private readonly Dictionary<NodeEndpoint, LinkedList<PooledDistributedHashTableStorageClientConnection>> pooledConnections =
+ new Dictionary<NodeEndpoint, LinkedList<PooledDistributedHashTableStorageClientConnection>>();
+
+ public IDistributedHashTableStorage Create(NodeEndpoint endpoint)
+ {
+ PooledDistributedHashTableStorageClientConnection storage = null;
+ lock (locker)
+ {
+ LinkedList<PooledDistributedHashTableStorageClientConnection> value;
+ if (pooledConnections.TryGetValue(endpoint, out value) && value.Count > 0)
+ {
+ storage = value.First.Value;
+ value.RemoveFirst();
+ }
+ }
+ if (storage != null)
+ {
+ if (storage.Connected == false)
+ {
+ log.DebugFormat("Found unconnected connection in the pool for {0}", endpoint.Sync);
+ try
+ {
+ storage.Dispose();
+ }
+ catch (Exception e)
+ {
+ log.Debug("Error when disposing unconnected connection in the pool", e);
+ }
+ }
+ else
+ {
+ return storage;
+ }
+ }
+ log.DebugFormat("Creating new connection in the pool to {0}", endpoint.Sync);
+ return new PooledDistributedHashTableStorageClientConnection(this, endpoint);
+ }
+
+ private void PutMeBack(PooledDistributedHashTableStorageClientConnection connection)
+ {
+ lock (locker)
+ {
+ LinkedList<PooledDistributedHashTableStorageClientConnection> value;
+ if (pooledConnections.TryGetValue(connection.Endpoint, out value) == false)
+ {
+ pooledConnections[connection.Endpoint] = value = new LinkedList<PooledDistributedHashTableStorageClientConnection>();
+ }
+ value.AddLast(connection);
+ }
+ log.DebugFormat("Put connection for {0} back in the pool", connection.Endpoint.Sync);
+ }
+
+ class PooledDistributedHashTableStorageClientConnection : DistributedHashTableStorageClient
+ {
+ private readonly DefaultConnectionPool pool;
+
+ public PooledDistributedHashTableStorageClientConnection(
+ DefaultConnectionPool pool,
+ NodeEndpoint endpoint) : base(endpoint)
+ {
+ this.pool = pool;
+ }
+
+ public bool Connected
+ {
+ get { return client.Connected; }
+ }
+
+ public override void Dispose()
+ {
+ if(Marshal.GetExceptionCode() != 0)//we are here because of some sort of error
+ {
+ log.Debug("There was an error during the usage of pooled client connection, will not return it to the pool (may be poisioned)");
+ base.Dispose();
+ }
+ else if(Connected == false)
+ {
+ log.Debug("The connection was disconnected, will not return connection to the pool");
+ base.Dispose();
+ }
+ else
+ {
+ pool.PutMeBack(this);
+ }
+ }
+ }
+ }
+}
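
The recycling above rides on Dispose: a healthy connection goes back into the pool rather than being closed. A sketch of the round trip (illustrative, not part of the commit; the endpoint value mirrors the tests further down):

    var pool = new DefaultConnectionPool();
    var endpoint = new NodeEndpoint { Sync = new Uri("rhino.dht://localhost:2201") };

    using (var storage = pool.Create(endpoint))
    {
        // use the connection...
    } // Dispose() returns it to the pool, unless an exception is in flight
      // (Marshal.GetExceptionCode() != 0) or the socket has disconnected.

    // A subsequent Create() for the same endpoint reuses the pooled connection.
    var reused = pool.Create(endpoint);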
9 Rhino.DistributedHashTable.Client/Pooling/IConnectionPool.cs
@@ -0,0 +1,9 @@
+using Rhino.DistributedHashTable.Internal;
+
+namespace Rhino.DistributedHashTable.Client.Pooling
+{
+ public interface IConnectionPool
+ {
+ IDistributedHashTableStorage Create(NodeEndpoint endpoint);
+ }
+}
36 Rhino.DistributedHashTable.Client/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Rhino.DistributedHashTable.Client")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyProduct("Rhino.DistributedHashTable.Client")]
+[assembly: AssemblyCopyright("Copyright © Microsoft 2009")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("dcd8d96b-2ce9-4356-8b62-dab30519f277")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
82 Rhino.DistributedHashTable.Client/Rhino.DistributedHashTable.Client.csproj
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.30729</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{D910183F-1578-43AE-BCD2-F5A9E19079FC}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Rhino.DistributedHashTable.Client</RootNamespace>
+ <AssemblyName>Rhino.DistributedHashTable.Client</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Google.ProtocolBuffers, Version=1.0.0.0, Culture=neutral, PublicKeyToken=17b3b1f090c3ea48, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\SharedLibs\Google\Google.ProtocolBuffers.dll</HintPath>
+ </Reference>
+ <Reference Include="log4net, Version=1.2.10.0, Culture=neutral, PublicKeyToken=1b44e1d426115821, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Rhino.DistributedHashTable\bin\Debug\log4net.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="DistributedHashTable.cs" />
+ <Compile Include="Exceptions\NoMoreBackupsException.cs" />
+ <Compile Include="IDistributedHashTable.cs" />
+ <Compile Include="Pooling\DefaultConnectionPool.cs" />
+ <Compile Include="Pooling\IConnectionPool.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Util\Crc32.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\pht\Rhino.PersistentHashTable\Rhino.PersistentHashTable.csproj">
+ <Project>{F30B2D63-CED5-4C8A-908F-0B5503D984A9}</Project>
+ <Name>Rhino.PersistentHashTable</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Rhino.DistributedHashTable\Rhino.DistributedHashTable.csproj">
+ <Project>{4E8D44D2-505D-488C-B92C-51147748B104}</Project>
+ <Name>Rhino.DistributedHashTable</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
126 Rhino.DistributedHashTable.Client/Util/Crc32.cs
@@ -0,0 +1,126 @@
+using System;
+using System.Security.Cryptography;
+
+namespace Rhino.DistributedHashTable.Client.Util
+{
+ /// <summary>
+ /// Taken from
+ /// http://damieng.com/blog/2006/08/08/calculating_crc32_in_c_and_net
+ /// </summary>
+ public class Crc32 : HashAlgorithm
+ {
+ public const UInt32 DefaultPolynomial = 0xedb88320;
+ public const UInt32 DefaultSeed = 0xffffffff;
+ private static UInt32[] defaultTable;
+
+ private UInt32 hash;
+ private readonly UInt32 seed;
+ private readonly UInt32[] table;
+
+ public Crc32()
+ {
+ table = InitializeTable(DefaultPolynomial);
+ seed = DefaultSeed;
+ Initialize();
+ }
+
+ public Crc32(UInt32 polynomial,
+ UInt32 seed)
+ {
+ table = InitializeTable(polynomial);
+ this.seed = seed;
+ Initialize();
+ }
+
+ public override int HashSize
+ {
+ get { return 32; }
+ }
+
+ public override void Initialize()
+ {
+ hash = seed;
+ }
+
+ protected override void HashCore(byte[] buffer,
+ int start,
+ int length)
+ {
+ hash = CalculateHash(table, hash, buffer, start, length);
+ }
+
+ protected override byte[] HashFinal()
+ {
+ var hashBuffer = UInt32ToBigEndianBytes(~hash);
+ HashValue = hashBuffer;
+ return hashBuffer;
+ }
+
+ public static UInt32 Compute(byte[] buffer)
+ {
+ return ~CalculateHash(InitializeTable(DefaultPolynomial), DefaultSeed, buffer, 0, buffer.Length);
+ }
+
+ public static UInt32 Compute(UInt32 seed,
+ byte[] buffer)
+ {
+ return ~CalculateHash(InitializeTable(DefaultPolynomial), seed, buffer, 0, buffer.Length);
+ }
+
+ public static UInt32 Compute(UInt32 polynomial,
+ UInt32 seed,
+ byte[] buffer)
+ {
+ return ~CalculateHash(InitializeTable(polynomial), seed, buffer, 0, buffer.Length);
+ }
+
+ private static UInt32[] InitializeTable(UInt32 polynomial)
+ {
+ if (polynomial == DefaultPolynomial && defaultTable != null)
+ return defaultTable;
+
+ var createTable = new UInt32[256];
+ for (var i = 0; i < 256; i++)
+ {
+ var entry = (UInt32) i;
+ for (var j = 0; j < 8; j++)
+ if ((entry & 1) == 1)
+ entry = (entry >> 1) ^ polynomial;
+ else
+ entry = entry >> 1;
+ createTable[i] = entry;
+ }
+
+ if (polynomial == DefaultPolynomial)
+ defaultTable = createTable;
+
+ return createTable;
+ }
+
+ private static UInt32 CalculateHash(UInt32[] table,
+ UInt32 seed,
+ byte[] buffer,
+ int start,
+ int size)
+ {
+ var crc = seed;
+ for (var i = start; i < size; i++)
+ unchecked
+ {
+ crc = (crc >> 8) ^ table[buffer[i] ^ crc & 0xff];
+ }
+ return crc;
+ }
+
+ private static byte[] UInt32ToBigEndianBytes(UInt32 x)
+ {
+ return new byte[]
+ {
+ (byte) ((x >> 24) & 0xff),
+ (byte) ((x >> 16) & 0xff),
+ (byte) ((x >> 8) & 0xff),
+ (byte) (x & 0xff)
+ };
+ }
+ }
+}
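
This CRC32 is how the client maps a key onto one of the Constants.NumberOfSegments (8192) segments; a worked sketch mirroring GetSegmentFromKey in DistributedHashTable.cs above:

    using System;
    using System.Text;

    // Keys are hashed over their UTF-16 bytes (Encoding.Unicode); the cast
    // to int can yield a negative value, hence Math.Abs before the modulo.
    var crc = (int)Crc32.Compute(Encoding.Unicode.GetBytes("abc"));
    int segment = Math.Abs(crc % Constants.NumberOfSegments); // in [0, 8191]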
115 Rhino.DistributedHashTable.ClusterTests/ClusterTests.cs
@@ -2,6 +2,7 @@
using System.Linq;
using System.Threading;
using Rhino.DistributedHashTable.Client;
+using Rhino.DistributedHashTable.Exceptions;
using Rhino.DistributedHashTable.Hosting;
using Rhino.DistributedHashTable.Internal;
using Rhino.DistributedHashTable.Parameters;
@@ -57,33 +58,39 @@ public void AfterBothNodesJoinedWillAutomaticallyReplicateToBackupNode()
topology = masterProxy.GetTopology();
var segment = topology.Segments.First(x => x.AssignedEndpoint == storageHostA.Endpoint).Index;
- using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint))
- {
- nodeA.Put(topology.Version, new ExtendedPutRequest
- {
- Bytes = new byte[] { 2, 2, 0, 0 },
- Key = "abc",
- Segment = segment
- });
- }
-
- using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint))
+ RepeatWhileThereAreTopologyChangedErrors(() =>
{
- topology = masterProxy.GetTopology();
- Value[][] values = null;
- for (var i = 0; i < 100; i++)
+ using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint))
{
- values = nodeB.Get(topology.Version, new ExtendedGetRequest
+ nodeA.Put(topology.Version, new ExtendedPutRequest
{
+ Bytes = new byte[] { 2, 2, 0, 0 },
Key = "abc",
Segment = segment
});
- if (values[0].Length != 0)
- break;
- Thread.Sleep(250);
}
- Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data);
- }
+ });
+
+ RepeatWhileThereAreTopologyChangedErrors(() =>
+ {
+ using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint))
+ {
+ topology = masterProxy.GetTopology();
+ Value[][] values = null;
+ for (var i = 0; i < 100; i++)
+ {
+ values = nodeB.Get(topology.Version, new ExtendedGetRequest
+ {
+ Key = "abc",
+ Segment = segment
+ });
+ if (values[0].Length != 0)
+ break;
+ Thread.Sleep(250);
+ }
+ Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data);
+ }
+ });
}
[Fact]
@@ -106,35 +113,43 @@ public void CanReadValueFromBackupNodeThatUsedToBeTheSegmentOwner()
Thread.Sleep(500);
}
- topology = masterProxy.GetTopology();
- var segment = topology.Segments.First(x => x.AssignedEndpoint == storageHostA.Endpoint).Index;
- using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint))
- {
- nodeA.Put(topology.Version, new ExtendedPutRequest
- {
- Bytes = new byte[] { 2, 2, 0, 0 },
- Key = "abc",
- Segment = segment
- });
- }
-
- using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint))
+ int segment = 0;
+
+ RepeatWhileThereAreTopologyChangedErrors(() =>
{
topology = masterProxy.GetTopology();
- Value[][] values = null;
- for (var i = 0; i < 100; i++)
+ segment = topology.Segments.First(x => x.AssignedEndpoint == storageHostA.Endpoint).Index;
+ using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint))
{
- values = nodeB.Get(topology.Version, new ExtendedGetRequest
+ nodeA.Put(topology.Version, new ExtendedPutRequest
{
+ Bytes = new byte[] {2, 2, 0, 0},
Key = "abc",
Segment = segment
});
- if (values[0].Length != 0)
- break;
- Thread.Sleep(250);
}
- Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data);
- }
+ });
+
+ RepeatWhileThereAreTopologyChangedErrors(() =>
+ {
+ using (var nodeB = new DistributedHashTableStorageClient(storageHostB.Endpoint))
+ {
+ topology = masterProxy.GetTopology();
+ Value[][] values = null;
+ for (var i = 0; i < 100; i++)
+ {
+ values = nodeB.Get(topology.Version, new ExtendedGetRequest
+ {
+ Key = "abc",
+ Segment = segment
+ });
+ if (values[0].Length != 0)
+ break;
+ Thread.Sleep(250);
+ }
+ Assert.Equal(new byte[] { 2, 2, 0, 0 }, values[0][0].Data);
+ }
+ });
using (var nodeA = new DistributedHashTableStorageClient(storageHostA.Endpoint))
{
@@ -237,5 +252,23 @@ public void WillReplicateValuesToSecondJoin()
}
}
}
+
+ // we have to do this ugliness because the cluster is in a state of flux right now
+ // with segments moving & topology changes
+ public static void RepeatWhileThereAreTopologyChangedErrors(Action action)
+ {
+ while(true)
+ {
+ try
+ {
+ action();
+ break;
+ }
+ catch (TopologyVersionDoesNotMatchException)
+ {
+
+ }
+ }
+ }
}
}
4 Rhino.DistributedHashTable.ClusterTests/MasterOverTheNetwork.cs
@@ -30,7 +30,7 @@ public void CanGetTopologyWhenThereAreNoNodes()
Assert.NotNull(topology);
Assert.NotEqual(0, topology.Version);
Assert.NotEqual(DateTime.MinValue, topology.Timestamp);
- Assert.Equal(8192, topology.Segments.Length);
+ Assert.Equal(Constants.NumberOfSegments, topology.Segments.Length);
Assert.True(topology.Segments.All(x => x.AssignedEndpoint == null));
}
@@ -43,7 +43,7 @@ public void CanJoinToMaster()
Sync = new Uri("rhino.dht://localhost:2201")
};
var segments = masterProxy.Join(endpoint);
- Assert.Equal(8192, segments.Length);
+ Assert.Equal(Constants.NumberOfSegments, segments.Length);
Assert.True(segments.All(x => x.AssignedEndpoint.Equals(endpoint)));
}
4 Rhino.DistributedHashTable.IntegrationTests/OnlineRangeReplicationCommandTest.cs
@@ -27,12 +27,14 @@ public OnlineSegmentReplicationCommandTest()
storage = MockRepository.GenerateStub<IDistributedHashTableStorage>();
node.Storage = storage;
node.Stub(x => x.GetTopologyVersion()).Return(topologyVersion);
+ var factory = MockRepository.GenerateStub<IDistributedHashTableNodeReplicationFactory>();
+ factory.Stub(x => x.Create(null)).IgnoreArguments().Return(replication);
command = new OnlineSegmentReplicationCommand(
endpoint,
new[] { new Segment { Index = 0 }, new Segment { Index = 1 }, },
ReplicationType.Ownership,
node,
- replication);
+ factory);
}
[Fact]
4 Rhino.DistributedHashTable.Tests/MasterJoinBehavior.cs
@@ -57,7 +57,7 @@ public void SegmentAssignmentsWillNotChange()
[Fact]
public void WillNotChangeTotalNumberOfSegments()
{
- Assert.Equal(8192, master.Segments.Count());
+ Assert.Equal(Constants.NumberOfSegments, master.Segments.Count());
}
[Fact]
@@ -119,7 +119,7 @@ public NewEndpointJoiningMasterWithTwoNodes()
[Fact]
public void WillNotChangeTotalNumberOfSegments()
{
- Assert.Equal(8192, master.Segments.Count());
+ Assert.Equal(Constants.NumberOfSegments, master.Segments.Count());
}
[Fact]
6 Rhino.DistributedHashTable.sln
@@ -17,6 +17,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.Queues.Tests", "..\qu
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.DistributedHashTable.ClusterTests", "Rhino.DistributedHashTable.ClusterTests\Rhino.DistributedHashTable.ClusterTests.csproj", "{D9377684-AC14-47F6-A3E8-E84AD0FC2BAA}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rhino.DistributedHashTable.Client", "Rhino.DistributedHashTable.Client\Rhino.DistributedHashTable.Client.csproj", "{D910183F-1578-43AE-BCD2-F5A9E19079FC}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -55,6 +57,10 @@ Global
{D9377684-AC14-47F6-A3E8-E84AD0FC2BAA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D9377684-AC14-47F6-A3E8-E84AD0FC2BAA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D9377684-AC14-47F6-A3E8-E84AD0FC2BAA}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D910183F-1578-43AE-BCD2-F5A9E19079FC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D910183F-1578-43AE-BCD2-F5A9E19079FC}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D910183F-1578-43AE-BCD2-F5A9E19079FC}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D910183F-1578-43AE-BCD2-F5A9E19079FC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
4 Rhino.DistributedHashTable/Client/DistributedHashTableStorageClient.cs
@@ -26,7 +26,7 @@ public class DistributedHashTableStorageClient :
IDistributedHashTableRemoteNode
{
private readonly NodeEndpoint endpoint;
- private readonly TcpClient client;
+ protected readonly TcpClient client;
private readonly NetworkStream stream;
private readonly MessageStreamWriter<StorageMessageUnion> writer;
@@ -43,7 +43,7 @@ public NodeEndpoint Endpoint
get { return endpoint; }
}
- public void Dispose()
+ public virtual void Dispose()
{
stream.Dispose();
client.Close();
27 Rhino.DistributedHashTable/Commands/OnlineRangeReplicationCommand.cs
@@ -13,9 +13,10 @@ public class OnlineSegmentReplicationCommand : ICommand
private readonly ILog log = LogManager.GetLogger(typeof(OnlineSegmentReplicationCommand));
private readonly IDistributedHashTableNode node;
- private readonly IDistributedHashTableNodeReplication otherNode;
- private readonly string endpoint;
+ private readonly IDistributedHashTableNodeReplicationFactory factory;
+ private IDistributedHashTableNodeReplication otherNode;
private readonly Segment[] segments;
+ private readonly NodeEndpoint endpoint;
private readonly ReplicationType type;
private bool continueWorking = true;
@@ -26,13 +27,13 @@ public class OnlineSegmentReplicationCommand : ICommand
Segment[] segments,
ReplicationType type,
IDistributedHashTableNode node,
- IDistributedHashTableNodeReplication otherNode)
+ IDistributedHashTableNodeReplicationFactory factory)
{
- this.endpoint = endpoint.Sync.ToString();
+ this.endpoint = endpoint;
this.segments = segments;
this.type = type;
this.node = node;
- this.otherNode = otherNode;
+ this.factory = factory;
}
public void AbortExecution()
@@ -42,13 +43,14 @@ public void AbortExecution()
public bool Execute()
{
- log.DebugFormat("Replication from {0} of {1} segments for {2}", endpoint, segments.Length, type);
+ log.DebugFormat("Replication from {0} of {1} segments for {2}", endpoint.Sync, segments.Length, type);
var processedSegments = new List<int>();
if (continueWorking == false)
return false;
try
{
+ otherNode = factory.Create(endpoint);
var segmentsToLoad = AssignAllEmptySegmentsFromEndpoint(processedSegments);
if (continueWorking == false)
return false;
@@ -56,7 +58,7 @@ public bool Execute()
}
catch (Exception e)
{
- log.Warn("Could not replicate segments", e);
+ log.Warn("Could not replicate segments for " + type, e);
return false;
}
finally
@@ -70,8 +72,9 @@ public bool Execute()
{
if (log.IsWarnEnabled)
{
- log.WarnFormat("Giving up replicating the following segments: [{0}]",
- string.Join(", ", array.Select(x => x.ToString()).ToArray()));
+ log.WarnFormat("Giving up replicating for {0} the {0} segments",
+ type,
+ array.Length);
}
node.GivingUpOn(type, array);
}
@@ -123,12 +126,12 @@ private void ReplicateSegment(Segment segment)
{
log.DebugFormat("Starting replication of segment [{0}] from {1}",
segment,
- endpoint);
+ endpoint.Sync);
var result = otherNode.ReplicateNextPage(node.Endpoint, type, segment.Index);
log.DebugFormat("Replication of segment [{0}] from {1} got {2} puts & {3} removals",
segment,
- endpoint,
+ endpoint.Sync,
result.PutRequests.Length,
result.RemoveRequests.Length);
@@ -155,7 +158,7 @@ private List<Segment> AssignAllEmptySegmentsFromEndpoint(List<int> processedSegm
processedSegments.AddRange(assignedSegments);
node.DoneReplicatingSegments(type, assignedSegments);
- log.DebugFormat("{0} empty segments assigned from {1}", assignedSegments.Length, endpoint);
+ log.DebugFormat("{0} empty segments assigned from {1}", assignedSegments.Length, endpoint.Sync);
remainingSegments.AddRange(
segments.Where(x => assignedSegments.Contains(x.Index) == false)
);
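
The point of taking a factory instead of a live channel shows up at the call sites in DistributedHashTableNode.cs (below): factory.Create(endpoint) now runs inside Execute(), under its try/catch, so a node that is unreachable when replication is scheduled fails the command cleanly instead of throwing during construction. Schematically:

    // Before this commit:
    //   new OnlineSegmentReplicationCommand(endpoint, segments, type, node,
    //                                       replicationFactory.Create(endpoint));
    // After it, the connection is deferred until Execute():
    var command = new OnlineSegmentReplicationCommand(
        endpoint,                  // now a NodeEndpoint rather than a string
        segments,
        ReplicationType.Ownership,
        node,
        replicationFactory);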
9 Rhino.DistributedHashTable/Hosting/DistributedHashTableStorageHost.cs
@@ -24,6 +24,7 @@ public class DistributedHashTableStorageHost : IDisposable
private readonly IDistributedHashTableNode node;
private readonly QueueManager queueManager;
private readonly IDistributedHashTableStorage storage;
+ private IDistributedHashTableNodeReplication replication;
public DistributedHashTableStorageHost(Uri master)
: this(master, "node", 2201)
@@ -50,7 +51,9 @@ public DistributedHashTableStorageHost(Uri master)
queueManager,
new NonPooledDistributedHashTableNodeFactory()
);
- storage = new DistributedHashTableStorage(name + ".data.esent", node);
+ var dhtStorage = new DistributedHashTableStorage(name + ".data.esent", node);
+ replication = dhtStorage.Replication;
+ storage = dhtStorage;
listener = new TcpListener(
Socket.OSSupportsIPv6 ? IPAddress.IPv6Any : IPAddress.Any,
@@ -180,7 +183,7 @@ private void HandleTopologyUpdate(MessageStreamWriter<StorageMessageUnion> write
private void HandleReplicateNextPage(StorageMessageUnion wrapper,
MessageStreamWriter<StorageMessageUnion> writer)
{
- var replicationResult = storage.Replication.ReplicateNextPage(
+ var replicationResult = replication.ReplicateNextPage(
wrapper.ReplicateNextPageRequest.ReplicationEndpoint.GetNodeEndpoint(),
wrapper.ReplicateNextPageRequest.Type == ReplicationType.Backup ? Internal.ReplicationType.Backup : Internal.ReplicationType.Ownership,
wrapper.ReplicateNextPageRequest.Segment
@@ -206,7 +209,7 @@ private void HandleTopologyUpdate(MessageStreamWriter<StorageMessageUnion> write
private void HandleAssignEmpty(StorageMessageUnion wrapper,
MessageStreamWriter<StorageMessageUnion> writer)
{
- var segments = storage.Replication.AssignAllEmptySegments(
+ var segments = replication.AssignAllEmptySegments(
wrapper.AssignAllEmptySegmentsRequest.ReplicationEndpoint.GetNodeEndpoint(),
wrapper.AssignAllEmptySegmentsRequest.Type == ReplicationType.Backup ? Internal.ReplicationType.Backup : Internal.ReplicationType.Ownership,
wrapper.AssignAllEmptySegmentsRequest.SegmentsList.ToArray()
1  Rhino.DistributedHashTable/Internal/Constants.cs
@@ -2,6 +2,7 @@ namespace Rhino.DistributedHashTable.Internal
{
public static class Constants
{
+ public const int NumberOfSegments = 8192;
public const string RhinoDhtStartToken = "@rdht://";
public const string MovedSegment = RhinoDhtStartToken + "Segment/Moved/";
}
2  Rhino.DistributedHashTable/Internal/DistributedHashTableMaster.cs
@@ -163,7 +163,7 @@ public Topology GetTopology()
private static IEnumerable<Segment> CreateDefaultSegments()
{
- for (var i = 0; i < 8192; i++)
+ for (var i = 0; i < Constants.NumberOfSegments; i++)
{
var segment = new Segment
{
6 Rhino.DistributedHashTable/Internal/DistributedHashTableNode.cs
@@ -183,7 +183,7 @@ public void Start()
segmentToReplicate.ToArray(),
ReplicationType.Ownership,
this,
- replicationFactory.Create(segmentToReplicate.Key))
+ replicationFactory)
);
}
@@ -214,7 +214,7 @@ private void StartPendingBackupsForCurrentNode(Topology topology)
segmentToReplicate.ToArray(),
ReplicationType.Backup,
this,
- replicationFactory.Create(segmentToReplicate.Key));
+ replicationFactory);
Interlocked.Increment(ref currentlyReplicatingBackups);
@@ -257,7 +257,7 @@ private void RemoveMoveMarkerForSegmentsThatWeAReNoLongerResponsibleFor(Topology
IsLocal = true
};
var movedMarkers = Storage.Get(GetTopologyVersion(),
- segmentsThatWereMovedFromNode.ToArray());
+ segmentsThatWereMovedFromNode.ToArray());
var requests = movedMarkers
.Where(x => x.Length == 1)
.Select(values => values[0])
16 Rhino.DistributedHashTable/Internal/DistributedHashTableStorage.cs
@@ -125,13 +125,21 @@ public PutResult[] Put(int topologyVersion, params ExtendedPutRequest[] valuesTo
}
}
- private void AssertMatchingTopologyVersion(int topologyVersion)
+ private void AssertMatchingTopologyVersion(int topologyVersionFromClient)
{
- if(TopologyVersion != topologyVersion)
+ //client thinks that the version is newer
+ if(topologyVersionFromClient > TopologyVersion)
+ {
+ log.InfoFormat("Got request for topology {0} but current local version is {1}, forcing topology update, request will still fail",
+ topologyVersionFromClient,
+ TopologyVersion);
+ distributedHashTableNode.UpdateTopology();
+ }
+ if(TopologyVersion != topologyVersionFromClient)
{
log.InfoFormat("Got request for topology {0} but current local version is {1}",
- TopologyVersion,
- topologyVersion);
+ topologyVersionFromClient,
+ TopologyVersion);
throw new TopologyVersionDoesNotMatchException("Topology Version doesn't match, you need to refresh the topology from the master");
}
}
3  Rhino.DistributedHashTable/Internal/IDistributedHashTableStorage.cs
@@ -1,6 +1,5 @@
using System;
using Rhino.DistributedHashTable.Parameters;
-using Rhino.DistributedHashTable.Remote;
using Rhino.PersistentHashTable;
namespace Rhino.DistributedHashTable.Internal
@@ -12,7 +11,5 @@ public interface IDistributedHashTableStorage : IDisposable
bool[] Remove(int topologyVersion, params ExtendedRemoveRequest[] valuesToRemove);
Value[][] Get(int topologyVersion, params ExtendedGetRequest[] valuesToGet);
-
- IDistributedHashTableNodeReplication Replication { get; }
}
}
1  Rhino.DistributedHashTable/Rhino.DistributedHashTable.csproj
@@ -97,7 +97,6 @@
<Compile Include="Parameters\IExtendedRequest.cs" />
<Compile Include="Util\PrtoBufConverter.cs" />
<Compile Include="Util\EnumerableExtensions.cs" />
- <Compile Include="IDistributedHashTable.cs" />
<Compile Include="Internal\IDistributedHashTableMaster.cs" />
<Compile Include="Internal\NodeState.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />