Browse files

Major refactor:

1. Make most calls asynchronous
2. Broader support for pipelining
3. Implement pub-sub

Test fixes in progress.
  • Loading branch information...
1 parent 2f25010 commit f7fa034ccb6ab81b77356cefdf6242fff74d680a @ccollie committed Jun 22, 2010
Showing with 3,089 additions and 692 deletions.
  1. +4 −5 Guanima.Redis.Tests/GenericTests.cs
  2. +2 −1 Guanima.Redis.Tests/Guanima.Redis.Tests.csproj
  3. +1 −1 Guanima.Redis.Tests/HashTests.cs
  4. +9 −9 Guanima.Redis.Tests/ListTests.cs
  5. +2 −2 Guanima.Redis.Tests/LockTests.cs
  6. +190 −0 Guanima.Redis.Tests/PubSub.cs
  7. +0 −1 Guanima.Redis.Tests/RedisClientTest.cs
  8. +4 −4 Guanima.Redis.Tests/{BaseRedisClientTest.cs → RedisClientTestFixture.cs}
  9. +1 −1 Guanima.Redis.Tests/SetTests.cs
  10. +1 −1 Guanima.Redis.Tests/SortedSetTests.cs
  11. +6 −6 Guanima.Redis.Tests/StringTests.cs
  12. +3 −4 Guanima.Redis.Tests/TransactionTests.cs
  13. +1 −1 Guanima.Redis.Tests/Transcoders/DefaultTranscoderTestFixture.cs
  14. +0 −1 Guanima.Redis/ClassDiagram1.cd
  15. +95 −0 Guanima.Redis/Client/ClientAsynchState.cs
  16. 0 Guanima.Redis/{ → Client}/IRedisClientTransaction.cs
  17. 0 Guanima.Redis/{ → Client}/IRedisPipeline.cs
  18. +4 −4 Guanima.Redis/{ → Client}/RedisClient.Connection.cs
  19. +6 −6 Guanima.Redis/{ → Client}/RedisClient.Control.cs
  20. +5 −5 Guanima.Redis/{ → Client}/RedisClient.Generic.cs
  21. 0 Guanima.Redis/{ → Client}/RedisClient.Hashes.cs
  22. 0 Guanima.Redis/{ → Client}/RedisClient.Lists.cs
  23. 0 Guanima.Redis/{ → Client}/RedisClient.Locks.cs
  24. +131 −0 Guanima.Redis/Client/RedisClient.Pipeline.cs
  25. 0 Guanima.Redis/{ → Client}/RedisClient.Poco.cs
  26. +84 −0 Guanima.Redis/Client/RedisClient.PubSub.cs
  27. 0 Guanima.Redis/{ → Client}/RedisClient.Sets.cs
  28. +0 −2 Guanima.Redis/{ → Client}/RedisClient.SortedSets.cs
  29. +2 −2 Guanima.Redis/{ → Client}/RedisClient.Strings.cs
  30. +5 −13 Guanima.Redis/{ → Client}/RedisClient.Transactions.cs
  31. +38 −100 Guanima.Redis/{ → Client}/RedisClient.cs
  32. 0 Guanima.Redis/{ → Client}/RedisClientTransaction.cs
  33. +482 −0 Guanima.Redis/Client/RedisCommandQueue.cs
  34. 0 Guanima.Redis/{ → Client}/RedisLockToken.cs
  35. +546 −0 Guanima.Redis/Client/RedisSubscription.cs
  36. +18 −0 Guanima.Redis/Client/ServerResponseConstants.cs
  37. +93 −0 Guanima.Redis/Commands/CommandBuffer.cs
  38. +32 −1 Guanima.Redis/Commands/CommandHelpers.cs
  39. +147 −0 Guanima.Redis/Commands/CommandNames.cs
  40. +4 −18 Guanima.Redis/Commands/Generic/AdHocCommand.cs
  41. +1 −1 Guanima.Redis/Commands/Generic/DelCommand.cs
  42. +11 −11 Guanima.Redis/Commands/Generic/PipelineCommand.cs
  43. +1 −13 Guanima.Redis/Commands/Generic/SortCommand.cs
  44. +1 −1 Guanima.Redis/Commands/Hashes/HIncrByCommand.cs
  45. +1 −1 Guanima.Redis/Commands/Hashes/HMGetCommand.cs
  46. +1 −1 Guanima.Redis/Commands/Hashes/HMSetCommand.cs
  47. +1 −1 Guanima.Redis/Commands/Hashes/HashFieldCommand.cs
  48. +4 −6 Guanima.Redis/Commands/IRedisCommand.cs
  49. +1 −1 Guanima.Redis/Commands/Lists/BLPopCommand.cs
  50. +1 −1 Guanima.Redis/Commands/Lists/BRPopCommand.cs
  51. +2 −2 Guanima.Redis/Commands/PubSub/PSubscribeCommand.cs
  52. +2 −2 Guanima.Redis/Commands/PubSub/PUnsubscribeCommand.cs
  53. +6 −0 Guanima.Redis/Commands/PubSub/PublishCommand.cs
  54. +28 −3 Guanima.Redis/Commands/PubSub/SubscribeCommand.cs
  55. +7 −2 Guanima.Redis/Commands/PubSub/UnsubscribeCommand.cs
  56. +41 −25 Guanima.Redis/Commands/RedisCommand.cs
  57. +1 −1 Guanima.Redis/Commands/Sets/SDiffCommand.cs
  58. +1 −1 Guanima.Redis/Commands/Sets/SDiffStoreCommand.cs
  59. +1 −1 Guanima.Redis/Commands/Sets/SInterCommand.cs
  60. +1 −1 Guanima.Redis/Commands/Sets/SInterStoreCommand.cs
  61. +1 −1 Guanima.Redis/Commands/Sets/SUnionCommand.cs
  62. +1 −1 Guanima.Redis/Commands/Sets/SUnionStoreCommand.cs
  63. +1 −1 Guanima.Redis/Commands/SortedSets/BaseUnionIntersectionCommand.cs
  64. +1 −1 Guanima.Redis/Commands/SortedSets/ZRangeByScoreCommand.cs
  65. +1 −1 Guanima.Redis/Commands/Strings/MGetCommand.cs
  66. +1 −1 Guanima.Redis/Commands/Strings/MSetCommand.cs
  67. +1 −1 Guanima.Redis/Commands/Strings/MSetNXCommand.cs
  68. +9 −9 Guanima.Redis/Commands/Transactions/MultiExecCommand.cs
  69. +18 −0 Guanima.Redis/Commands/Transactions/UnwatchCommand.cs
  70. +18 −0 Guanima.Redis/Commands/Transactions/WatchCommand.cs
  71. +37 −13 Guanima.Redis/Configuration/EndPointElement.cs
  72. +3 −3 Guanima.Redis/Extensions/ByteArrayExtensions.cs
  73. +31 −22 Guanima.Redis/Guanima.Redis.csproj
  74. +45 −0 Guanima.Redis/IRedisSubscription.cs
  75. +0 −36 Guanima.Redis/IServerResponse.cs
  76. +113 −5 Guanima.Redis/PooledSocket.cs
  77. +107 −230 Guanima.Redis/Protocol/RedisProtocol.cs
  78. +222 −0 Guanima.Redis/Protocol/ReplyParser.cs
  79. +0 −102 Guanima.Redis/RedisClient.Pipeline.cs
  80. +182 −2 Guanima.Redis/RedisValue.cs
  81. +137 −0 Guanima.Redis/Utils/BufferManager.cs
  82. +80 −0 Guanima.Redis/Utils/ResizableBuffer.cs
  83. +51 −0 Guanima.Redis/Utils/StringUtils.cs
