Permalink
Browse files

Adding more tests for read/write for the DHT

  • Loading branch information...
1 parent ed817bc commit 716cc9411da3e9db9c8ba6395a664ee1271e328c ayenderahien committed Jun 5, 2009
@@ -27,7 +27,6 @@ public ReadingValues()
{
Key = "test",
Bytes = new byte[]{1,2,4},
- TopologyVersion = guid,
Segment = 0,
});
}
@@ -82,7 +81,6 @@ public void WillGetVersionNumberFromPut()
{
Key = "test",
Bytes = new byte[] { 1, 2, 4 },
- TopologyVersion = guid,
Segment = 0,
});
Assert.NotNull(results[0].Version);
@@ -96,7 +94,6 @@ public void WillReplicateToOtherSystems()
{
Key = "test",
Bytes = new byte[] { 1, 2, 4 },
- TopologyVersion = guid,
Segment = 0,
};
distributedHashTableStorage.Put(guid, request);
@@ -111,7 +108,6 @@ public void WillReplicateToOwnerWhenFailedOverToAnotherNode()
{
Key = "test",
Bytes = new byte[] { 1, 2, 4 },
- TopologyVersion = guid,
Segment = 0,
};
distributedHashTableStorage.Put(guid, request);
@@ -126,7 +122,6 @@ public void WillReplicateToOtherBackupsWhenFailedOverToAnotherNode()
{
Key = "test",
Bytes = new byte[] { 1, 2, 4 },
- TopologyVersion = guid,
Segment = 0,
};
distributedHashTableStorage.Put(guid, request);
@@ -158,7 +153,6 @@ public RemovingValues()
{
Key = "test",
Bytes = new byte[] { 1, 2, 4 },
- TopologyVersion = guid,
Segment = 0,
});
version = results[0].Version;
@@ -171,7 +165,6 @@ public void WillConfirmRemovalOfExistingValue()
{
Key = "test",
SpecificVersion = version,
- TopologyVersion = guid,
Segment = 0,
});
Assert.True(results[0]);
@@ -184,7 +177,6 @@ public void WillNotConfirmRemovalOfNonExistingValue()
{
Key = "test2",
SpecificVersion = version,
- TopologyVersion = guid,
Segment = 0,
});
Assert.False(results[0]);
@@ -198,7 +190,6 @@ public void WillReplicateToOtherSystems()
{
Key = "test",
SpecificVersion = version,
- TopologyVersion = guid,
Segment = 0,
};
distributedHashTableStorage.Remove(guid, request);
@@ -213,7 +204,6 @@ public void WillReplicateToOwnerWhenFailedOverToAnotherNode()
{
Key = "test",
SpecificVersion = version,
- TopologyVersion = guid,
Segment = 0,
};
distributedHashTableStorage.Remove(guid, request);
@@ -228,7 +218,6 @@ public void WillReplicateToOtherBackupsWhenFailedOverToAnotherNode()
{
Key = "test",
SpecificVersion = version,
- TopologyVersion = guid,
Segment = 0,
};
distributedHashTableStorage.Remove(guid, request);
@@ -1,6 +1,7 @@
using System;
using Rhino.DistributedHashTable.Client;
using Rhino.DistributedHashTable.Hosting;
+using Rhino.DistributedHashTable.Parameters;
using Xunit;
using System.Linq;
@@ -20,15 +21,73 @@ public NodeOverTheNetwork()
storageHost.Start();
}
-
[Fact]
- public void WillJoinMaster()
+ public void NodeHaveJoinedMasterAutomatically()
{
var masterProxy = new DistributedHashTableMasterClient(masterUri);
var topology = masterProxy.GetTopology();
Assert.True(topology.Segments.All(x => x.AssignedEndpoint == storageHost.Endpoint));
}
+ [Fact]
+ public void CanPutItem()
+ {
+ using (var storageProxy = new DistributedHashTableStorageClient(storageHost.Endpoint))
+ {
+ var masterProxy = new DistributedHashTableMasterClient(masterUri);
+ var topology = masterProxy.GetTopology();
+ var results = storageProxy.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] {1, 2, 3, 4},
+ Key = "test",
+ Segment = 1,
+ });
+ Assert.False(results[0].ConflictExists);
+
+ var values = storageProxy.Get(topology.Version, new ExtendedGetRequest
+ {
+ Key = "test",
+ Segment = 1
+ });
+
+ Assert.Equal(1, values[0].Length);
+ Assert.Equal(new byte[] {1, 2, 3, 4}, values[0][0].Data);
+ }
+ }
+
+ [Fact]
+ public void CanRemoveItem()
+ {
+ using (var storageProxy = new DistributedHashTableStorageClient(storageHost.Endpoint))
+ {
+ var masterProxy = new DistributedHashTableMasterClient(masterUri);
+ var topology = masterProxy.GetTopology();
+ var results = storageProxy.Put(topology.Version, new ExtendedPutRequest
+ {
+ Bytes = new byte[] { 1, 2, 3, 4 },
+ Key = "test",
+ Segment = 1,
+ });
+ Assert.False(results[0].ConflictExists);
+
+ var removed = storageProxy.Remove(topology.Version, new ExtendedRemoveRequest
+ {
+ Key = "test",
+ SpecificVersion = results[0].Version,
+ Segment = 1
+ });
+ Assert.True(removed[0]);
+
+ var values = storageProxy.Get(topology.Version, new ExtendedGetRequest
+ {
+ Key = "test",
+ Segment = 1
+ });
+
+ Assert.Equal(0, values[0].Length);
+ }
+ }
+
public void Dispose()
{
storageHost.Dispose();
@@ -52,6 +52,7 @@ public void Dispose()
writer.Write(new StorageMessageUnion.Builder
{
Type = StorageMessageType.PutRequests,
+ TopologyVersion = ByteString.CopyFrom(topologyVersion.ToByteArray()),
PutRequestsList =
{
valuesToAdd.Select(x => CreatePutRequest(x))
@@ -118,6 +119,7 @@ private static PutRequestMessage CreatePutRequest(ExtendedPutRequest x)
writer.Write(new StorageMessageUnion.Builder
{
Type = StorageMessageType.RemoveRequests,
+ TopologyVersion = ByteString.CopyFrom(topologyVersion.ToByteArray()),
RemoveRequestsList =
{
valuesToRemove.Select(x => new RemoveRequestMessage.Builder
@@ -146,6 +148,7 @@ private static PutRequestMessage CreatePutRequest(ExtendedPutRequest x)
writer.Write(new StorageMessageUnion.Builder
{
Type = StorageMessageType.GetRequests,
+ TopologyVersion = ByteString.CopyFrom(topologyVersion.ToByteArray()),
GetRequestsList =
{
valuesToGet.Select(x => CreateGetRequest(x))
@@ -154,7 +157,7 @@ private static PutRequestMessage CreatePutRequest(ExtendedPutRequest x)
writer.Flush();
stream.Flush();
- var union = ReadReply(StorageMessageType.RemoveResponses);
+ var union = ReadReply(StorageMessageType.GetResponses);
return union.GetResponsesList.Select(x =>
x.ValuesList.Select(y=> new Value
{
@@ -73,6 +73,10 @@ private void OnBeginAcceptTcpClient(IAsyncResult result)
{
return;
}
+ catch(InvalidOperationException)
+ {
+ return;
+ }
try
{
using (client)
@@ -130,7 +134,6 @@ private void OnBeginAcceptTcpClient(IAsyncResult result)
var requests = wrapper.RemoveRequestsList.Select(x =>
new ExtendedRemoveRequest
{
- TopologyVersion = topologyVersion,
Segment = x.Segment,
Key = x.Key,
SpecificVersion = GetVersion(x.SpecificVersion),
@@ -140,6 +143,7 @@ private void OnBeginAcceptTcpClient(IAsyncResult result)
writer.Write(new StorageMessageUnion.Builder
{
Type = StorageMessageType.RemoveResponses,
+ TopologyVersion = ByteString.CopyFrom(topologyVersion.ToByteArray()),
RemoveResponesList =
{
removed.Select(x => new RemoveResponseMessage.Builder
@@ -157,22 +161,31 @@ private void OnBeginAcceptTcpClient(IAsyncResult result)
var puts = wrapper.PutRequestsList.Select(x => new ExtendedPutRequest
{
Bytes = x.Bytes.ToByteArray(),
- ExpiresAt = x.ExpiresAtAsDouble != null ? DateTime.FromOADate(x.ExpiresAtAsDouble.Value) : (DateTime?)null,
+ ExpiresAt = x.HasExpiresAtAsDouble ?
+ DateTime.FromOADate(x.ExpiresAtAsDouble.Value) :
+ (DateTime?)null,
IsReadOnly = x.IsReadOnly,
IsReplicationRequest = x.IsReplicationRequest,
Key = x.Key,
OptimisticConcurrency = x.OptimisticConcurrency,
ParentVersions = x.ParentVersionsList.Select(y => GetVersion(y)).ToArray(),
- ReplicationTimeStamp = x.ReplicationTimeStampAsDouble != null ? DateTime.FromOADate(x.ReplicationTimeStampAsDouble.Value) : (DateTime?)null,
+ ReplicationTimeStamp = x.HasReplicationTimeStampAsDouble ?
+ DateTime.FromOADate(x.ReplicationTimeStampAsDouble.Value) :
+ (DateTime?)null,
ReplicationVersion = GetVersion(x.ReplicationVersion),
Segment = x.Segment,
- TopologyVersion = topologyVersion,
Tag = x.Tag
}).ToArray();
var results = storage.Put(topologyVersion, puts);
+ var xx = storage.Get(topologyVersion, new ExtendedGetRequest
+ {
+ Key = puts[0].Key,
+ Segment = 1,
+ });
writer.Write(new StorageMessageUnion.Builder
{
Type = StorageMessageType.PutResponses,
+ TopologyVersion = ByteString.CopyFrom(topologyVersion.ToByteArray()),
PutResponsesList =
{
results.Select(x=> new PutResponseMessage.Builder
@@ -193,7 +206,6 @@ private void OnBeginAcceptTcpClient(IAsyncResult result)
Segment = x.Segment,
Key = x.Key,
SpecifiedVersion = GetVersion(x.SpecificVersion),
- TopologyVersion = topologyVersion,
IsReplicationRequest = false
}).ToArray());
var reply = new StorageMessageUnion.Builder
@@ -226,7 +238,7 @@ private void OnBeginAcceptTcpClient(IAsyncResult result)
private static ValueVersion GetVersion(Protocol.ValueVersion version)
{
- if (version == null)
+ if (version == Protocol.ValueVersion.DefaultInstance)
return null;
return new ValueVersion
{
@@ -59,6 +59,8 @@ public bool IsSegmentOwned(int range)
IExtendedRequest[] requests)
{
var ownerSegment = Topology.GetSegment(range);
+ if (ownerSegment.AssignedEndpoint == null)
+ return;
queueManager.Send(ownerSegment.AssignedEndpoint.Async,
new MessagePayload
{
@@ -70,9 +72,13 @@ public bool IsSegmentOwned(int range)
IExtendedRequest[] requests)
{
var ownerSegment = Topology.GetSegment(range);
- foreach (var endpoint in ownerSegment.Backups.Append(ownerSegment.AssignedEndpoint).Where(x => x != endpoint))
+ foreach (var otherBackup in ownerSegment.Backups
+ .Append(ownerSegment.AssignedEndpoint)
+ .Where(x => x != endpoint))
{
- queueManager.Send(endpoint.Async,
+ if(otherBackup == null)
+ continue;
+ queueManager.Send(otherBackup.Async,
new MessagePayload
{
Data = messageSerializer.Serialize(requests),
@@ -97,8 +103,8 @@ public void GivingUpOn(params int[] rangesGivingUpOn)
public void Start()
{
- Topology = master.GetTopology();
var assignedSegments = master.Join(endpoint);
+ Topology = master.GetTopology();
rangesThatWeAreCatchingUpOn = assignedSegments
.Where(x => x.AssignedEndpoint != endpoint)
.ToList();
@@ -9,7 +9,6 @@ public ExtendedGetRequest()
{
Segment = -1;
}
- public Guid TopologyVersion { get; set; }
public int Segment { get; set; }
public bool IsReplicationRequest { get; set; }
}
@@ -9,7 +9,6 @@ public ExtendedPutRequest()
{
Segment = -1;
}
- public Guid TopologyVersion { get; set; }
public int Segment { get; set; }
public bool IsReplicationRequest { get; set; }
}
@@ -9,7 +9,6 @@ public ExtendedRemoveRequest()
{
Segment = -1;
}
- public Guid TopologyVersion { get; set; }
public int Segment { get; set; }
public bool IsReplicationRequest { get; set; }
}
@@ -4,7 +4,6 @@ namespace Rhino.DistributedHashTable.Parameters
{
public interface IExtendedRequest
{
- Guid TopologyVersion { get; set; }
int Segment { get; set; }
bool IsReplicationRequest { get; set; }
}

0 comments on commit 716cc94

Please sign in to comment.