Permalink
Browse files

Adding initial client for storage

  • Loading branch information...
ayenderahien
ayenderahien committed Jun 5, 2009
1 parent 1eaaf9a commit bcb46af8e143e1425a7cdf992968fa98fa23a65c
@@ -22,31 +22,29 @@ public DistributedHashTableMasterClient(Uri uri)
this.uri = uri;
}
- private T Execute<T>(Func<MessageStreamWriter<MasterMessageUnion>, MessageStreamIterator<MasterMessageUnion>, Stream, T> func)
+ private T Execute<T>(Func<MessageStreamWriter<MasterMessageUnion>, Stream, T> func)
{
using (var client = new TcpClient(uri.Host, uri.Port))
using (var stream = client.GetStream())
{
var writer = new MessageStreamWriter<MasterMessageUnion>(stream);
- var reader = MessageStreamIterator<MasterMessageUnion>.FromStreamProvider(() => stream);
- return func(writer, reader, stream);
+ return func(writer, stream);
}
}
- private void Execute(Action<MessageStreamWriter<MasterMessageUnion>, MessageStreamIterator<MasterMessageUnion>, Stream> func)
+ private void Execute(Action<MessageStreamWriter<MasterMessageUnion>, Stream> func)
{
Execute<object>((writer,
- iterator,
stream) =>
{
- func(writer, iterator, stream);
+ func(writer, stream);
return null;
});
}
public Segment[] Join(NodeEndpoint endpoint)
{
- return Execute((writer, iterator, stream) =>
+ return Execute((writer, stream) =>
{
writer.Write(new MasterMessageUnion.Builder
{
@@ -63,23 +61,31 @@ public Segment[] Join(NodeEndpoint endpoint)
writer.Flush();
stream.Flush();
- var union = iterator.First();
- if (union.Type == MasterMessageType.MasterErrorResult)
- throw new RemoteNodeException(union.Exception.Message);
- if (union.Type != MasterMessageType.JoinResult)
- throw new UnexpectedReplyException("Got reply " + union.Type + " but expected JoinResult");
+ var union = ReadReply(MasterMessageType.JoinResult, stream);
var response = union.JoinResponse;
return response.SegmentsList.Select(x => ConvertSegment(x)).ToArray();
});
}
+ private MasterMessageUnion ReadReply(MasterMessageType responses, Stream stream)
+ {
+ var iterator = MessageStreamIterator<MasterMessageUnion>.FromStreamProvider(() => new UndisposableStream(stream));
+ var union = iterator.First();
+
+ if (union.Type == MasterMessageType.MasterErrorResult)
+ throw new RemoteNodeException(union.Exception.Message);
+ if (union.Type != responses)
+ throw new UnexpectedReplyException("Got reply " + union.Type + " but expected " + responses);
+
+ return union;
+ }
+
public void CaughtUp(NodeEndpoint endpoint,
params int[] caughtUpSegments)
{
Execute((writer,
- iterator,
stream) =>
{
writer.Write(new MasterMessageUnion.Builder
@@ -98,17 +104,13 @@ public Segment[] Join(NodeEndpoint endpoint)
writer.Flush();
stream.Flush();
- var union = iterator.First();
- if (union.Type == MasterMessageType.MasterErrorResult)
- throw new RemoteNodeException(union.Exception.Message);
- if (union.Type != MasterMessageType.CaughtUpResponse)
- throw new UnexpectedReplyException("Got reply " + union.Type + " but expected CaughtUpResponse");
+ ReadReply(MasterMessageType.CaughtUpResponse, stream);
});
}
public Topology GetTopology()
{
- return Execute((writer, iterator, stream) =>
+ return Execute((writer, stream) =>
{
writer.Write(new MasterMessageUnion.Builder
{
@@ -117,11 +119,7 @@ public Topology GetTopology()
writer.Flush();
stream.Flush();
- var union = iterator.First();
- if (union.Type == MasterMessageType.MasterErrorResult)
- throw new RemoteNodeException(union.Exception.Message);
- if (union.Type != MasterMessageType.GetTopologyResult)
- throw new UnexpectedReplyException("Got reply " + union.Type + " but expected GetTopologyResult");
+ var union = ReadReply(MasterMessageType.GetTopologyResult, stream);
var topology = union.Topology;
var segments = topology.SegmentsList.Select(x => ConvertSegment(x));
@@ -164,7 +162,6 @@ private static Segment ConvertSegment(Protocol.Segment x)
params int[] rangesGivingUpOn)
{
Execute((writer,
- iterator,
stream) =>
{
writer.Write(new MasterMessageUnion.Builder
@@ -183,11 +180,7 @@ private static Segment ConvertSegment(Protocol.Segment x)
writer.Flush();
stream.Flush();
- var union = iterator.First();
- if (union.Type == MasterMessageType.MasterErrorResult)
- throw new RemoteNodeException(union.Exception.Message);
- if (union.Type != MasterMessageType.GaveUpResponse)
- throw new UnexpectedReplyException("Got reply " + union.Type + " but expected GaveUpResponse");
+ ReadReply(MasterMessageType.GaveUpResponse, stream);
});
}
}
@@ -0,0 +1,196 @@
+using System;
+using System.Net.Sockets;
+using Google.ProtocolBuffers;
+using Rhino.DistributedHashTable.Exceptions;
+using Rhino.DistributedHashTable.Internal;
+using Rhino.DistributedHashTable.Parameters;
+using Rhino.DistributedHashTable.Protocol;
+using Rhino.DistributedHashTable.Remote;
+using Rhino.DistributedHashTable.Util;
+using Rhino.PersistentHashTable;
+using NodeEndpoint = Rhino.DistributedHashTable.Internal.NodeEndpoint;
+using Value = Rhino.PersistentHashTable.Value;
+using System.Linq;
+using ValueVersion=Rhino.DistributedHashTable.Protocol.ValueVersion;
+
+namespace Rhino.DistributedHashTable.Client
+{
+ /// <summary>
+ /// Thread Safety - This is NOT a thread safe connection
+ /// Exception Safety - After an exception is thrown, it should be disposed and not used afterward
+ /// Connection Pooling - It is expected that this will be part of a connection pool
+ /// </summary>
+ public class DistributedHashTableStorageClient : IDistributedHashTableStorage
+ {
+ private readonly NodeEndpoint endpoint;
+ private readonly TcpClient client;
+ private readonly NetworkStream stream;
+ private readonly MessageStreamWriter<StorageMessageUnion> writer;
+
+ public DistributedHashTableStorageClient(NodeEndpoint endpoint)
+ {
+ this.endpoint = endpoint;
+ client = new TcpClient(endpoint.Sync.Host, endpoint.Sync.Port);
+ stream = client.GetStream();
+ writer = new MessageStreamWriter<StorageMessageUnion>(stream);
+ }
+
+ public NodeEndpoint Endpoint
+ {
+ get { return endpoint; }
+ }
+
+ public void Dispose()
+ {
+ stream.Dispose();
+ client.Close();
+ }
+
+ public PutResult[] Put(Guid topologyVersion,
+ params ExtendedPutRequest[] valuesToAdd)
+ {
+ writer.Write(new StorageMessageUnion.Builder
+ {
+ Type = StorageMessageType.PutRequests,
+ PutRequestsList =
+ {
+ valuesToAdd.Select(x => CreatePutRequest(x))
+ }
+ }.Build());
+ writer.Flush();
+ stream.Flush();
+
+ var union = ReadReply(StorageMessageType.PutResponses);
+ return union.PutResponsesList.Select(x => new PutResult
+ {
+ ConflictExists = x.ConflictExists,
+ Version = new PersistentHashTable.ValueVersion
+ {
+ InstanceId = new Guid(x.Version.InstanceId.ToByteArray()),
+ Number = x.Version.Number
+ }
+ }).ToArray();
+ }
+
+ private StorageMessageUnion ReadReply(StorageMessageType responses)
+ {
+ var iterator = MessageStreamIterator<StorageMessageUnion>.FromStreamProvider(() => new UndisposableStream(stream));
+ var union = iterator.First();
+
+ if (union.Type == StorageMessageType.StorageErrorResult)
+ throw new RemoteNodeException(union.Exception.Message);
+ if (union.Type != responses)
+ throw new UnexpectedReplyException("Got reply " + union.Type + " but expected " + responses);
+
+ return union;
+ }
+
+ private static PutRequestMessage CreatePutRequest(ExtendedPutRequest x)
+ {
+ var builder = new PutRequestMessage.Builder
+ {
+ Bytes = ByteString.CopyFrom(x.Bytes),
+ IsReadOnly = x.IsReadOnly,
+ IsReplicationRequest = x.IsReplicationRequest,
+ Key = x.Key,
+ OptimisticConcurrency = x.OptimisticConcurrency,
+ Segment = x.Segment,
+ Tag = x.Tag,
+ };
+ if (x.ExpiresAt != null)
+ builder.ExpiresAtAsDouble = x.ExpiresAt.Value.ToOADate();
+ if (x.ReplicationTimeStamp != null)
+ builder.ReplicationTimeStampAsDouble = x.ReplicationTimeStamp.Value.ToOADate();
+ if (x.ReplicationVersion != null)
+ {
+ builder.ReplicationVersion = new ValueVersion.Builder
+ {
+ InstanceId = ByteString.CopyFrom(x.ReplicationVersion.InstanceId.ToByteArray()),
+ Number = x.ReplicationVersion.Number
+ }.Build();
+ }
+ return builder.Build();
+ }
+
+ public bool[] Remove(Guid topologyVersion,
+ params ExtendedRemoveRequest[] valuesToRemove)
+ {
+ writer.Write(new StorageMessageUnion.Builder
+ {
+ Type = StorageMessageType.RemoveRequests,
+ RemoveRequestsList =
+ {
+ valuesToRemove.Select(x => new RemoveRequestMessage.Builder
+ {
+ IsReplicationRequest = x.IsReplicationRequest,
+ Key = x.Key,
+ Segment = x.Segment,
+ SpecificVersion = new ValueVersion.Builder
+ {
+ InstanceId = ByteString.CopyFrom(x.SpecificVersion.InstanceId.ToByteArray()),
+ Number = x.SpecificVersion.Number
+ }.Build()
+ }.Build())
+ }
+ }.Build());
+ writer.Flush();
+ stream.Flush();
+
+ var union = ReadReply(StorageMessageType.RemoveResponses);
+ return union.RemoveResponesList.Select(x => x.WasRemoved).ToArray();
+ }
+
+ public Value[][] Get(Guid topologyVersion,
+ params ExtendedGetRequest[] valuesToGet)
+ {
+ writer.Write(new StorageMessageUnion.Builder
+ {
+ Type = StorageMessageType.GetRequests,
+ GetRequestsList =
+ {
+ valuesToGet.Select(x => CreateGetRequest(x))
+ }
+ }.Build());
+ writer.Flush();
+ stream.Flush();
+
+ var union = ReadReply(StorageMessageType.RemoveResponses);
+ return union.GetResponsesList.Select(x =>
+ x.ValuesList.Select(y=> new Value
+ {
+ Data = y.Data.ToByteArray(),
+ ExpiresAt = y.ExpiresAtAsDouble != null ? DateTime.FromOADate(y.ExpiresAtAsDouble.Value) : (DateTime?)null,
+ Key = y.Key,
+ ParentVersions = y.ParentVersionsList.Select(z => new PersistentHashTable.ValueVersion
+ {
+ InstanceId = new Guid(z.InstanceId.ToByteArray()),
+ Number = z.Number
+ }).ToArray(),
+ ReadOnly = y.ReadOnly,
+ Sha256Hash = y.Sha256Hash.ToByteArray(),
+ Tag = y.Tag,
+ Timestamp = DateTime.FromOADate(y.TimeStampAsDouble),
+ Version = new PersistentHashTable.ValueVersion
+ {
+ InstanceId = new Guid(y.Version.InstanceId.ToByteArray()),
+ Number = y.Version.Number
+ }
+ }).ToArray()
+ ).ToArray();
+ }
+
+ private static GetRequestMessage CreateGetRequest(ExtendedGetRequest x)
+ {
+ return new GetRequestMessage.Builder
+ {
+ Key = x.Key,
+ Segment = x.Segment,
+ }.Build();
+ }
+
+ public IDistributedHashTableNodeReplication Replication
+ {
+ get { throw new NotImplementedException(); }
+ }
+ }
+}
@@ -79,7 +79,7 @@ public PutResult[] Put(Guid topologyVersion, params ExtendedPutRequest[] valuesT
{
AssertSegmentNotMoved(actions, request.Segment);
- request.Tag = (int)request.Segment;
+ request.Tag = request.Segment;
if (request.ParentVersions == null)
throw new ArgumentException("Could not accept request with no ParentVersions");
@@ -103,18 +103,18 @@ public PutResult[] Put(Guid topologyVersion, params ExtendedPutRequest[] valuesT
}
private static void AssertSegmentNotMoved(PersistentHashTableActions actions,
- int? range)
+ int? segment)
{
- if(range == null)
- throw new ArgumentNullException("range","Segment cannot be null");
+ if(segment < 0)
+ throw new ArgumentNullException("segment", "Segment cannot be negative");
var values = actions.Get(new GetRequest
{
- Key = Constants.MovedSegment + range
+ Key = Constants.MovedSegment + segment
});
if(values.Length>0)
{
- throw new SeeOtherException("This key belongs to a range assigned to another node")
+ throw new SeeOtherException("This key belongs to a segment assigned to another node")
{
Endpoint = NodeEndpoint.FromBytes(values[0].Data)
};
@@ -177,13 +177,13 @@ public bool[] Remove(Guid topologyVersion, params ExtendedRemoveRequest[] values
if (valuesToSend[0].IsReplicationRequest)
return;
- if (distributedHashTableNode.IsSegmentOwned(valuesToSend[0].Segment.Value) == false)
+ if (distributedHashTableNode.IsSegmentOwned(valuesToSend[0].Segment) == false)
{
// if this got to us because of fail over, and we need to replicate to the real owner
// and to any other backups
- distributedHashTableNode.SendToOwner(valuesToSend[0].Segment.Value, valuesToSend);
+ distributedHashTableNode.SendToOwner(valuesToSend[0].Segment, valuesToSend);
}
- distributedHashTableNode.SendToAllOtherBackups(valuesToSend[0].Segment.Value, valuesToSend);
+ distributedHashTableNode.SendToAllOtherBackups(valuesToSend[0].Segment, valuesToSend);
}
public Value[][] Get(Guid topologyVersion,params ExtendedGetRequest[] valuesToGet)
@@ -5,8 +5,12 @@ namespace Rhino.DistributedHashTable.Parameters
{
public class ExtendedGetRequest : GetRequest, IExtendedRequest
{
+ public ExtendedGetRequest()
+ {
+ Segment = -1;
+ }
public Guid TopologyVersion { get; set; }
- public int? Segment { get; set; }
+ public int Segment { get; set; }
public bool IsReplicationRequest { get; set; }
}
}
Oops, something went wrong.

0 comments on commit bcb46af

Please sign in to comment.