Permalink
Browse files

Support pattern subscriptions as well

  • Loading branch information...
mythz committed May 26, 2012
1 parent 259f623 commit a60d66708213555eca0f6eb38050638fd0bd71b6
Showing with 245 additions and 189 deletions.
  1. +1 −1 build/build.bat
  2. +38 −11 src/ServiceStack.Redis/RedisSubscription.cs
  3. +206 −177 tests/ServiceStack.Redis.Tests/RedisPubSubTests.cs
View
@@ -5,7 +5,7 @@ COPY ..\src\ServiceStack.Redis\bin\%BUILD%\ServiceStack.Redis.* ..\NuGet\lib\net
COPY ..\src\ServiceStack.Redis\bin\%BUILD%\ServiceStack.* ..\..\ServiceStack\release\latest\ServiceStack.Redis
-COPY ..\src\ServiceStack.Redis\bin\%BUILD%\ServiceStack.* ..\..\ServiceStack\lib\tests
+COPY ..\src\ServiceStack.Redis\bin\%BUILD%\ServiceStack.* ..\..\ServiceStack\lib
COPY ..\src\ServiceStack.Redis\bin\%BUILD%\ServiceStack.* ..\..\ServiceStack.Examples\lib
COPY ..\src\ServiceStack.Redis\bin\%BUILD%\ServiceStack.Redis.* ..\..\ServiceStack.Contrib\lib
COPY ..\src\ServiceStack.Redis\bin\%BUILD%\ServiceStack.Redis.* ..\..\ServiceStack.RedisWebServices\lib
@@ -11,10 +11,14 @@ public class RedisSubscription
private readonly IRedisNativeClient redisClient;
private List<string> activeChannels;
public int SubscriptionCount { get; private set; }
+ public bool IsPSubscription { get; private set; }
private static readonly byte[] SubscribeWord = "subscribe".ToUtf8Bytes();
+ private static readonly byte[] PSubscribeWord = "psubscribe".ToUtf8Bytes();
private static readonly byte[] UnSubscribeWord = "unsubscribe".ToUtf8Bytes();
+ private static readonly byte[] PUnSubscribeWord = "punsubscribe".ToUtf8Bytes();
private static readonly byte[] MessageWord = "message".ToUtf8Bytes();
+ private static readonly byte[] PMessageWord = "pmessage".ToUtf8Bytes();
public RedisSubscription(IRedisNativeClient redisClient)
{
@@ -42,7 +46,7 @@ public void SubscribeToChannels(params string[] channels)
public void SubscribeToChannelsMatching(params string[] patterns)
{
- var multiBytes = redisClient.Subscribe(patterns);
+ var multiBytes = redisClient.PSubscribe(patterns);
ParseSubscriptionResults(multiBytes);
while (this.SubscriptionCount > 0)
@@ -54,14 +58,18 @@ public void SubscribeToChannelsMatching(params string[] patterns)
private void ParseSubscriptionResults(byte[][] multiBytes)
{
- for (var i = 0; i < multiBytes.Length; i += 3)
+ int componentsPerMsg = IsPSubscription ? 4 : 3;
+ int msgIndex = IsPSubscription ? 3 : 2;
+ for (var i = 0; i < multiBytes.Length; i += componentsPerMsg)
{
var messageType = multiBytes[i];
var channel = multiBytes[i + 1].FromUtf8Bytes();
-
- if (SubscribeWord.AreEqual(messageType))
+ if (SubscribeWord.AreEqual(messageType)
+ || PSubscribeWord.AreEqual(messageType))
{
- this.SubscriptionCount = int.Parse(multiBytes[i + 2].FromUtf8Bytes());
+ IsPSubscription = PSubscribeWord.AreEqual(messageType);
+
+ this.SubscriptionCount = int.Parse(multiBytes[i + msgIndex].FromUtf8Bytes());
activeChannels.Add(channel);
@@ -70,7 +78,8 @@ private void ParseSubscriptionResults(byte[][] multiBytes)
this.OnSubscribe(channel);
}
}
- else if (UnSubscribeWord.AreEqual(messageType))
+ else if (UnSubscribeWord.AreEqual(messageType)
+ || PUnSubscribeWord.AreEqual(messageType))
{
this.SubscriptionCount = int.Parse(multiBytes[i + 2].FromUtf8Bytes());
@@ -81,9 +90,10 @@ private void ParseSubscriptionResults(byte[][] multiBytes)
this.OnUnSubscribe(channel);
}
}
- else if (MessageWord.AreEqual(messageType))
+ else if (MessageWord.AreEqual(messageType)
+ || PMessageWord.AreEqual(messageType))
{
- var message = multiBytes[i + 2].FromUtf8Bytes();
+ var message = multiBytes[i + msgIndex].FromUtf8Bytes();
if (this.OnMessage != null)
{
@@ -93,7 +103,7 @@ private void ParseSubscriptionResults(byte[][] multiBytes)
else
{
throw new RedisException(
- "Invalid state. Expected [subscribe|unsubscribe|message] got: " + messageType);
+ "Invalid state. Expected [[p]subscribe|[p]unsubscribe|message] got: " + messageType.FromUtf8Bytes());
}
}
}
@@ -108,6 +118,16 @@ public void UnSubscribeFromAllChannels()
this.activeChannels = new List<string>();
}
+ public void UnSubscribeFromAllChannelsMatchingAnyPatterns()
+ {
+ if (activeChannels.Count == 0) return;
+
+ var multiBytes = redisClient.PUnSubscribe();
+ ParseSubscriptionResults(multiBytes);
+
+ this.activeChannels = new List<string>();
+ }
+
public void UnSubscribeFromChannels(params string[] channels)
{
var multiBytes = redisClient.UnSubscribe(channels);
@@ -116,13 +136,20 @@ public void UnSubscribeFromChannels(params string[] channels)
public void UnSubscribeFromChannelsMatching(params string[] patterns)
{
- var multiBytes = redisClient.UnSubscribe(patterns);
+ var multiBytes = redisClient.PUnSubscribe(patterns);
ParseSubscriptionResults(multiBytes);
}
public void Dispose()
{
- UnSubscribeFromAllChannels();
+ if (IsPSubscription)
+ {
+ UnSubscribeFromAllChannelsMatchingAnyPatterns();
+ }
+ else
+ {
+ UnSubscribeFromAllChannels();
+ }
}
}
}
Oops, something went wrong.

0 comments on commit a60d667

Please sign in to comment.