Skip to content
This repository has been archived by the owner on Dec 24, 2022. It is now read-only.

Commit

Permalink
handling incrments in tests better for race conditions, adding helper…
Browse files Browse the repository at this point in the history
… function to wait only as long as necessary when possible for wait conditions with a specified timeout vs. relying on sleep entirely for functionality (speeds up tests in most scenarios)
  • Loading branch information
boydc7 committed Aug 4, 2015
1 parent e397a5d commit 6471812
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 54 deletions.
Binary file not shown.
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion tests/ServiceStack.Aws.Tests/Sqs/FakeAmazonSqsTests.cs
Expand Up @@ -269,7 +269,7 @@ public void Received_message_no_ack_gets_requeued()
Assert.AreEqual(0, response.Messages.Count);

Thread.Sleep(1000);

// Should be a single item (same one) back on q again
var messageRepeat = helper.ReceiveSingle(newQueueUrl);
Assert.AreEqual(message.MessageId, messageRepeat.MessageId);
Expand Down
166 changes: 113 additions & 53 deletions tests/ServiceStack.Aws.Tests/Sqs/SqsMqServerTests.cs
Expand Up @@ -15,7 +15,7 @@ namespace ServiceStack.Aws.Tests.Sqs
public class SqsMqServerTests
{
private SqsQueueManager sqsQueueManager;

[TestFixtureSetUp]
public void FixtureSetUp()
{
Expand Down Expand Up @@ -128,16 +128,17 @@ public void Does_process_messages_sent_before_it_was_started()
var mqHost = CreateMqServer();
mqHost.RegisterHandler<Reverse>(x =>
{
reverseCalled++;
Interlocked.Increment(ref reverseCalled);
return x.GetBody().Value.Reverse();
});

var mqClient = mqHost.MessageFactory.CreateMessageQueueClient();
Publish_4_messages(mqClient);

mqHost.Start();
Thread.Sleep(3000);

SqsTestAssert.WaitUntilTrueOrTimeout(() => reverseCalled >= 4, timeoutSeconds: 3);

Assert.That(mqHost.GetStats().TotalMessagesProcessed, Is.EqualTo(4));
Assert.That(reverseCalled, Is.EqualTo(4));

Expand All @@ -157,12 +158,14 @@ public void Does_process_all_messages_and_Starts_Stops_correctly_with_multiple_t

mqHost.RegisterHandler<Reverse>(x =>
{
"Processing Reverse {0}...".Print(++reverseCalled);
Interlocked.Increment(ref reverseCalled);
"Processing Reverse {0}...".Print(reverseCalled);
return x.GetBody().Value.Reverse();
});
mqHost.RegisterHandler<Rot13>(x =>
{
"Processing Rot13 {0}...".Print(++rot13Called);
Interlocked.Increment(ref rot13Called);
"Processing Rot13 {0}...".Print(rot13Called);
return x.GetBody().Value.ToRot13();
});

Expand All @@ -181,7 +184,9 @@ public void Does_process_all_messages_and_Starts_Stops_correctly_with_multiple_t
});

mqHost.Start();
Thread.Sleep(4000);

SqsTestAssert.WaitUntilTrueOrTimeout(() => reverseCalled >= 2 && rot13Called >= 1, timeoutSeconds: 4);

Assert.That(mqHost.GetStatus(), Is.EqualTo("Started"));
Assert.That(mqHost.GetStats().TotalMessagesProcessed, Is.EqualTo(3));

Expand All @@ -198,11 +203,23 @@ public void Does_process_all_messages_and_Starts_Stops_correctly_with_multiple_t
Assert.That(mqHost.GetStatus(), Is.EqualTo("Started"));

5.Times(x => ThreadPool.QueueUserWorkItem(y => mqHost.Stop()));
Thread.Sleep(2000);

SqsTestAssert.WaitUntilTrueOrTimeout(() =>
{
var status = mqHost.GetStatus();
return status != null && status.Equals("Stopped");
}, timeoutSeconds: 2);

