Skip to content
Browse files

Finalized subscriber and monitor interplay for resilience against Red…

…is outages.
  • Loading branch information...
1 parent eb6f827 commit 9be10bebeeea65c6fd4e46060360aafa9c149468 @yiannisk yiannisk committed Mar 1, 2012
View
59 RedisQueue.Net.ServiceProvider.Tests/RedisMonitorTests.cs
@@ -326,7 +326,7 @@ public void TestCantResolveNonexistentAssemly()
}
[Test]
- public void TestMonitorCanReceiveMessageFromChannel()
+ public void TestMonitorWakesUpOnMessageReceived()
{
var task = new TaskMessage
{
@@ -346,6 +346,9 @@ public void TestMonitorCanReceiveMessageFromChannel()
var monitor = new RedisMonitor();
monitor.Performer = performerMock.Object;
+
+ var start = DateTime.Now;
+
monitor.Start();
Assert.IsTrue(monitor.Running);
@@ -354,6 +357,60 @@ public void TestMonitorCanReceiveMessageFromChannel()
monitor.Stop();
Assert.IsFalse(monitor.Running);
+
+ var stop = DateTime.Now;
+ Assert.Less((stop - start).TotalSeconds, 60);
+ }
+
+ [Test]
+ public void TestMonitorCanRecoverFromRedisOutage()
+ {
+ var task = new TaskMessage
+ {
+ Parameters = "blah",
+ Queue = "TestQueue"
+ };
+
+ var task2 = new TaskMessage
+ {
+ Parameters = "blah",
+ Queue = "TestQueue"
+ };
+
+ var performerMock = new Mock<Performer>();
+ performerMock.SetupGet(x => x.Status).Returns(new PerformResult
+ {
+ Data = string.Empty,
+ Outcome = Outcome.Success,
+ Reason = string.Empty
+ });
+
+ performerMock.Setup(x => x.Perform(task.Parameters));
+ performerMock.Setup(x => x.Perform(task2.Parameters));
+
+ var monitor = new RedisMonitor();
+ monitor.Performer = performerMock.Object;
+
+ monitor.Start();
+ Assert.IsTrue(monitor.Running);
+
+ using (var client = new QueueClient())
+ client.Enqueue(task);
+
+ Thread.Sleep(1500);
+ RedisServer.Kill();
+
+ Thread.Sleep(100);
+ RedisServer.Start();
+
+ Thread.Sleep(2000);
+ using (var client = new QueueClient())
+ client.Enqueue(task2);
+
+ Thread.Sleep(10000);
+ monitor.Stop();
+
+ Assert.IsFalse(monitor.Running);
}
}
}
View
2 RedisQueue.Net.ServiceProvider.Tests/fixtures/Redis.64bit/redis.conf
@@ -83,7 +83,7 @@ databases 16
#
# Note: you can disable saving at all commenting all the "save" lines.
-# save 900 1
+save 1 1
# save 300 10
# save 60 10000
View
21 RedisQueue.Net.ServiceProvider/App.config
@@ -15,7 +15,7 @@
<value>TestQueue</value>
</setting>
<setting name="MonitorSleepIntervalInMilliseconds" serializeAs="String">
- <value>60000</value>
+ <value>3000</value>
</setting>
<setting name="WorkerAssemblyPath" serializeAs="String">
<value>SampleWorker.dll</value>
@@ -27,6 +27,25 @@
<value>3</value>
</setting>
</RedisQueue.Net.ServiceProvider.Properties.Settings>
+
+ <RedisQueue.Net.Clients.Properties.Settings>
+ <setting name="RedisHost" serializeAs="String">
+ <value>127.0.0.1</value>
+ </setting>
+ <setting name="RedisPort" serializeAs="String">
+ <value>6379</value>
+ </setting>
+ <setting name="TaskRecycling" serializeAs="String">
+ <value>True</value>
+ </setting>
+ <setting name="MaxTaskRetries" serializeAs="String">
+ <value>10</value>
+ </setting>
+ <setting name="PurgeSuccessfulTasks" serializeAs="String">
+ <value>True</value>
+ </setting>
+ </RedisQueue.Net.Clients.Properties.Settings>
+
<ServiceWrapper.Properties.Settings>
<setting name="ServiceName" serializeAs="String">
<value>Redis Service Base</value>
View
62 RedisQueue.Net.ServiceProvider/RedisMonitor.cs
@@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
-using System.Net.Sockets;
using System.Reflection;
using System.Threading;
using RedisQueue.Net.Clients;
@@ -44,7 +43,8 @@ public RedisMonitor()
{
InitializeWorker();
InitializeThreads();
- InitializeClients();
+ InitializeMonitorClient();
+ InitializeSubscriptionClient();
}
public RedisMonitor(string host, int port)
@@ -54,23 +54,33 @@ public RedisMonitor(string host, int port)
InitializeWorker();
InitializeThreads();
- InitializeClients();
+ InitializeMonitorClient();
+ InitializeSubscriptionClient();
}
#endregion
#region Initialization
- protected void InitializeClients()
+ protected void InitializeMonitorClient()
{
if (!string.IsNullOrWhiteSpace(RedisHost) && RedisPort > 0)
{
MonitorClient = new QueueClient(RedisHost, RedisPort);
- SubscriptionClient = new QueueClient(RedisHost, RedisPort);
return;
}
MonitorClient = new QueueClient();
+ }
+
+ protected void InitializeSubscriptionClient()
+ {
+ if (!string.IsNullOrWhiteSpace(RedisHost) && RedisPort > 0)
+ {
+ SubscriptionClient = new QueueClient(RedisHost, RedisPort);
+ return;
+ }
+
SubscriptionClient = new QueueClient();
}
@@ -118,20 +128,38 @@ protected virtual void UnSubscribeFromQueue()
protected virtual void DoSubscribe()
{
IAsyncResult handle = null;
+
try
{
- var queueName = new QueueName(Settings.Default.Queue);
- handle = new Action(() => Subscription.SubscribeToChannels(queueName.ChannelName)).BeginInvoke(null, null);
+ handle = new Action(BlockSubscribe).BeginInvoke(null, null);
handle.AsyncWaitHandle.WaitOne();
}
catch(ThreadInterruptedException)
{
- // will throw an ObjectDisposedException on the point of invocation (the BeginInvoke() above).
+ // will throw an ObjectDisposedException at the point of invocation (the BeginInvoke() above).
+ if (handle == null) return;
handle.AsyncWaitHandle.Close();
Log.Debug("Stopped subscription to queue channel.");
}
- catch(ObjectDisposedException)
- {}
+ catch(ObjectDisposedException) {}
+ }
+
+ protected virtual void BlockSubscribe()
+ {
+ try
+ {
+ var queueName = new QueueName(Settings.Default.Queue);
+ Subscription.SubscribeToChannels(queueName.ChannelName);
+ }
+
+ catch (Exception exception)
+ {
+ if (!(exception is RedisException) && !(exception is IOException)) throw;
+
+ Log.Error("IOException while listening for channel messages. Will subscribe again in a second.", exception);
+ Thread.Sleep(1000);
+ BlockSubscribe();
+ }
}
#endregion
@@ -160,18 +188,10 @@ protected internal virtual void Run()
try
{
try { ProcessPendingTasks(); }
- catch (IOException exception)
+ catch (Exception exception)
{
- if (exception.InnerException is SocketException)
- {
- Log.Error("Could not connect to Redis. Will sleep and attempt again in a while.", exception);
-
- UnSubscribeFromQueue();
- MonitorClient.Dispose();
- InitializeClients();
- SubscribeToQueue();
- }
- else throw;
+ if (!(exception is RedisException) && !(exception is IOException)) throw;
+ Log.Error("Could not connect to Redis. Will sleep and attempt again in a while.", exception);
}
Thread.Sleep(Settings.Default.MonitorSleepIntervalInMilliseconds);
View
1 RedisQueue.Net.ServiceProvider/RedisQueue.Net.ServiceProvider.csproj
@@ -42,6 +42,7 @@
<Reference Include="ServiceStack.Interfaces, Version=3.5.5.30073, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\ServiceStack.Common.3.5.5\lib\net35\ServiceStack.Interfaces.dll</HintPath>
</Reference>
+ <Reference Include="ServiceStack.Redis, Version=3.5.2.5612, Culture=neutral, processorArchitecture=MSIL" />
<Reference Include="ServiceStack.Text, Version=3.5.7.9337, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\ServiceStack.Text.3.5.7\lib\net35\ServiceStack.Text.dll</HintPath>
</Reference>

0 comments on commit 9be10be

Please sign in to comment.
Something went wrong with that request. Please try again.