Permalink
Browse files

More work on getting the master client working - adding more operations

  • Loading branch information...
1 parent 9811f24 commit 8c80a5b49272d4728b996bc1a8c03e839eeef382 ayenderahien committed Jun 5, 2009
@@ -0,0 +1,82 @@
+using System;
+using Rhino.DistributedHashTable.Client;
+using Rhino.DistributedHashTable.Hosting;
+using Rhino.DistributedHashTable.Internal;
+using Xunit;
+using System.Linq;
+
+namespace Rhino.DistributedHashTable.IntegrationTests
+{
+ public class MasterOverTheNetwork
+ {
+ public class CanCommunicateWithMasterUsingClientProxy : FullIntegrationTest, IDisposable
+ {
+ private readonly DistributedHashTableMasterHost masterHost;
+ private readonly DistributedHashTableMasterClient masterProxy;
+
+ public CanCommunicateWithMasterUsingClientProxy()
+ {
+ masterHost = new DistributedHashTableMasterHost();
+
+ masterHost.Start();
+
+ masterProxy = new DistributedHashTableMasterClient(new Uri("rhino.dht://localhost:2200"));
+ }
+
+ [Fact]
+ public void CanGetTopologyWhenThereAreNoNodes()
+ {
+ var topology = masterProxy.GetTopology();
+ Assert.NotNull(topology);
+ Assert.NotEqual(Guid.Empty, topology.Version);
+ Assert.NotEqual(DateTime.MinValue, topology.Timestamp);
+ Assert.Equal(8192, topology.Segments.Length);
+ Assert.True(topology.Segments.All(x => x.AssignedEndpoint == null));
+ }
+
+ [Fact]
+ public void CanJoinToMaster()
+ {
+ var endpoint = new NodeEndpoint
+ {
+ Async = new Uri("rhino.queues://localhost:2202"),
+ Sync = new Uri("rhino.dht://localhost:2201")
+ };
+ var segments = masterProxy.Join(endpoint);
+ Assert.Equal(8192, segments.Length);
+ Assert.True(segments.All(x => x.AssignedEndpoint.Equals(endpoint)));
+ }
+
+ [Fact]
+ public void CanCatchUpOnSegment()
+ {
+ var endpoint = new NodeEndpoint
+ {
+ Async = new Uri("rhino.queues://localhost:2202"),
+ Sync = new Uri("rhino.dht://localhost:2201")
+ };
+ masterProxy.Join(new NodeEndpoint
+ {
+ Async = new Uri("rhino.queues://other:2202"),
+ Sync = new Uri("rhino.dht://other:2201")
+ });
+ var segments = masterProxy.Join(endpoint);
+
+ masterProxy.CaughtUp(endpoint, segments[0].Index, segments[1].Index);
+
+ var topology = masterProxy.GetTopology();
+ Assert.Equal(endpoint, topology.Segments[segments[0].Index].AssignedEndpoint);
+ Assert.Equal(endpoint, topology.Segments[segments[1].Index].AssignedEndpoint);
+
+ Assert.NotEqual(endpoint, topology.Segments[segments[2].Index].AssignedEndpoint);
+ Assert.Equal(endpoint, topology.Segments[segments[2].Index].InProcessOfMovingToEndpoint);
+ }
+
+ public void Dispose()
+ {
+ masterHost.Dispose();
+ }
+ }
+
+ }
+}
@@ -91,7 +91,7 @@ public class WhenThereAreKeysInTable : EsentTestBase, IDisposable
private readonly IDistributedHashTableNode node;
private readonly Guid guid;
private readonly IDistributedHashTableNodeReplication replication;
- private PutResult putResult;
+ private readonly PutResult putResult;
public WhenThereAreKeysInTable()
{
@@ -9,7 +9,7 @@
namespace Rhino.DistributedHashTable.IntegrationTests.Mini
{
- public class OnlineSegmentReplicationCommandTest
+ public class OnlineSegmentReplicationCommandTest : EsentTestBase
{
private readonly OnlineSegmentReplicationCommand command;
private readonly IDistributedHashTableNode node;
@@ -57,6 +57,7 @@
</ItemGroup>
<ItemGroup>
<Compile Include="FullIntegrationTest.cs" />
+ <Compile Include="MasterOverTheNetwork.cs" />
<Compile Include="Mini\DistributedHashTableReplicationTest.cs" />
<Compile Include="Mini\DistributedHashTableStorageTest.cs" />
<Compile Include="Mini\EsentTestBase.cs" />
@@ -1,8 +1,5 @@
using System;
-using System.Linq;
-using System.ServiceModel;
using Rhino.DistributedHashTable.Hosting;
-using Rhino.DistributedHashTable.Internal;
using Xunit;
namespace Rhino.DistributedHashTable.IntegrationTests
@@ -0,0 +1,171 @@
+using System;
+using System.IO;
+using System.Linq;
+using System.Net.Sockets;
+using System.Threading;
+using Google.ProtocolBuffers;
+using Rhino.DistributedHashTable.Exceptions;
+using Rhino.DistributedHashTable.Internal;
+using Rhino.DistributedHashTable.Protocol;
+using NodeEndpoint = Rhino.DistributedHashTable.Internal.NodeEndpoint;
+using Segment = Rhino.DistributedHashTable.Internal.Segment;
+using Rhino.DistributedHashTable.Util;
+
+namespace Rhino.DistributedHashTable.Client
+{
+ public class DistributedHashTableMasterClient : IDistributedHashTableMaster
+ {
+ private readonly Uri uri;
+
+ public DistributedHashTableMasterClient(Uri uri)
+ {
+ this.uri = uri;
+ }
+
+ private T Execute<T>(Func<MessageStreamWriter<MasterMessageUnion>, MessageStreamIterator<MasterMessageUnion>, Stream, T> func)
+ {
+ using (var client = new TcpClient(uri.Host, uri.Port))
+ using (var stream = client.GetStream())
+ {
+ var writer = new MessageStreamWriter<MasterMessageUnion>(stream);
+ var reader = MessageStreamIterator<MasterMessageUnion>.FromStreamProvider(() => stream);
+ return func(writer, reader, stream);
+ }
+ }
+
+ private void Execute(Action<MessageStreamWriter<MasterMessageUnion>, MessageStreamIterator<MasterMessageUnion>, Stream> func)
+ {
+ Execute<object>((writer,
+ iterator,
+ stream) =>
+ {
+ func(writer, iterator, stream);
+ return null;
+ });
+ }
+
+ public Segment[] Join(NodeEndpoint endPoint)
+ {
+ return Execute((writer, iterator, stream) =>
+ {
+ writer.Write(new MasterMessageUnion.Builder
+ {
+ Type = MasterMessageType.JoinRequest,
+ JoinRequest = new JoinRequestMessage.Builder
+ {
+ EndpointJoining = new Protocol.NodeEndpoint.Builder
+ {
+ Async = endPoint.Async.ToString(),
+ Sync = endPoint.Sync.ToString()
+ }.Build()
+ }.Build()
+ }.Build());
+ writer.Flush();
+ stream.Flush();
+
+ var union = iterator.First();
+ if (union.Type == MasterMessageType.MasterErrorResult)
+ throw new RemoteNodeException(union.Exception.Message);
+ if (union.Type != MasterMessageType.JoinResult)
+ throw new UnexpectedReplyException("Got reply " + union.Type + " but expected JoinResult");
+
+ var response = union.JoinResponse;
+
+ return response.SegmentsList.Select(x => ConvertSegment(x)).ToArray();
+ });
+ }
+
+ public void CaughtUp(NodeEndpoint endPoint,
+ params int[] caughtUpSegments)
+ {
+ Execute((writer,
+ iterator,
+ stream) =>
+ {
+ writer.Write(new MasterMessageUnion.Builder
+ {
+ Type = MasterMessageType.CaughtUpRequest,
+ CaughtUp = new CaughtUpRequestMessage.Builder
+ {
+ CaughtUpSegmentsList = { caughtUpSegments },
+ Endpoint = new Protocol.NodeEndpoint.Builder
+ {
+ Async = endPoint.Async.ToString(),
+ Sync = endPoint.Sync.ToString()
+ }.Build()
+ }.Build()
+ }.Build());
+ writer.Flush();
+ stream.Flush();
+
+ Thread.Sleep(15000);
+
+ var union = iterator.First();
+ if (union.Type == MasterMessageType.MasterErrorResult)
+ throw new RemoteNodeException(union.Exception.Message);
+ if (union.Type != MasterMessageType.CaughtUpResponse)
+ throw new UnexpectedReplyException("Got reply " + union.Type + " but expected CaughtUpResponse");
+ });
+ }
+
+ public Topology GetTopology()
+ {
+ return Execute((writer, iterator, stream) =>
+ {
+ writer.Write(new MasterMessageUnion.Builder
+ {
+ Type = MasterMessageType.GetTopologyRequest,
+ }.Build());
+ writer.Flush();
+ stream.Flush();
+
+ var union = iterator.First();
+ if (union.Type == MasterMessageType.MasterErrorResult)
+ throw new RemoteNodeException(union.Exception.Message);
+ if (union.Type != MasterMessageType.GetTopologyResult)
+ throw new UnexpectedReplyException("Got reply " + union.Type + " but expected GetTopologyResult");
+
+ var topology = union.Topology;
+ var segments = topology.SegmentsList.Select(x => ConvertSegment(x));
+ return new Topology(segments.ToArray(), new Guid(topology.Version.ToByteArray()))
+ {
+ Timestamp = DateTime.FromOADate(topology.TimestampAsDouble)
+ };
+ });
+ }
+
+ private static Segment ConvertSegment(Protocol.Segment x)
+ {
+ return new Segment
+ {
+ Version = new Guid(x.Version.ToByteArray()),
+ AssignedEndpoint = x.AssignedEndpoint != Protocol.NodeEndpoint.DefaultInstance
+ ? new NodeEndpoint
+ {
+ Async = new Uri(x.AssignedEndpoint.Async),
+ Sync = new Uri(x.AssignedEndpoint.Sync)
+ }
+ : null,
+ InProcessOfMovingToEndpoint = x.InProcessOfMovingToEndpoint != Protocol.NodeEndpoint.DefaultInstance
+ ? new NodeEndpoint
+ {
+ Async = new Uri(x.InProcessOfMovingToEndpoint.Async),
+ Sync = new Uri(x.InProcessOfMovingToEndpoint.Sync)
+ }
+ : null,
+ Index = x.Index,
+ Backups = x.BackupsList.Select(b => new NodeEndpoint
+ {
+ Async = new Uri(b.Async),
+ Sync = new Uri(b.Sync)
+ }).ToSet(),
+ };
+ }
+
+ public void GaveUp(NodeEndpoint endpoint,
+ params int[] rangesGivingUpOn)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
@@ -0,0 +1,35 @@
+using System;
+using System.Runtime.Serialization;
+
+namespace Rhino.DistributedHashTable.Exceptions
+{
+ [Serializable]
+ public class RemoteNodeException : Exception
+ {
+ //
+ // For guidelines regarding the creation of new exception types, see
+ // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpgenref/html/cpconerrorraisinghandlingguidelines.asp
+ // and
+ // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dncscol/html/csharp07192001.asp
+ //
+
+ public RemoteNodeException()
+ {
+ }
+
+ public RemoteNodeException(string message) : base(message)
+ {
+ }
+
+ public RemoteNodeException(string message,
+ Exception inner) : base(message, inner)
+ {
+ }
+
+ protected RemoteNodeException(
+ SerializationInfo info,
+ StreamingContext context) : base(info, context)
+ {
+ }
+ }
+}
@@ -0,0 +1,35 @@
+using System;
+using System.Runtime.Serialization;
+
+namespace Rhino.DistributedHashTable.Exceptions
+{
+ [Serializable]
+ public class UnexpectedReplyException : Exception
+ {
+ //
+ // For guidelines regarding the creation of new exception types, see
+ // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpgenref/html/cpconerrorraisinghandlingguidelines.asp
+ // and
+ // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dncscol/html/csharp07192001.asp
+ //
+
+ public UnexpectedReplyException()
+ {
+ }
+
+ public UnexpectedReplyException(string message) : base(message)
+ {
+ }
+
+ public UnexpectedReplyException(string message,
+ Exception inner) : base(message, inner)
+ {
+ }
+
+ protected UnexpectedReplyException(
+ SerializationInfo info,
+ StreamingContext context) : base(info, context)
+ {
+ }
+ }
+}
Oops, something went wrong.

0 comments on commit 8c80a5b

Please sign in to comment.