Assert.That(mqHost.GetStatus(), Is.EqualTo("Stopped"));

10.Times(x => ThreadPool.QueueUserWorkItem(y => mqHost.Start()));
Thread.Sleep(4000);

SqsTestAssert.WaitUntilTrueOrTimeout(() =>
{
var status = mqHost.GetStatus();
return reverseCalled >= 3 && rot13Called >= 2 && status != null && status.Equals("Started");
}, timeoutSeconds: 4);

Assert.That(mqHost.GetStatus(), Is.EqualTo("Started"));

Debug.WriteLine("\n" + mqHost.GetStats());
Expand All @@ -223,16 +240,34 @@ public void Only_allows_1_BgThread_to_run_at_a_time()
mqHost.RegisterHandler<Rot13>(x => x.GetBody().Value.ToRot13());

5.Times(x => ThreadPool.QueueUserWorkItem(y => mqHost.Start()));
Thread.Sleep(1000);

SqsTestAssert.WaitUntilTrueOrTimeout(() =>
{
var status = mqHost.GetStatus();
return status != null && status.Equals("Started");
}, timeoutSeconds: 1);

Assert.That(mqHost.GetStatus(), Is.EqualTo("Started"));
Assert.That(mqHost.BgThreadCount, Is.EqualTo(1));

10.Times(x => ThreadPool.QueueUserWorkItem(y => mqHost.Stop()));
Thread.Sleep(1000);

SqsTestAssert.WaitUntilTrueOrTimeout(() =>
{
var status = mqHost.GetStatus();
return status != null && status.Equals("Stopped");
}, timeoutSeconds: 1);

Assert.That(mqHost.GetStatus(), Is.EqualTo("Stopped"));

ThreadPool.QueueUserWorkItem(y => mqHost.Start());
Thread.Sleep(1000);

SqsTestAssert.WaitUntilTrueOrTimeout(() =>
{
var status = mqHost.GetStatus();
return status != null && status.Equals("Started");
}, timeoutSeconds: 1);

Assert.That(mqHost.GetStatus(), Is.EqualTo("Started"));

Assert.That(mqHost.BgThreadCount, Is.EqualTo(2));
Expand Down Expand Up @@ -260,7 +295,12 @@ public void Cannot_Stop_a_Disposed_MqHost()

mqHost.RegisterHandler<Reverse>(x => x.GetBody().Value.Reverse());
mqHost.Start();
Thread.Sleep(1000);

SqsTestAssert.WaitUntilTrueOrTimeout(() =>
{
var status = mqHost.GetStatus();
return status != null && status.Equals("Started");
}, timeoutSeconds: 2);

mqHost.Dispose();

Expand All @@ -286,18 +326,24 @@ public void Does_retry_messages_with_errors_by_RetryCount()

var reverseCalled = 0;
var rot13Called = 0;
var alwaysThrowsCalled = 0;

mqHost.RegisterHandler<Reverse>(x =>
{
reverseCalled++;
Interlocked.Increment(ref reverseCalled);
return x.GetBody().Value.Reverse();
});
mqHost.RegisterHandler<Rot13>(x =>
{
rot13Called++;
Interlocked.Increment(ref rot13Called);
return x.GetBody().Value.ToRot13();
});
mqHost.RegisterHandler<AlwaysThrows>(x => { throw new Exception("Always Throwing! " + x.GetBody().Value); });
mqHost.RegisterHandler<AlwaysThrows>(x =>
{
Interlocked.Increment(ref alwaysThrowsCalled);
throw new Exception("Always Throwing! " + x.GetBody().Value);
});

mqHost.Start();

var mqClient = mqHost.MessageFactory.CreateMessageQueueClient();
Expand All @@ -318,7 +364,8 @@ public void Does_retry_messages_with_errors_by_RetryCount()
Value = "ServiceStack"
});

