Permalink
Browse files

Adding master & storage hosts

  • Loading branch information...
1 parent 21188d3 commit 9811f245808eb9ec9bfa08d22b39d6fb9c69f9eb ayenderahien committed Jun 4, 2009
@@ -1,49 +1,38 @@
-//using System;
-//using System.Linq;
-//using System.ServiceModel;
-//using Rhino.DistributedHashTable.Hosting;
-//using Rhino.DistributedHashTable.Internal;
-//using Xunit;
+using System;
+using System.Linq;
+using System.ServiceModel;
+using Rhino.DistributedHashTable.Hosting;
+using Rhino.DistributedHashTable.Internal;
+using Xunit;
-//namespace Rhino.DistributedHashTable.IntegrationTests
-//{
-// public class WhenNodeIsStarted : FullIntegrationTest, IDisposable
-// {
-// private readonly DistributedHashTableMasterHost masterHost;
-// private readonly DistributedHashTableStorageHost storageHost;
-// private readonly Uri masterUri = new Uri("net.tcp://" + Environment.MachineName + ":2200/master");
+namespace Rhino.DistributedHashTable.IntegrationTests
+{
+ public class WhenNodeIsStarted : FullIntegrationTest, IDisposable
+ {
+ private readonly DistributedHashTableMasterHost masterHost;
+ private readonly DistributedHashTableStorageHost storageHost;
+ private readonly Uri masterUri = new Uri("net.tcp://" + Environment.MachineName + ":2200/master");
-// public WhenNodeIsStarted()
-// {
-// masterHost = new DistributedHashTableMasterHost();
-// storageHost = new DistributedHashTableStorageHost(
-// masterUri);
+ public WhenNodeIsStarted()
+ {
+ masterHost = new DistributedHashTableMasterHost();
+ storageHost = new DistributedHashTableStorageHost(
+ masterUri);
-// masterHost.Start();
-// storageHost.Start();
-// }
+ masterHost.Start();
+ storageHost.Start();
+ }
-// [Fact]
-// public void WillJoinMaster()
-// {
-// var channel = ChannelFactory<IDistributedHashTableMaster>.CreateChannel(
-// Binding.DhtDefault,
-// new EndpointAddress(masterUri));
-// try
-// {
-// var topology = channel.GetTopology();
-// Assert.True(topology.Segments.All(x => x.AssignedEndpoint == storageHost.Endpoint));
-// }
-// finally
-// {
-// ((IClientChannel)channel).Dispose();
-// }
-// }
+ [Fact]
+ public void WillJoinMaster()
+ {
+ Assert.True(false);
+ }
-// public void Dispose()
-// {
-// masterHost.Dispose();
-// }
-// }
-//}
+ public void Dispose()
+ {
+ masterHost.Dispose();
+ }
+ }
+}
@@ -1,15 +1,19 @@
using System;
+using System.Linq;
using System.Net.Sockets;
using Google.ProtocolBuffers;
+using log4net;
using Rhino.DistributedHashTable.Internal;
using Rhino.DistributedHashTable.Protocol;
-using NodeEndpoint=Rhino.DistributedHashTable.Internal.NodeEndpoint;
-using Segment=Rhino.DistributedHashTable.Protocol.Segment;
+using NodeEndpoint = Rhino.DistributedHashTable.Internal.NodeEndpoint;
+using Segment = Rhino.DistributedHashTable.Protocol.Segment;
namespace Rhino.DistributedHashTable.Hosting
{
public class DistributedHashTableMasterHost : IDisposable
{
+ private readonly ILog log = LogManager.GetLogger(typeof(DistributedHashTableStorageHost));
+
private readonly TcpListener listener;
private readonly DistributedHashTableMaster master = new DistributedHashTableMaster();
@@ -19,7 +23,7 @@ public DistributedHashTableMasterHost()
}
public DistributedHashTableMasterHost(string name,
- int port)
+ int port)
{
}
@@ -56,73 +60,90 @@ private void OnAcceptTcpClient(IAsyncResult result)
using (client)
using (var stream = client.GetStream())
{
- var writer = new MessageStreamWriter<MessageWrapper>(stream);
- foreach (var wrapper in MessageStreamIterator<MessageWrapper>.FromStreamProvider(() => stream))
+ var writer = new MessageStreamWriter<MasterMessageUnion>(stream);
+ try
{
- switch (wrapper.Type)
+ foreach (var wrapper in MessageStreamIterator<MasterMessageUnion>.FromStreamProvider(() => stream))
{
- case MessageType.GetTopologyRequest:
- HandleGetToplogy(stream, writer);
- break;
- case MessageType.JoinRequest:
- HandleJoin(wrapper, writer);
- break;
- default:
- throw new ArgumentOutOfRangeException();
+ log.DebugFormat("Accepting message from {0} - {1}",
+ client.Client.RemoteEndPoint,
+ wrapper.Type);
+ switch (wrapper.Type)
+ {
+ case MasterMessageType.GetTopologyRequest:
+ HandleGetToplogy(writer);
+ break;
+ case MasterMessageType.JoinRequest:
+ HandleJoin(wrapper, writer);
+ break;
+ default:
+ throw new ArgumentOutOfRangeException();
+ }
}
+ writer.Flush();
+ stream.Flush();
+ }
+ catch (Exception e)
+ {
+ log.Warn("Error performing request",e );
+ writer.Write(new MasterMessageUnion.Builder
+ {
+ Type = MasterMessageType.MasterErrorResult,
+ Exception = new Error.Builder
+ {
+ Message = e.ToString()
+ }.Build()
+ }.Build());
}
- writer.Flush();
- stream.Flush();
}
}
+ catch (Exception e)
+ {
+ log.Warn("Error when processing request to master, error reporting failed as well!", e);
+ }
finally
{
listener.BeginAcceptTcpClient(OnAcceptTcpClient, null);
}
}
- private void HandleJoin(MessageWrapper wrapper,
- MessageStreamWriter<MessageWrapper> writer)
+ private void HandleJoin(MasterMessageUnion wrapper,
+ MessageStreamWriter<MasterMessageUnion> writer)
{
var endpoint = wrapper.JoinRequest.EndpointJoining;
var segments = master.Join(new NodeEndpoint
{
Async = new Uri(endpoint.Async),
Sync = new Uri(endpoint.Sync)
});
- var joinResponse = new JoinResponseMessage.Builder();
- foreach (var segment in segments)
+ var joinResponse = new JoinResponseMessage.Builder
{
- joinResponse.SegmentsList.Add(ConverToProtocolSegment(segment));
- }
- writer.Write(new MessageWrapper.Builder
+ SegmentsList = {segments.Select(x => ConvertToProtocolSegment(x))}
+ };
+ writer.Write(new MasterMessageUnion.Builder
{
- Type = MessageType.JoinResult,
+ Type = MasterMessageType.JoinResult,
JoinResponse = joinResponse.Build()
}.Build());
}
- private void HandleGetToplogy(NetworkStream stream,
- MessageStreamWriter<MessageWrapper> writer)
+ private void HandleGetToplogy(MessageStreamWriter<MasterMessageUnion> writer)
{
var topology = master.GetTopology();
var topologyResultMessage = new TopologyResultMessage.Builder
{
Version = ByteString.CopyFrom(topology.Version.ToByteArray()),
TimestampAsDouble = topology.Timestamp.ToOADate(),
+ SegmentsList = { topology.Segments.Select(x => ConvertToProtocolSegment(x)) }
};
- foreach (var segment in topology.Segments)
- {
- topologyResultMessage.SegmentsList.Add(ConverToProtocolSegment(segment));
- }
- writer.Write(new MessageWrapper.Builder
+ writer.Write(new MasterMessageUnion.Builder
{
- Type = MessageType.GetTopologyResult,
+ Type = MasterMessageType.GetTopologyResult,
Topology = topologyResultMessage.Build()
}.Build());
}
- private static Segment ConverToProtocolSegment(Internal.Segment segment)
+ private static Segment ConvertToProtocolSegment(Internal.Segment segment)
{
return new Segment.Builder
{
@@ -133,14 +154,14 @@ private static Segment ConverToProtocolSegment(Internal.Segment segment)
Sync = segment.AssignedEndpoint.Sync.ToString()
}.Build(),
InProcessOfMovingToEndpoint = segment.InProcessOfMovingToEndpoint == null
- ?
- Protocol.NodeEndpoint.DefaultInstance
- :
- new Protocol.NodeEndpoint.Builder
- {
- Async = segment.InProcessOfMovingToEndpoint.Async.ToString(),
- Sync = segment.InProcessOfMovingToEndpoint.Sync.ToString(),
- }.Build(),
+ ?
+ Protocol.NodeEndpoint.DefaultInstance
+ :
+ new Protocol.NodeEndpoint.Builder
+ {
+ Async = segment.InProcessOfMovingToEndpoint.Async.ToString(),
+ Sync = segment.InProcessOfMovingToEndpoint.Sync.ToString(),
+ }.Build(),
Version = ByteString.CopyFrom(segment.Version.ToByteArray()),
}.Build();
}
Oops, something went wrong. Retry.

0 comments on commit 9811f24

Please sign in to comment.