View
9 Guanima.Redis.Tests/GenericTests.cs
@@ -1,11 +1,10 @@
-using System;
-using System.Threading;
+using System.Threading;
using NUnit.Framework;
namespace Guanima.Redis.Tests
{
[TestFixture]
- public class ClientGenericTests : BaseRedisClientTests
+ public class ClientGenericTests : RedisClientTestFixture
{
private const string Key = "_TEST_KEY_";
@@ -18,9 +17,9 @@ public void TestSelect()
r.Select(1);
r.Set(Key, "DB1");
r.Select(0);
- Assert.AreEqual(r.GetString(Key), "DB0");
+ Assert.AreEqual("DB0", r.GetString(Key));
r.Select(1);
- Assert.AreEqual(r.GetString(Key), "DB1");
+ Assert.AreEqual("DB1", r.GetString(Key));
}
#endregion
View
3 Guanima.Redis.Tests/Guanima.Redis.Tests.csproj
@@ -42,12 +42,13 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
- <Compile Include="BaseRedisClientTest.cs" />
+ <Compile Include="RedisClientTestFixture.cs" />
<Compile Include="HashTests.cs" />
<Compile Include="KetamaTest.cs" />
<Compile Include="GenericTests.cs" />
<Compile Include="ListTests.cs" />
<Compile Include="LockTests.cs" />
+ <Compile Include="PubSub.cs" />
<Compile Include="StringTests.cs" />
<Compile Include="RedisClientTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
View
2 Guanima.Redis.Tests/HashTests.cs
@@ -4,7 +4,7 @@
namespace Guanima.Redis.Tests
{
- public class HashTests : BaseRedisClientTests
+ public class HashTests : RedisClientTestFixture
{
[Test]
View
18 Guanima.Redis.Tests/ListTests.cs
@@ -6,7 +6,7 @@
namespace Guanima.Redis.Tests
{
[TestFixture]
- public class ListTests : BaseRedisClientTests
+ public class ListTests : RedisClientTestFixture
{
const string ListId = "testlist";
const string ListId2 = "testlist2";
@@ -33,7 +33,7 @@ public void TestRPush()
public void Can_GetListLength()
{
var storeMembers = new List<string> { "one", "two", "three", "four" };
- using (var redis = GetClient())
+ using (var redis = CreateClient())
{
storeMembers.ForEach(x => redis.RPush(ListId, x));
@@ -47,7 +47,7 @@ public void Can_GetListLength()
public void Can_GetItemFromList()
{
var storeMembers = new List<string> { "one", "two", "three", "four" };
- using (var redis = GetClient())
+ using (var redis = CreateClient())
{
storeMembers.ForEach(x => redis.RPush(ListId, x));
@@ -62,7 +62,7 @@ public void Can_GetItemFromList()
public void Can_Set_Item_In_List()
{
var storeMembers = new List<string> { "one", "two", "three", "four" };
- using (var redis = GetClient())
+ using (var redis = CreateClient())
{
storeMembers.ForEach(x => redis.RPush(ListId, x));
@@ -79,7 +79,7 @@ public void Can_Set_Item_In_List()
public void Test_RPop()
{
var storeMembers = new List<string> { "one", "two", "three", "four" };
- using (var redis = GetClient())
+ using (var redis = CreateClient())
{
storeMembers.ForEach(x => redis.RPush(ListId, x));
@@ -110,10 +110,10 @@ public void Test_Basic_LPop_RPop()
[Test]
- public void Can_DequeueFromList()
+ public void Can_Dequeue_From_List()
{
var storeMembers = new List<string> { "one", "two", "three", "four" };
- using (var redis = GetClient())
+ using (var redis = CreateClient())
{
storeMembers.ForEach(x => redis.RPush(ListId, x));
@@ -130,7 +130,7 @@ public void Can_Move_Values_Between_Lists()
var list2Members = new List<string> { "five", "six", "seven" };
const string item4 = "four";
- using (var redis = GetClient())
+ using (var redis = CreateClient())
{
list1Members.ForEach(x => redis.RPush(ListId, x));
list2Members.ForEach(x => redis.RPush(ListId2, x));
@@ -149,7 +149,7 @@ public void Can_Move_Values_Between_Lists()
[Test]
- // {LREM, starting from tail with negative count}
+ // LREM, starting from tail with negative count
public void Test_LRem()
{
r.FlushDB();
View
4 Guanima.Redis.Tests/LockTests.cs
@@ -4,7 +4,7 @@
namespace Guanima.Redis.Tests
{
[TestFixture]
- public class LockTests : BaseRedisClientTests
+ public class LockTests : RedisClientTestFixture
{
private const string LockName = "_LOCK_";
@@ -44,7 +44,7 @@ public void Acquiring_A_Lock_Should_Prevent_Another_Client_From_Acquiring_It()
using (r.Lock(LockName))
{
Assert.That(r.Exists(LockName));
- using(var newClient = GetClient())
+ using(var newClient = CreateClient())
{
Assert.Throws(typeof (RedisLockTimeoutException),
() =>
View
190 Guanima.Redis.Tests/PubSub.cs
@@ -0,0 +1,190 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Guanima.Redis.Extensions;
+using NUnit.Framework;
+
+namespace Guanima.Redis.Tests
+{
+ [TestFixture]
+ public class SimplePubSub : RedisClientTestFixture
+ {
+ const string ChannelName = "CHANNEL";
+ const string MessagePrefix = "MESSAGE ";
+ const int PublishMessageCount = 5;
+
+
+ [Test]
+ public void Publish_And_Receive_5_Messages()
+ {
+ using (var subscription = r.CreateSubscription())
+ {
+ subscription.OnException = ex =>
+ {
+ Console.WriteLine("Caught exception '{0}'", ex);
+ };
+
+ subscription.OnSubscribe = channel =>
+ {
+ Console.WriteLine("Subscribed to '{0}'", channel);
+ };
+
+ subscription.OnUnSubscribe = channel =>
+ {
+ Console.WriteLine("UnSubscribed from '{0}'", channel);
+ };
+
+ subscription.OnMessage = (channel, msg) =>
+ {
+ Console.WriteLine("Received '{0}' from channel '{1}'", msg.FromUtf8(), channel);
+ };
+
+ subscription.Subscribe(ChannelName);
+
+ ThreadPool.QueueUserWorkItem(x =>
+ {
+ Thread.Sleep(200);
+ Console.WriteLine("Begin publishing messages...");
+
+ using (var redisPublisher = CreateClient())
+ {
+ for (var i = 1; i <= PublishMessageCount; i++)
+ {
+ var message = MessagePrefix + i;
+ Console.WriteLine("Publishing '{0}' to '{1}'", message, ChannelName);
+ redisPublisher.Publish(ChannelName, message);
+ }
+ }
+ });
+
+ Console.WriteLine("Started Listening On '{0}'", ChannelName);
+ }
+
+ Console.WriteLine("EOF");
+
+ Thread.Sleep(5000);
+
+ /*Output:
+ Started Listening On 'CHANNEL'
+ Subscribed to 'CHANNEL'
+ Begin publishing messages...
+ Publishing 'MESSAGE 1' to 'CHANNEL'
+ Received 'MESSAGE 1' from channel 'CHANNEL'
+ Publishing 'MESSAGE 2' to 'CHANNEL'
+ Received 'MESSAGE 2' from channel 'CHANNEL'
+ Publishing 'MESSAGE 3' to 'CHANNEL'
+ Received 'MESSAGE 3' from channel 'CHANNEL'
+ Publishing 'MESSAGE 4' to 'CHANNEL'
+ Received 'MESSAGE 4' from channel 'CHANNEL'
+ Publishing 'MESSAGE 5' to 'CHANNEL'
+ Received 'MESSAGE 5' from channel 'CHANNEL'
+ UnSubscribed from 'CHANNEL'
+ EOF
+ */
+ }
+
+ [Test]
+ public void Publish_5_Messages_To_3_Clients()
+ {
+ const int noOfClients = 3;
+
+ var clients = new List<RedisClient>(noOfClients);
+ var subscriptions = new List<IRedisSubscription>();
+ var counts = new int[noOfClients];
+
+ for (var i = 0; i < noOfClients; i++)
+ {
+ var clientNo = i+1;
+ counts[i] = 0;
+ var subscriber = CreateClient();
+ clients.Add(subscriber);
+ var subscription = subscriber.CreateSubscription();
+
+ subscriptions.Add(subscription);
+
+ subscription.OnException = ex =>
+ {
+ Console.WriteLine("Client {0} Caught exception '{1}'", clientNo, ex.Message);
+ };
+
+ subscription.OnSubscribe = channel =>
+ {
+ Console.WriteLine("Client #{0} Subscribed to '{1}'", clientNo, channel);
+ };
+
+ subscription.OnUnSubscribe = channel =>
+ {
+ Console.WriteLine("Client #{0} UnSubscribed from '{1}'", clientNo, channel);
+ };
+
+ subscription.OnMessage = (channel, msg) =>
+ {
+ Console.WriteLine("Client #{0} Received '{1}' from channel '{2}'",
+ clientNo, msg.FromUtf8(), channel);
+
+ // var index = subscriptions.IndexOf(subscription);
+ // if (Interlocked.Increment(ref counts[index]) == PublishMessageCount)
+ //{
+ // subscription.UnSubscribe();
+ //}
+ };
+
+ subscription.Subscribe(ChannelName);
+
+ Console.WriteLine("Client #{0} started Listening On '{1}'", clientNo, ChannelName);
+
+ }
+
+
+ using (var redisClient = CreateClient())
+ {
+ Console.WriteLine("Begin publishing messages...");
+
+ for (var i = 1; i <= PublishMessageCount; i++)
+ {
+ var message = MessagePrefix + i;
+ Console.WriteLine("Publishing '{0}' to '{1}'", message, ChannelName);
+ redisClient.Publish(ChannelName, message);
+ }
+ }
+
+ Thread.Sleep(5000);
+
+ /*Output:
+ Client #1 started Listening On 'CHANNEL'
+ Client #2 started Listening On 'CHANNEL'
+ Client #1 Subscribed to 'CHANNEL'
+ Client #2 Subscribed to 'CHANNEL'
+ Client #3 started Listening On 'CHANNEL'
+ Client #3 Subscribed to 'CHANNEL'
+ Begin publishing messages...
+ Publishing 'MESSAGE 1' to 'CHANNEL'
+ Client #1 Received 'MESSAGE 1' from channel 'CHANNEL'
+ Client #2 Received 'MESSAGE 1' from channel 'CHANNEL'
+ Publishing 'MESSAGE 2' to 'CHANNEL'
+ Client #1 Received 'MESSAGE 2' from channel 'CHANNEL'
+ Client #2 Received 'MESSAGE 2' from channel 'CHANNEL'
+ Publishing 'MESSAGE 3' to 'CHANNEL'
+ Client #3 Received 'MESSAGE 1' from channel 'CHANNEL'
+ Client #3 Received 'MESSAGE 2' from channel 'CHANNEL'
+ Client #3 Received 'MESSAGE 3' from channel 'CHANNEL'
+ Client #1 Received 'MESSAGE 3' from channel 'CHANNEL'
+ Client #2 Received 'MESSAGE 3' from channel 'CHANNEL'
+ Publishing 'MESSAGE 4' to 'CHANNEL'
+ Client #1 Received 'MESSAGE 4' from channel 'CHANNEL'
+ Client #3 Received 'MESSAGE 4' from channel 'CHANNEL'
+ Publishing 'MESSAGE 5' to 'CHANNEL'
+ Client #1 Received 'MESSAGE 5' from channel 'CHANNEL'
+ Client #3 Received 'MESSAGE 5' from channel 'CHANNEL'
+ Client #1 UnSubscribed from 'CHANNEL'
+ Client #1 EOF
+ Client #3 UnSubscribed from 'CHANNEL'
+ Client #3 EOF
+ Client #2 Received 'MESSAGE 4' from channel 'CHANNEL'
+ Client #2 Received 'MESSAGE 5' from channel 'CHANNEL'
+ Client #2 UnSubscribed from 'CHANNEL'
+ Client #2 EOF
+ */
+ }
+ }
+}
View
1 Guanima.Redis.Tests/RedisClientTest.cs
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Net;
-using System.Threading;
using Guanima.Redis.Configuration;
using Guanima.Redis.KeyTransformers;
using Guanima.Redis.NodeLocators;
View
8 Guanima.Redis.Tests/BaseRedisClientTest.cs → ...ima.Redis.Tests/RedisClientTestFixture.cs
@@ -12,15 +12,15 @@
namespace Guanima.Redis.Tests
{
[TestFixture]
- public class BaseRedisClientTests
+ public class RedisClientTestFixture
{
public const int TestDb = 9;
protected RedisClient r;
[SetUp]
public virtual void Setup()
{
- r = GetClient();
+ r = CreateClient();
r.FlushDB();
}
@@ -32,10 +32,9 @@ public virtual void TearDown()
}
- protected virtual RedisClient GetClient()
+ protected virtual RedisClient CreateClient()
{
var client = GetSingleNodeClient();
- client.Select(TestDb);
return client;
}
@@ -54,6 +53,7 @@ protected virtual RedisClient GetSingleNodeClient()
mcc.SocketPool.MaxPoolSize = 100;
mcc.SocketPool.ConnectionTimeout = new TimeSpan(0, 0, 10);
mcc.SocketPool.DeadTimeout = new TimeSpan(0, 0, 30);
+ mcc.DefaultDB = 9;
return new RedisClient(mcc);
}
View
2 Guanima.Redis.Tests/SetTests.cs
@@ -4,7 +4,7 @@
namespace Guanima.Redis.Tests
{
[TestFixture]
- public class SetTests : BaseRedisClientTests
+ public class SetTests : RedisClientTestFixture
{
[Test]
View
2 Guanima.Redis.Tests/SortedSetTests.cs
@@ -4,7 +4,7 @@
namespace Guanima.Redis.Tests
{
[TestFixture]
- public class SortedSetTests : BaseRedisClientTests
+ public class SortedSetTests : RedisClientTestFixture
{
#region ZCard
View
12 Guanima.Redis.Tests/StringTests.cs
@@ -3,14 +3,14 @@
namespace Guanima.Redis.Tests
{
[TestFixture]
- public class StringTests : BaseRedisClientTests
+ public class StringTests : RedisClientTestFixture
{
[Test]
public void Can_Set_And_Get_String()
{
const string value = "value";
- using (var redis = GetClient())
+ using (var redis = CreateClient())
{
redis["key"] = value;
string valueString = redis["key"];
@@ -24,7 +24,7 @@ public void Can_Set_And_Get_Key_With_Spaces()
{
const string key = "key with spaces";
const string value = "value";
- using (var redis = GetClient())
+ using (var redis = CreateClient())
{
redis.Set(key, value);
var valueString = (string) redis[key];
@@ -44,7 +44,7 @@ public void Can_Set_And_Get_Key_With_All_Byte_Values()
value[i] = (byte)i;
}
- using (var redis = GetClient())
+ using (var redis = CreateClient())
{
redis[key] = value;
byte[] resultValue = redis[key];
@@ -66,7 +66,7 @@ public void Can_Store_An_Arbitrary_Byte_Array()
}
}
- using (RedisClient client = GetClient())
+ using (RedisClient client = CreateClient())
{
client.Set("BigBuffer", bigBuffer);
@@ -102,7 +102,7 @@ public void GetKeys_Returns_Matching_Collection()
[Test]
public void GetKeys_On_Non_Existent_Keys_Returns_Empty_Collection()
{
- using (var redis = GetClient())
+ using (var redis = CreateClient())
{
var matchingKeys = redis.GetKeys("ss-tests:NOTEXISTS");
View
7 Guanima.Redis.Tests/TransactionTests.cs
@@ -1,12 +1,11 @@
-using System;
-using System.Linq;
+using System.Linq;
using Guanima.Redis.Commands;
using NUnit.Framework;
namespace Guanima.Redis.Tests
{
[TestFixture]
- public class TransactionTests : BaseRedisClientTests
+ public class TransactionTests : RedisClientTestFixture
{
[Test]
public void Test_Transaction_Basics()
@@ -25,7 +24,7 @@ public void Test_Transaction_Basics()
lRangeCommand = cmds[0];
}
var expected = new[] {"a", "b", "c"};
- AssertListsAreEqual(expected, lRangeCommand.Result);
+ AssertListsAreEqual(expected, lRangeCommand.Value);
}
}
View
2 Guanima.Redis.Tests/Transcoders/DefaultTranscoderTestFixture.cs
@@ -67,7 +67,7 @@ public void Should_Persist_Primitive_Types_Properly()
public void Can_Properly_Serialize_Arbitrary_Classes()
{
var now = DateTime.Now;
- var user = new TestClass()
+ var user = new TestClass
{
Credits = 1.0,
IsActive = false,
View
1 Guanima.Redis/ClassDiagram1.cd
@@ -1 +0,0 @@
-
View
95 Guanima.Redis/Client/ClientAsynchState.cs
@@ -0,0 +1,95 @@
+using System;
+using System.Net.Sockets;
+using Guanima.Redis.Commands;
+using Guanima.Redis.Protocol;
+
+namespace Guanima.Redis
+{
+ public class ClientAsyncState
+ {
+ private PooledSocket Socket;
+ public IRedisNode Node;
+ // Client socket.
+ public Socket WorkSocket = null;
+ }
+
+ public class ClientAsyncWriteState : ClientAsyncState
+ {
+ // Size of receive buffer.
+ public const int BufferSize = 256;
+
+ // Receive buffer.
+ public byte[] Buffer = new byte[BufferSize];
+
+ public delegate void CommandSentHandler(object args, RedisCommand command);
+
+ // An event that clients can use to be notified whenever a
+ // value is retrieved from the socket.
+ public event CommandSentHandler CommandSent;
+
+ public object CallbackArg { get; set; }
+
+ internal void OnCommandWritten(RedisCommand command)
+ {
+ if (CommandSent != null)
+ CommandSent.Invoke(CallbackArg, command);
+ }
+ }
+
+
+ public class ClientAsyncReadState : ClientAsyncState
+ {
+ private RedisReplyParser _parser;
+ private int _expectedReplies = 1;
+ private int _receivedReplies;
+
+ public object CallbackArg { get; set; }
+
+ // Size of receive buffer.
+ public const int BufferSize = 1024;
+
+ // Receive buffer.
+ public byte[] Buffer = new byte[BufferSize];
+
+ public delegate void ValueReceivedHandler(ClientAsyncReadState state, object args, RedisValue value);
+
+ // An event that clients can use to be notified whenever a
+ // value is retrieved from the socket.
+ public event ValueReceivedHandler ValueReceived;
+
+ public int ExpectedReplies
+ {
+ get { return _expectedReplies; }
+ set { _expectedReplies = value; }
+ }
+
+ public int ReceivedReplies
+ {
+ get { return _receivedReplies; }
+ }
+
+ public bool IsComplete
+ {
+ get { return ReceivedReplies == ExpectedReplies; }
+ }
+
+ public RedisReplyParser ReplyParser
+ {
+ get
+ {
+ return _parser ?? (_parser = new RedisReplyParser(ReplyReceived, CallbackArg));
+ }
+ set
+ {
+ _parser = value;
+ }
+ }
+
+ private void ReplyReceived(object data, RedisValue value)
+ {
+ _receivedReplies++;
+ if (ValueReceived != null)
+ ValueReceived.Invoke(this, data, value);
+ }
+ }
+}
View
0 Guanima.Redis/IRedisClientTransaction.cs → ...a.Redis/Client/IRedisClientTransaction.cs
File renamed without changes.
View
0 Guanima.Redis/IRedisPipeline.cs → Guanima.Redis/Client/IRedisPipeline.cs
File renamed without changes.
View
8 Guanima.Redis/RedisClient.Connection.cs → ...ma.Redis/Client/RedisClient.Connection.cs
@@ -9,7 +9,7 @@ public partial class RedisClient
public bool Ping()
{
bool alive = true;
- ForeachServer(node=>
+ ForEachServer(node=>
{
if (alive)
alive = Ping(node);
@@ -35,7 +35,7 @@ public bool Ping(IRedisNode node)
public void Auth()
{
- ForeachServer(node =>
+ ForEachServer(node =>
{
if (!String.IsNullOrEmpty(node.Password))
{
@@ -47,7 +47,7 @@ public void Auth()
public void Auth(string password)
{
- ForeachServer(node => Execute(node, new AuthCommand(node.Password)));
+ ForEachServer(node => Execute(node, new AuthCommand(node.Password)));
}
public void Auth(IRedisNode node, string password)
@@ -62,7 +62,7 @@ public void Quit(IRedisNode node)
public void Quit()
{
- ForeachServer(node => Execute(node, new QuitCommand()));
+ ForEachServer(node => Execute(node, new QuitCommand()));
}
}
}
View
12 Guanima.Redis/RedisClient.Control.cs → Guanima.Redis/Client/RedisClient.Control.cs
@@ -14,7 +14,7 @@ public DateTime LastSave()
{
var command = new LastSaveCommand();
long timestamp = 0;
- ForeachServer(node=>
+ ForEachServer(node=>
{
var temp = ExecuteInt(node, command);
timestamp = Math.Max(temp, timestamp);
@@ -25,22 +25,22 @@ public DateTime LastSave()
public void Save()
{
- ForeachServer(new SaveCommand());
+ ForEachServer(new SaveCommand());
}
public void BgSave()
{
- ForeachServer(new BGSaveCommand());
+ ForEachServer(new BGSaveCommand());
}
public void BgRewriteAof()
{
- ForeachServer(new BGRewriteAOFCommand());
+ ForEachServer(new BGRewriteAOFCommand());
}
public void Shutdown()
{
- ForeachServer(new ShutdownCommand());
+ ForEachServer(new ShutdownCommand());
}
private Dictionary<String,RedisValue> GetInfo(IRedisNode node)
@@ -61,7 +61,7 @@ public void Shutdown()
{
// TODO: Crap out if more than one server in pool
Dictionary<string, RedisValue> info = null;
- ForeachServer(
+ ForEachServer(
node => { if (info == null) info = GetInfo(node); }
);
return info;
View
10 Guanima.Redis/RedisClient.Generic.cs → Guanima.Redis/Client/RedisClient.Generic.cs
@@ -72,7 +72,7 @@ public bool ExpireAt(string key, int unixTime)
public int DBSize()
{
int result = 0;
- ForeachServer(node => result += ExecuteInt(node, new DBSizeCommand()));
+ ForEachServer(node => result += ExecuteInt(node, new DBSizeCommand()));
return result;
}
@@ -92,7 +92,7 @@ public RedisValue Echo(RedisValue value)
// Maybe error out if more than 1 server ?
int count = 0;
var result = RedisValue.Empty;
- ForeachServer(node=>
+ ForEachServer(node=>
{
if (count == 0)
{
@@ -106,7 +106,7 @@ public RedisValue Echo(RedisValue value)
public RedisClient FlushAll()
{
- ForeachServer(node => Execute(node, new FlushAllCommand()));
+ ForEachServer(node => Execute(node, new FlushAllCommand()));
return this;
}
@@ -119,7 +119,7 @@ public RedisClient FlushAll(IRedisNode node)
public RedisClient FlushDB()
{
// TODO: cluster command
- ForeachServer(node => Execute(node, new FlushDBCommand()));
+ ForEachServer(node => Execute(node, new FlushDBCommand()));
return this;
}
@@ -196,7 +196,7 @@ public RedisClient Select(int db)
{
// raise warning
}
- ForeachServer(node=>Execute(node, new SelectCommand(db)));
+ ForEachServer(node=>Execute(node, new SelectCommand(db)));
_currentDb = db;
return this;
}
View
0 Guanima.Redis/RedisClient.Hashes.cs → Guanima.Redis/Client/RedisClient.Hashes.cs
File renamed without changes.
View
0 Guanima.Redis/RedisClient.Lists.cs → Guanima.Redis/Client/RedisClient.Lists.cs
File renamed without changes.
View
0 Guanima.Redis/RedisClient.Locks.cs → Guanima.Redis/Client/RedisClient.Locks.cs
File renamed without changes.
View
131 Guanima.Redis/Client/RedisClient.Pipeline.cs
@@ -0,0 +1,131 @@
+using System;
+using System.Collections.Generic;
+using Guanima.Redis.Client;
+using Guanima.Redis.Commands;
+
+namespace Guanima.Redis
+{
+ partial class RedisClient
+ {
+ // list of commands ordered by server and insertion order
+ private Dictionary<IRedisNode, RedisCommandQueue> _commandQueues;
+ private bool _pipelining = false;
+
+ // Globals list of queued commands in the order that they were added
+ private List<RedisCommand> _queuedCommandList;
+
+ public bool Pipelining
+ {
+ get { return _pipelining || InTransaction; }
+ }
+
+ public IDisposable BeginPipeline()
+ {
+ return BeginPipeline(false);
+ }
+
+ public IDisposable BeginPipeline(bool transactional)
+ {
+ if (InTransaction)
+ throw new RedisClientException("Pipelining not applicable in transactions.");
+ if (Pipelining)
+ throw new RedisException("Already pipelining.");
+ EnsureCommandQueue();
+ foreach(var queue in _commandQueues.Values)
+ {
+ queue.ReadAllResults();
+ }
+ _pipelining = true;
+ return new RedisPipeline(this, transactional);
+ }
+
+ private void EnsureCommandQueue()
+ {
+ if (_queuedCommandList == null)
+ _queuedCommandList = new List<RedisCommand>();
+ if (_commandQueues == null)
+ _commandQueues = new Dictionary<IRedisNode, RedisCommandQueue>();
+ }
+
+ internal void FlushPipeline()
+ {
+ FlushPipeline(false);
+ }
+
+ internal void FlushPipeline(bool transactional)
+ {
+ _pipelining = false;
+ try
+ {
+ foreach (var queue in _commandQueues.Values)
+ {
+ if (transactional)
+ queue.IsTransactional = false;
+ else
+ {
+ queue.ReadAllResults();
+ }
+ }
+ }
+ finally
+ {
+ _queuedCommandList = null;
+ }
+ }
+
+ internal void ClearPipeline()
+ {
+ if (_commandQueues != null)
+ {
+ foreach (var q in _commandQueues.Values)
+ {
+ q.Clear();
+ }
+ }
+ _queuedCommandList = null;
+ }
+
+ internal RedisCommandQueue GetCommandQueue(IRedisNode node)
+ {
+ EnsureCommandQueue();
+ RedisCommandQueue q;
+ if (!_commandQueues.TryGetValue(node, out q)) {
+ q = new RedisCommandQueue(node, InTransaction);
+ q.CurrentDB = this.CurrentDB;
+ _commandQueues[node] = q;
+ }
+ else
+ {
+ q.IsTransactional = InTransaction;
+ }
+ return q;
+ }
+
+ public void EnqueueCommand(IRedisNode node, RedisCommand command)
+ {
+ RedisCommandQueue q = GetCommandQueue(node);
+ q.Enqueue(command);
+ // if (Pipelined || InTransaction)
+ _queuedCommandList.Add(command);
+ }
+
+ public void Flush()
+ {
+ foreach (var q in _commandQueues.Values)
+ {
+ q.Flush();
+ }
+ }
+
+ internal IEnumerable<RedisCommand> QueuedCommands
+ {
+ get
+ {
+ if (_queuedCommandList == null)
+ return new List<RedisCommand>();
+ return _queuedCommandList;
+ }
+ }
+
+ }
+}
View
0 Guanima.Redis/RedisClient.Poco.cs → Guanima.Redis/Client/RedisClient.Poco.cs
File renamed without changes.
View
84 Guanima.Redis/Client/RedisClient.PubSub.cs
@@ -0,0 +1,84 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Guanima.Redis.Client;
+using Guanima.Redis.Commands.PubSub;
+using Guanima.Redis.Extensions;
+using Guanima.Redis.Protocol;
+
+namespace Guanima.Redis
+{
+ public partial class RedisClient
+ {
+ public int Publish(IRedisNode node, string toChannel, RedisValue message)
+ {
+ return ExecuteInt(node, new PublishCommand(toChannel, message));
+ }
+
+ public int Publish(String toChannel, RedisValue message)
+ {
+ int count = 0;
+ ForEachServer(node => count+=Publish(node, toChannel, message));
+ return count;
+ }
+
+ public RedisValue ReceivePublishedMessage(IRedisNode node)
+ {
+ using (var socket = node.Acquire())
+ {
+ return socket.ExpectMultiBulkReply();
+ }
+ }
+
+ internal RedisValue ReceivePublishedMessage(PooledSocket socket, ref string channel)
+ {
+ byte[][] result = socket.ExpectMultiBulkReply();
+ channel = result[1].FromUtf8();
+ return result[2];
+ }
+
+ public RedisValue Subscribe(IRedisNode node, IEnumerable<String> toChannels)
+ {
+ if (toChannels.Count() == 0)
+ throw new ArgumentNullException("toChannels");
+ return ExecValue(node, new SubscribeCommand(toChannels));
+ }
+
+ public RedisValue Subscribe(IRedisNode node, params string[] toChannels)
+ {
+ if (toChannels.Length == 0)
+ throw new ArgumentNullException("toChannels");
+ return ExecValue(node, new SubscribeCommand(toChannels));
+ }
+
+ public RedisValue UnSubscribe(IRedisNode node, params string[] fromChannels)
+ {
+ return ExecValue(node, new UnsubscribeCommand(fromChannels));
+ }
+
+ public RedisValue PSubscribe(IRedisNode node, IEnumerable<String> toChannelsMatchingPatterns)
+ {
+ if (toChannelsMatchingPatterns.Count() == 0)
+ throw new ArgumentNullException("toChannelsMatchingPatterns");
+ return ExecValue(node, new PSubscribeCommand(toChannelsMatchingPatterns));
+ }
+
+ public RedisValue PSubscribe(IRedisNode node, params string[] toChannelsMatchingPatterns)
+ {
+ if (toChannelsMatchingPatterns.Length == 0)
+ throw new ArgumentNullException("toChannelsMatchingPatterns");
+ return ExecValue(node, new PSubscribeCommand(toChannelsMatchingPatterns));
+ }
+
+ public RedisValue PUnSubscribe(IRedisNode node, params string[] fromChannelsMatchingPatterns)
+ {
+ return ExecValue(node, new PUnsubscribeCommand(fromChannelsMatchingPatterns));
+ }
+
+ public IRedisSubscription CreateSubscription()
+ {
+ return new RedisSubscription(this);
+ }
+
+ }
+}
View
0 Guanima.Redis/RedisClient.Sets.cs → Guanima.Redis/Client/RedisClient.Sets.cs
File renamed without changes.
View
2 Guanima.Redis/RedisClient.SortedSets.cs → ...ma.Redis/Client/RedisClient.SortedSets.cs
@@ -108,7 +108,6 @@ public double ZScore(string key, RedisValue value)
public int ZInter(string destKey, IEnumerable<string> keys)
{
- // TODO: Cluster warning
var transformedKey = TransformKey(destKey);
var transformedKeys = TransformKeys(keys);
@@ -120,7 +119,6 @@ public int ZInter(string destKey, IEnumerable<string> keys)
public int ZInter(string destKey, IEnumerable<string> keys, IEnumerable<double> weights, AggregateType aggregateType)
{
- // TODO: Cluster warning
var transformedKey = TransformKey(destKey);
var transformedKeys = TransformKeys(keys);
View
4 Guanima.Redis/RedisClient.Strings.cs → Guanima.Redis/Client/RedisClient.Strings.cs
@@ -75,13 +75,13 @@ public IList<String> GetKeys(String pattern)
return result;
}
- public int Incr(string key)
+ public long Incr(string key)
{
var transformedKey = TransformKey(key);
return ExecuteInt(transformedKey, new IncrCommand(transformedKey));
}
- public int Incr(string key, int delta)
+ public long Incr(string key, long delta)
{
var transformedKey = TransformKey(key);
return ExecuteInt(transformedKey, new IncrByCommand(transformedKey, delta));
View
18 Guanima.Redis/RedisClient.Transactions.cs → ....Redis/Client/RedisClient.Transactions.cs
@@ -1,6 +1,4 @@
-using Guanima.Redis.Commands.Transactions;
-
-namespace Guanima.Redis
+namespace Guanima.Redis
{
partial class RedisClient
{
@@ -26,21 +24,15 @@ public IRedisClientTransaction BeginTransaction()
// dont know if i like this
internal void FinishTransaction()
{
- if (InTransaction && _queuedCommands != null)
+ if (InTransaction && _queuedCommandList != null)
{
try
{
- foreach (var kvp in _queuedCommands)
+ foreach (var kvp in _commandQueues)
{
var node = kvp.Key;
- var commandsForThisNode = kvp.Value;
-
- using(var socket = node.Acquire())
- {
- var command = new MultiExecCommand(commandsForThisNode);
- protocolHandler.Socket = socket;
- command.Execute(protocolHandler);
- }
+ // TODO: Fix this....
+ kvp.Value.ReadAllResults();
}
}
catch(RedisClientException ex)
View
138 Guanima.Redis/RedisClient.cs → Guanima.Redis/Client/RedisClient.cs
@@ -3,11 +3,9 @@
using System.Configuration;
using System.Linq;
using System.Text;
-using Guanima.Redis.Commands.Connection;
-using Guanima.Redis.Commands.Generic;
+using Guanima.Redis.Client;
using Guanima.Redis.Configuration;
using Guanima.Redis.Commands;
-using Guanima.Redis.Protocol;
using Guanima.Redis.Utils;
namespace Guanima.Redis
@@ -24,7 +22,6 @@ public partial class RedisClient : Disposable
private IServerPool _serverPool;
private Stack<IRedisNode> _onNodeStack;
- private RedisProtocol protocolHandler = new RedisProtocol();
#region Constructor
@@ -69,6 +66,7 @@ private void Initialize(IRedisClientConfiguration configuration)
Initialize(pool);
_serverPool = pool;
_currentDb = configuration.DefaultDB; // ???
+ _commandQueues = new Dictionary<IRedisNode, RedisCommandQueue>();
}
private static void Initialize(IServerPool pool)
@@ -85,6 +83,15 @@ private static void Initialize(IServerPool pool)
#region Nodes/Servers
+ internal IEnumerable<IRedisNode> GetNodes()
+ {
+ if (_onNodeStack != null && _onNodeStack.Count > 0)
+ {
+ return new IRedisNode[]{ _onNodeStack.Peek() };
+ }
+ return _serverPool.GetServers();
+ }
+
public IDisposable On(string alias)
{
var node = GetNodeByAlias(alias);
@@ -200,7 +207,7 @@ protected bool Clustering
}
}
- protected void ForeachServer(Action<IRedisNode> action)
+ protected void ForEachServer(Action<IRedisNode> action)
{
if (_onNodeStack != null && _onNodeStack.Count > 0)
{
@@ -216,69 +223,27 @@ protected void ForeachServer(Action<IRedisNode> action)
}
}
- protected void ForeachServer(RedisCommand command)
+ protected void ForEachServer(RedisCommand command)
{
if (_onNodeStack != null && _onNodeStack.Count > 0)
{
Execute(_onNodeStack.Peek(), command);
return;
}
- ForeachServer(node => Execute(node, command));
+ ForEachServer(node => Execute(node, command));
}
#endregion
#region Sockets
- protected void DisposeSocket(PooledSocket socket)
+ internal void DisposeSocket(PooledSocket socket)
{
if (socket != null)
((IDisposable)socket).Dispose();
}
-
-
- protected PooledSocket AcquireSocket(IRedisNode node)
- {
- if (CheckDisposed(true))
- return null;
- var socket = node.Acquire();
- if (socket == null)
- throw new RedisClientException("Unable to acquire socket for node : '" + node.EndPoint + "'");
-
- try
- {
- if (!String.IsNullOrEmpty(node.Password) && !socket.IsAuthorized)
- {
- protocolHandler.Socket = socket;
- var command = new AuthCommand(node.Password);
- string status = command.Execute(protocolHandler);
- if (status != "OK")
- {
- throw new RedisAuthenticationException("Invalid credentials for node : " + node.Alias);
- }
- socket.IsAuthorized = true;
- }
- // Select proper db if specified in config or (socket.CurrentDB <> currentDB)
- // Read the comments on Select to get some background on the following.
- // Im not sure i like this.
- if (socket.CurrentDb != CurrentDB)
- {
- protocolHandler.Socket = socket;
- var command = new SelectCommand(CurrentDB);
- command.Execute(protocolHandler);
- socket.CurrentDb = CurrentDB;
- }
- }
- catch
- {
- DisposeSocket(socket);
- throw;
- }
-
- return socket;
- }
-
+
#endregion
#region Command Execution
@@ -319,16 +284,6 @@ private void EnsureNotClustered(string commandName, string transformedDestKey, I
}
//
- private bool CheckEnqueue(IRedisNode node, RedisCommand command)
- {
- if (Pipelining || InTransaction)
- {
- EnqueueCommand(node, command);
- return true;
- }
- return false;
- }
-
protected void HandleException(Exception ex)
{
@@ -344,21 +299,14 @@ protected void Execute(String key, RedisCommand command)
}
- public RedisClient Execute(IRedisNode node, RedisCommand command)
- {
- Execute(node, command, true);
- return this;
- }
-
- internal void Execute(IRedisNode node, RedisCommand command, bool possiblyQueued)
+ internal void Execute(IRedisNode node, RedisCommand command)
{
- if (possiblyQueued && CheckEnqueue(node, command))
+ EnqueueCommand(node, command);
+ if (Pipelining || InTransaction)
return;
- var socket = AcquireSocket(node);
try
{
- protocolHandler.Socket = socket;
- command.Execute(protocolHandler);
+ var temp = command.Value;
}
catch (Exception e)
{
@@ -367,10 +315,6 @@ internal void Execute(IRedisNode node, RedisCommand command, bool possiblyQueued
HandleException(e);
throw;
}
- finally
- {
- DisposeSocket(socket);
- }
}
@@ -379,22 +323,17 @@ protected RedisValue ExecValue(String key, RedisCommand command)
var node = GetNodeForTransformedKey(key);
return ExecValue(node, command);
}
-
- public RedisValue ExecValue(IRedisNode node, RedisCommand command)
- {
- return ExecValue(node, command, true);
- }
-
- private RedisValue ExecValue(IRedisNode node, RedisCommand command, bool possiblyQueued)
+
+ private RedisValue ExecValue(IRedisNode node, RedisCommand command)
{
- if (possiblyQueued && CheckEnqueue(node, command))
+ EnqueueCommand(node, command);
+
+ if (Pipelining || InTransaction)
return RedisValue.Empty;
- var socket = AcquireSocket(node);
+
try
{
- protocolHandler.Socket = socket;
- command.Execute(protocolHandler);
- return command.Result;
+ return command.Value;
}
catch (Exception e)
{
@@ -403,27 +342,19 @@ private RedisValue ExecValue(IRedisNode node, RedisCommand command, bool possibl
HandleException(e);
throw;
}
- finally
- {
- DisposeSocket(socket);
- }
}
protected int ExecuteInt(String key, RedisCommand command)
{
var val = ExecValue(key, command);
- if (Pipelining)
- return 0;
- return (int)val;
+ return Pipelining ? 0 : (int) val;
}
protected int ExecuteInt(IRedisNode node, RedisCommand command)
{
var val = ExecValue(node, command);
- if (Pipelining)
- return 0;
- return (int)val;
+ return Pipelining ? 0 : (int) val;
}
@@ -481,11 +412,18 @@ protected override void Release()
throw new ObjectDisposedException("RedisClient");
try
{
- _serverPool.Dispose();
+ FlushPipeline();
}
finally
{
- _serverPool = null;
+ try
+ {
+ _serverPool.Dispose();
+ }
+ finally
+ {
+ _serverPool = null;
+ }
}
}
View
0 Guanima.Redis/RedisClientTransaction.cs → ...ma.Redis/Client/RedisClientTransaction.cs
File renamed without changes.
View
482 Guanima.Redis/Client/RedisCommandQueue.cs
@@ -0,0 +1,482 @@
+using System;
+using System.Collections.Generic;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using Guanima.Redis.Commands;
+using Guanima.Redis.Commands.Connection;
+using Guanima.Redis.Commands.Generic;
+using Guanima.Redis.Commands.Transactions;
+using Guanima.Redis.Extensions;
+using Guanima.Redis.Utils;
+
+namespace Guanima.Redis.Client
+{
+ public class RedisCommandQueue : Disposable
+ {
+ private readonly byte[] CrLf = new[] { (byte)'\r', (byte)'\n' };
+ private static log4net.ILog log = log4net.LogManager.GetLogger(typeof(RedisCommandQueue));
+
+ private readonly Queue<RedisCommand> _sendQueue;
+ private readonly Queue<RedisCommand> _receiveQueue;
+ private readonly ManualResetEvent _sendMre = new ManualResetEvent(false);
+ private readonly ManualResetEvent _receiveMre = new ManualResetEvent(false);
+ private CommandBuffer _commandBuffer;
+ private PooledSocket _socket;
+ private readonly IRedisNode _node;
+
+ private bool _sendAllOnRead;
+ private bool _transactional;
+ private int _currentDB;
+ private int _multiCount;
+
+ public RedisCommandQueue(IRedisNode node)
+ :this(node, false)
+ {
+ }
+
+ public RedisCommandQueue(IRedisNode node, bool transactional)
+ {
+ _transactional = transactional;
+ _node = node;
+ _sendQueue = new Queue<RedisCommand>();
+ _receiveQueue = new Queue<RedisCommand>();
+ _commandBuffer = new CommandBuffer();
+ }
+
+ public IRedisNode Node
+ {
+ get { return _node; }
+ }
+
+ public PooledSocket Socket
+ {
+ get
+ {
+ _socket = _socket ?? AcquireSocket();
+ return _socket;
+ }
+ }
+
+ public bool IsTransactional
+ {
+ get { return _transactional; }
+ set { SetTransactional(value); }
+ }
+
+ public int CurrentDB
+ {
+ get { return _currentDB; }
+ set { SetCurrentDB(value); }
+ }
+
+ private void SetCurrentDB(int db)
+ {
+ if (_currentDB != db)
+ {
+ _currentDB = db;
+ Flush();
+ }
+ }
+
+ private void SetTransactional(bool value)
+ {
+ if (value != _transactional)
+ {
+ // flush what we have now
+ Flush();
+ _transactional = value;
+ }
+ }
+
+ public bool SendAllOnRead
+ {
+ get { return _sendAllOnRead; }
+ set { _sendAllOnRead = value; }
+ }
+
+ public int Length
+ {
+ get { return _sendQueue.Count + _receiveQueue.Count; }
+ }
+
+ public RedisValue ExecValue(RedisCommand command)
+ {
+ Enqueue(command);
+ ReadResultForCommand(command);
+ return command.Value;
+ }
+
+ public RedisCommand EnqueueQueue(params RedisValue[] elements)
+ {
+ return Enqueue( new AdHocCommand(elements) );
+ }
+
+
+ public RedisCommand Enqueue(RedisCommand command)
+ {
+ var empty = _sendQueue.Count == 0;
+ if (empty)
+ {
+ var socket = Socket;
+ RedisCommand cmd = null;
+
+ if (!socket.IsAuthorized && !String.IsNullOrEmpty(Node.Password) && !IsAuth(command))
+ {
+ cmd = new AuthCommand(Node.Password);
+ _sendQueue.Enqueue( cmd );
+ }
+
+ // Select proper db if specified in config or (socket.CurrentDB <> currentDB)
+ // Read the comments on Select to get some background on the following.
+ // Im not sure i like this.
+ if (socket.CurrentDb != CurrentDB && !IsSelect(command))
+ {
+ cmd = new SelectCommand(CurrentDB);
+ _sendQueue.Enqueue(cmd);
+ }
+ if (cmd != null)
+ cmd.Queue = this;
+ }
+ if (empty && IsTransactional)
+ {
+ _sendQueue.Enqueue(new MultiCommand());
+ _multiCount++;
+ }
+ if (IsMulti(command))
+ {
+ if (IsTransactional)
+ throw new RedisClientException("Cannot nest transactions");
+ _multiCount++;
+ }
+ if (IsExec(command))
+ {
+ //if (_multiCount == 1)
+ }
+ _sendQueue.Enqueue(command);
+ command.Queue = this;
+ return command;
+ }
+
+ public void SendAllCommands()
+ {
+ if (_sendQueue.Count == 0)
+ return;
+
+ if (IsTransactional)
+ {
+ _sendQueue.Enqueue(new ExecCommand());
+ }
+ while (_sendQueue.Count != 0)
+ {
+ var command = _sendQueue.Dequeue();
+ BufferOutput(command);
+ _receiveQueue.Enqueue(command);
+ }
+ FlushCommandBuffer();
+ }
+
+ public void ReadAllResults()
+ {
+ SendAllCommands();
+ BeginReadReplies();
+ }
+
+ void SendCommand(RedisCommand command)
+ {
+ if (_sendQueue.Contains(command))
+ {
+ RedisCommand dequeued;
+ do
+ {
+ dequeued = _sendQueue.Dequeue();
+ BufferOutput(dequeued);
+ _receiveQueue.Enqueue(dequeued);
+ } while (command != dequeued);
+ FlushCommandBuffer();
+ }
+ }
+
+ internal void ReadResultForCommand(RedisCommand command)
+ {
+ if (_sendQueue.Contains(command))
+ {
+ if (_sendAllOnRead)
+ SendAllCommands();
+ else
+ SendCommand(command);
+ }
+ if (_receiveQueue.Contains(command))
+ {
+ BeginReadReplies();
+ }
+ }
+
+
+ public void Clear()
+ {
+ // Drain pending responses
+ BeginReadReplies();
+ // kill unsent commands
+ _sendQueue.Clear();
+ }
+
+ public void Flush()
+ {
+ SendAllCommands();
+ ReadAllResults();
+ }
+
+
+ protected override void Release()
+ {
+ try
+ {
+ Flush();
+ }
+ finally
+ {
+ DisposeSocket(_socket);
+ }
+ }
+
+
+ static void DisposeSocket(IDisposable socket)
+ {
+ if (socket != null)
+ socket.Dispose();
+ }
+
+
+ PooledSocket AcquireSocket()
+ {
+ var socket = Node.Acquire();
+ if (socket == null)
+ throw new RedisClientException("Unable to acquire socket for node : '" + Node.EndPoint + "'");
+ return socket;
+ }
+
+ #region Utils
+
+ static bool CommandNameIs(RedisCommand command, string name)
+ {
+ var cmdName = command.Arguments[0].Text;
+ return cmdName.Equals(name, StringComparison.OrdinalIgnoreCase);
+ }
+
+ static bool CommandNameIs(RedisCommand command, byte[] name)
+ {
+ var cmdName = command.Arguments[0];
+ if (name.IsEqualTo(cmdName))
+ return true;
+ var temp = Encoding.UTF8.GetString(name);
+ return CommandNameIs(command, temp);
+ }
+
+ static bool IsMulti(RedisCommand command)
+ {
+ return (command is MultiCommand) || CommandNameIs(command, Command.Multi);
+ }
+
+ static bool IsExec(RedisCommand command)
+ {
+ return (command is ExecCommand) || CommandNameIs(command, Command.Exec);
+ }
+
+ static bool IsAuth(RedisCommand command)
+ {
+ return (command is AuthCommand) || CommandNameIs(command, Command.Auth);
+ }
+
+ static bool IsSelect(RedisCommand command)
+ {
+ return (command is SelectCommand) || CommandNameIs(command, Command.Select);
+ }
+
+
+ /// <summary>
+ /// Command to set multiple binary safe arguments
+ /// </summary>
+ /// <param name="command"></param>
+ /// <returns></returns>
+ void BufferOutput(RedisCommand command)
+ {
+ _commandBuffer.Append(command);
+ }
+
+ private void FlushCommandBuffer()
+ {
+ if (_commandBuffer.Size > 0)
+ {
+ CheckDisposed();
+ var state = new ClientAsyncWriteState
+ {
+ WorkSocket = Socket.Socket,
+ Buffer = _commandBuffer.Data,
+ CallbackArg = this
+ };
+
+ state.WorkSocket.BeginSend(state.Buffer, 0, _commandBuffer.Size,
+ SocketFlags.None,
+ new AsyncCallback(EndSendCommandBuffer), state);
+
+ _commandBuffer.Reset(); // important that this DOES NOT deallocate buffer
+ _sendMre.WaitOne();
+ _sendMre.Reset();
+ }
+ }
+
+ static void EndSendCommandBuffer(IAsyncResult ar)
+ {
+ var state = (ClientAsyncWriteState)ar.AsyncState;
+ var client = state.WorkSocket;
+ client.EndSend(ar);
+ var self = (RedisCommandQueue) state.CallbackArg;
+ self._sendMre.Set();
+ }
+
+
+ private ClientAsyncReadState CreateAsyncReadState()
+ {
+ int expectedReplies = _receiveQueue.Count;
+ var state = new ClientAsyncReadState
+ {
+ WorkSocket = Socket.Socket,
+ ExpectedReplies = expectedReplies,
+ CallbackArg = this
+ };
+
+ bool execExpected = false;
+ state.ValueReceived += (stateObject, data, value) => HandleValueReceived(stateObject, data, value, ref execExpected);
+
+ return state;
+ }
+
+ private static void HandleValueReceived(ClientAsyncReadState state, object data, RedisValue value, ref bool execExpected)
+ {
+ var self = (RedisCommandQueue)data;
+
+ var dequeued = self._receiveQueue.Peek();
+
+ if (value.Type == RedisValueType.Error)
+ {
+ if (IsAuth(dequeued))
+ {
+ self._receiveQueue.Dequeue();
+ throw new RedisAuthenticationException(value.Text);
+ }
+ throw new RedisClientException(value.Text);
+ }
+
+ var socket = self.Socket;
+
+ if (IsMulti(dequeued))
+ {
+ execExpected = true;
+ // value here should be "OK"
+ dequeued.Value = value;
+ return;
+ }
+
+ if (IsExec(dequeued))
+ {
+ if (!execExpected)
+ {
+ // throw
+ throw new RedisClientException("EXEC found without a matching MULTI");
+ }
+ execExpected = false;
+ self._multiCount--;
+ }
+ else if (execExpected)
+ {
+ // value should be "QUEUED" for each command between multi and exec
+ return;
+ }
+
+ self._receiveQueue.Dequeue();
+
+ if (!socket.IsAuthorized && IsAuth(dequeued))
+ {
+ if (value.Text != "OK")
+ throw new RedisAuthenticationException("Invalid credentials for node : " + self.Socket);
+ socket.IsAuthorized = true;
+ dequeued.Value = value;
+ }
+ else if (socket.CurrentDb != self.CurrentDB && IsSelect(dequeued))
+ {
+ socket.CurrentDb = self.CurrentDB;
+ }
+
+ dequeued.Value = value;
+ }
+
+
+ void BeginReadReplies()
+ {
+ if (_receiveQueue.Count > 0)
+ {
+ var state = CreateAsyncReadState();
+ BeginReadReplies(state);
+ }
+ }
+
+ static void BeginReadReplies(ClientAsyncReadState state)
+ {
+ var client = state.WorkSocket;
+
+ client.BeginReceive(state.Buffer, 0, ClientAsyncReadState.BufferSize, SocketFlags.None,
+ new AsyncCallback(EndReadReplies), state);
+
+ var self = (RedisCommandQueue)state.CallbackArg;
+ self._receiveMre.WaitOne();
+ self._receiveMre.Reset();
+ }
+
+ static void EndReadReplies(IAsyncResult ar)
+ {
+ var state = (ClientAsyncReadState) ar.AsyncState;
+ var self = (RedisCommandQueue)state.CallbackArg;
+ SocketError errorCode;
+ int bytesRead = state.WorkSocket.EndReceive(ar, out errorCode);
+
+ if (errorCode != SocketError.Success)
+ {
+ throw new RedisClientException("Socket error reading reply : " + Enum.GetName(typeof(SocketError), errorCode));
+ }
+
+ if (bytesRead > 0 && self._receiveQueue.Count > 0)
+ {
+ state.ReplyParser.Update(state.Buffer, 0, bytesRead);
+ if (self._receiveQueue.Count > 0)
+ BeginReadReplies(state);
+ else
+ {
+ // TODO: Do we need to see if there is more data pending ?
+ self._receiveMre.Set();
+ }
+ }
+ else
+ {
+ self._receiveMre.Set();
+ if (self._receiveQueue.Count > 0)
+ {
+ throw new RedisClientException(
+ String.Format("Invalid number of bulk responses. Expected {0}, Got {1}",
+ state.ExpectedReplies, state.ReceivedReplies));
+ }
+ self._multiCount = 0;
+ }
+ }
+
+
+ private void CheckDisposed()
+ {
+ if (Socket == null || Socket.Socket == null)
+ throw new ObjectDisposedException("PooledSocket");
+ }
+
+
+ #endregion
+ }
+}
View
0 Guanima.Redis/RedisLockToken.cs → Guanima.Redis/Client/RedisLockToken.cs
File renamed without changes.
View
546 Guanima.Redis/Client/RedisSubscription.cs
@@ -0,0 +1,546 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net.Sockets;
+using System.Threading;
+using Guanima.Redis.Commands;
+using Guanima.Redis.Commands.PubSub;
+using Guanima.Redis.Extensions;
+using Guanima.Redis.Utils;
+using Guanima.Redis.Protocol;
+
+namespace Guanima.Redis.Client
+{
+ public class SubscriptionState : Disposable
+ {
+ private readonly object _lockObject = new object();
+ private PooledSocket _socket;
+ private RedisReplyParser _parser;
+ private ArraySegment<byte>? _readBuffer;
+ private readonly IRedisNode _node;
+ private readonly CommandBuffer _commandBuffer = new CommandBuffer();
+ private readonly List<string> _activeChannels = new List<string>();
+ public RedisSubscription Subscription;
+ public IRedisNode Node { get { return _node; } }
+
+ public PooledSocket Socket
+ {
+ get { return (_socket ?? (_socket = Node.Acquire())); }
+ }
+
+ public Socket RawSocket { get { return Socket.Socket; } }
+
+ public CommandBuffer CommandBuffer { get { return _commandBuffer; } }
+
+ public ArraySegment<byte> ReadBuffer
+ {
+ get
+ {
+ if (_readBuffer == null)
+ {
+ int dummy;
+ _readBuffer = Subscription.GetBuffer(out dummy);
+ }
+ return _readBuffer.Value;
+ }
+ }
+
+ public void FreeReadBuffer()
+ {
+ if (_readBuffer != null)
+ {
+ Subscription.FreeBuffer(_readBuffer.Value);
+ _readBuffer = null;
+ }
+ }
+
+ public ManualResetEvent NoMoreSubscriptions;
+ public ManualResetEvent SendDone;
+
+ public List<String> Channels { get { return _activeChannels; } }
+
+ public int SubscriptionCount
+ {
+ get
+ {
+ return (_activeChannels == null) ? 0 : _activeChannels.Count;
+ }
+ }
+
+
+ public SubscriptionState(IRedisNode node)
+ {
+ _node = node;
+ NoMoreSubscriptions = new ManualResetEvent(false);
+ SendDone = new ManualResetEvent(false);
+ }
+
+ public void BufferCommand(byte[] name, IEnumerable<String> keys)
+ {
+ if (keys == null || keys.Count() == 0)
+ {
+ CommandBuffer.Append(new byte[][] {name});
+ }
+ else
+ {
+ var args = CommandUtils.ConstructParameters(name, keys);
+ CommandBuffer.Append(args);
+ }
+ }
+
+ public void BufferCommand(RedisCommand command)
+ {
+ CommandBuffer.Append(command);
+ }
+
+ public RedisReplyParser ReplyParser
+ {
+ get { return _parser ?? (_parser = new RedisReplyParser(ReplyReceived, null));}
+ }
+
+ private void ReplyReceived(object data, RedisValue value)
+ {
+ //
+ if (value.Type == RedisValueType.Error)
+ {
+ // set
+ throw new RedisClientException(value.Text);
+ }
+ Subscription.HandleMessage(this, value.MultiBulkValues);
+ }
+
+
+ public void DoLock(Action action)
+ {
+ if (action != null)
+ {
+ lock (_lockObject)
+ {
+ action();
+ }
+ }
+ }
+
+ protected override void Release()
+ {
+ if (Socket != null)
+ {
+ ((IDisposable)Socket).Dispose();
+ }
+ FreeReadBuffer();
+ }
+ }
+
+
+ public class RedisSubscription : Disposable, IRedisSubscription
+ {
+ // TODO: Make this configurable
+ public const int ReadBufferSize = 2048;
+
+ private readonly RedisClient _redisClient;
+ private readonly List<IRedisNode> _nodes = new List<IRedisNode>();
+ private readonly IDictionary<IRedisNode, SubscriptionState> _states = new Dictionary<IRedisNode, SubscriptionState>();
+ private List<string> _activeChannels;
+ private readonly ManualResetEvent _shutdownCompleted = new ManualResetEvent(false);
+ private readonly ManualResetEvent _messageReceived = new ManualResetEvent(false);
+ private readonly ManualResetEvent _noSubscriptions = new ManualResetEvent(false);
+
+ private readonly BufferManager _bufferManager;
+
+ private int _subscriptionCount = 0;
+ private int _shutdown = 0;
+
+ private static readonly byte[] SubscribeWord = "subscribe".ToUtf8ByteArray();
+ private static readonly byte[] UnSubscribeWord = "unsubscribe".ToUtf8ByteArray();
+ private static readonly byte[] MessageWord = "message".ToUtf8ByteArray();
+ private readonly object _lockObject = new object();
+
+ public RedisSubscription(RedisClient client)
+ {
+ _redisClient = client;
+ SubscriptionCount = 0;
+ _activeChannels = new List<string>();
+ _nodes.AddRange(client.GetNodes());
+ _bufferManager = new BufferManager(10, ReadBufferSize);
+ }
+
+ public RedisSubscription(RedisClient client, params IRedisNode[] nodes)
+ {
+ _redisClient = client;
+ SubscriptionCount = 0;
+ _activeChannels = new List<string>();
+ _nodes.AddRange(nodes);
+ }
+
+ public Action<string> OnSubscribe { get; set; }
+ public Action<string, byte[]> OnMessage { get; set; }
+ public Action<string> OnUnSubscribe { get; set; }
+ public Action<Exception> OnException { get; set; }
+
+ public int SubscriptionCount
+ {
+ get { return _subscriptionCount;}
+ private set { _subscriptionCount = value; }
+ }
+
+ public void Subscribe(params string[] channels)
+ {
+ Subscribe((IEnumerable<string>)channels);
+ }
+
+ public void Subscribe(IEnumerable<string> channels)
+ {
+ var chans = new List<string>();
+ var patterns = new List<string>();
+ SplitChannels(channels, chans, patterns);
+
+ int count = chans.Count + patterns.Count;
+ if (count > 0)
+ {
+ foreach (var node in _nodes)
+ {
+ var state = GetSubscriptionState(node);
+ if (patterns.Count > 0)
+ {
+ state.BufferCommand(Command.PSubscribe, patterns);
+ }
+ if (chans.Count > 0)
+ {
+ state.BufferCommand(Command.Subscribe, chans);
+ }
+ BeginSend(state);
+ }
+ }
+ }
+
+
+ public void UnSubscribe(params string[] channels)
+ {
+ if (channels.Length == 0)
+ {
+ UnSubscribeAll();
+ return;
+ }
+ UnSubscribe((IEnumerable<string>)channels);
+ }
+
+ public void UnSubscribe(IEnumerable<string> channels)
+ {
+ var chans = new List<string>();
+ var patterns = new List<string>();
+ SplitChannels(channels, chans, patterns);
+
+ var handles = new List<ManualResetEvent>();
+ int count = chans.Count + patterns.Count;
+ if (count > 0)
+ {
+ foreach (var node in _nodes)
+ {
+ var state = GetSubscriptionState(node);
+ state.CommandBuffer.Reset();
+
+ if (patterns.Count > 0)
+ {
+ state.BufferCommand(Command.PUnSubscribe, patterns);
+ }
+ if (chans.Count > 0)
+ {
+ state.BufferCommand(Command.UnSubscribe, chans);
+ }
+
+ BeginSend(state);
+ }
+ }
+ }
+
+ internal void CallSubscribeHandler(int delta)
+ {
+
+ }
+
+ public void UnSubscribeAll()
+ {
+ //if (_activeChannels.Count == 0) return;
+ if (HasSubscriptions)
+ {
+ foreach (var node in _nodes)
+ {
+ var state = GetSubscriptionState(node);
+ if (state.Channels.Count > 0)
+ {
+ state.BufferCommand( new UnsubscribeCommand());
+ BeginSend(state);
+ }
+ }
+ _noSubscriptions.WaitOne();
+ }
+ _noSubscriptions.Reset();
+ _activeChannels = new List<string>();
+ }
+
+
+ protected override void Release()
+ {
+ UnSubscribeAll();
+ }
+
+ #region Utils
+