Thread.Sleep(8000);
SqsTestAssert.WaitUntilTrueOrTimeout(() => reverseCalled >= 2 && rot13Called >= 1 && alwaysThrowsCalled >= 1);

Assert.That(mqHost.GetStats().TotalMessagesFailed, Is.EqualTo(1 * totalRetries));
Assert.That(mqHost.GetStats().TotalMessagesProcessed, Is.EqualTo(2 + 1));

Expand All @@ -340,8 +387,8 @@ public void Does_retry_messages_with_errors_by_RetryCount()
Value = "ServiceStack"
});

Thread.Sleep(5000);

SqsTestAssert.WaitUntilTrueOrTimeout(() => reverseCalled >= 4 && rot13Called >= 2 && alwaysThrowsCalled >= 6, timeoutSeconds: 5);
Debug.WriteLine(mqHost.GetStatsDescription());

Assert.That(mqHost.GetStats().TotalMessagesFailed, Is.EqualTo((1 + 5) * totalRetries));
Expand Down Expand Up @@ -383,8 +430,8 @@ public void Can_receive_and_process_same_reply_responses()
};
mqClient.Publish(incr);

Thread.Sleep(10000);

SqsTestAssert.WaitUntilTrueOrTimeout(() => called >= incr.Value + 1);
Assert.That(called, Is.EqualTo(1 + incr.Value));
}

Expand Down Expand Up @@ -429,8 +476,10 @@ public void Can_receive_and_process_standard_request_reply_combo()
};
mqClient.Publish(dto);

Thread.Sleep(12000);

SqsTestAssert.WaitUntilTrueOrTimeout(() => messageReceived != null &&
messageReceived.Equals("Hello, ServiceStack"),
timeoutSeconds: 12);

Assert.That(messageReceived, Is.EqualTo("Hello, ServiceStack"));
}

Expand All @@ -456,7 +505,7 @@ public void Can_handle_requests_concurrently_in_4_threads()
{
RunHandlerOnMultipleThreads(noOfThreads: 4, msgs: 10);
}

private void RunHandlerOnMultipleThreads(int noOfThreads, int msgs)
{
var timesCalled = 0;
Expand All @@ -466,7 +515,7 @@ private void RunHandlerOnMultipleThreads(int noOfThreads, int msgs)

mqHost.RegisterHandler<Wait>(m =>
{
timesCalled++;
Interlocked.Increment(ref timesCalled);
Thread.Sleep(m.GetBody().ForMs);
return null;
}, noOfThreads);
Expand All @@ -481,11 +530,7 @@ private void RunHandlerOnMultipleThreads(int noOfThreads, int msgs)
};
msgs.Times(i => mqClient.Publish(dto));

const double buffer = 2.2;
var sleepForMs = (int)((msgs * 1000 / (double)noOfThreads) * buffer);

"Sleeping for {0}ms...".Print(sleepForMs);
Thread.Sleep(sleepForMs);
SqsTestAssert.WaitUntilTrueOrTimeout(() => timesCalled >= msgs, timeoutSeconds: 12);

mqHost.Dispose();

Expand All @@ -506,39 +551,47 @@ public void Can_publish_and_receive_messages_with_MessageFactory()

var msg = mqClient.Get<Hello>(QueueNames<Hello>.In, TimeSpan.FromSeconds(30));

Assert.IsNotNull(msg, "Should have received msg from IN queue but did not");
Assert.That(msg.GetBody().Name, Is.EqualTo("Foo"));
}
}
}

private class HelloOut
{
public string Name { get; set; }
}

[Test]
public void Messages_with_null_Response_is_published_to_OutMQ()
{
sqsQueueManager.PurgeQueues(QueueNames<HelloOut>.AllQueueNames);
var msgsReceived = 0;
var mqServer = CreateMqServer();

mqServer.RegisterHandler<Hello>(m =>

using (var mqServer = CreateMqServer())
{
msgsReceived++;
return null;
});
mqServer.RegisterHandler<HelloOut>(m =>
{
Interlocked.Increment(ref msgsReceived);
return null;
});

mqServer.Start();
mqServer.Start();

using (mqServer)
{
using (var mqClient = mqServer.MessageFactory.CreateMessageQueueClient())
{
mqClient.Publish(new Hello
mqClient.Publish(new HelloOut
{
Name = "Into the Void"
});

var msg = mqClient.Get<Hello>(QueueNames<Hello>.Out, TimeSpan.FromSeconds(30));
var msg = mqClient.Get<HelloOut>(QueueNames<HelloOut>.Out, TimeSpan.FromSeconds(30));

Assert.IsNotNull(msg, "Should have received msg from OUT queue but did not");

var response = msg.GetBody();

Thread.Sleep(100);
SqsTestAssert.WaitUntilTrueOrTimeout(() => msgsReceived >= 1, timeoutSeconds: 1);

Assert.That(response.Name, Is.EqualTo("Into the Void"));
Assert.That(msgsReceived, Is.EqualTo(1));
Expand All @@ -547,38 +600,45 @@ public void Messages_with_null_Response_is_published_to_OutMQ()

}

private class HelloReply
{
public string Name { get; set; }
}

[Test]
public void Messages_with_null_Response_is_published_to_ReplyMQ()
{
sqsQueueManager.PurgeQueues(QueueNames<HelloReply>.AllQueueNames);
var msgsReceived = 0;
var mqServer = CreateMqServer();

mqServer.RegisterHandler<Hello>(m =>

using (var mqServer = CreateMqServer())
{
msgsReceived++;
return null;
});
mqServer.RegisterHandler<HelloReply>(m =>
{
Interlocked.Increment(ref msgsReceived);
return null;
});

mqServer.Start();
mqServer.Start();

using (mqServer)
{
using (var mqClient = mqServer.MessageFactory.CreateMessageQueueClient())
{
var replyMq = mqClient.GetTempQueueName();
mqClient.Publish(new Message<Hello>(new Hello
mqClient.Publish(new Message<HelloReply>(new HelloReply
{
Name = "Into the Void"
})
{
ReplyTo = replyMq
});

var msg = mqClient.Get<Hello>(replyMq, TimeSpan.FromSeconds(30));
var msg = mqClient.Get<HelloReply>(replyMq, TimeSpan.FromSeconds(30));

Assert.IsNotNull(msg, "Should have received msg from REPLY queue but did not");

var response = msg.GetBody();

Thread.Sleep(100);
SqsTestAssert.WaitUntilTrueOrTimeout(() => msgsReceived >= 1, timeoutSeconds: 1); ;

Assert.That(response.Name, Is.EqualTo("Into the Void"));
Assert.That(msgsReceived, Is.EqualTo(1));
Expand Down
19 changes: 19 additions & 0 deletions tests/ServiceStack.Aws.Tests/Sqs/SqsTestAssert.cs
@@ -1,7 +1,9 @@
using System;
using System.Threading;
using Amazon.SQS;
using NUnit.Framework;
using ServiceStack.Aws.Sqs.Fake;
using ServiceStack.Text;

namespace ServiceStack.Aws.Tests.Sqs
{
Expand Down Expand Up @@ -80,5 +82,22 @@ public static void Throws<T>(TestDelegate code, string realMsgContains = null)

}

public static void WaitUntilTrueOrTimeout(Func<bool> doneWhenTrue, int timeoutSeconds = 10)
{
var timeoutAt = DateTime.UtcNow.AddSeconds(timeoutSeconds);

"Waiting for max of {0}s for processing".Print(timeoutSeconds);

while (DateTime.UtcNow <= timeoutAt)
{
if (doneWhenTrue())
{
break;
}

Thread.Sleep(300);
}
}

}
}

0 comments on commit 6471812

Please sign in to comment.