Skip to content
Permalink
Browse files
  • Loading branch information
Timothy A. Bish committed Aug 1, 2012
1 parent 1b74d90 commit 4f88b4f4901ecbfcc2c89c9842475c12d543e4c7
Showing 1 changed file with 330 additions and 0 deletions.
@@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* The main goal of this test list is to make sure that a producer and a consumer with
* separate connections can communicate properly and consitently despite a waiting time
* between messages TestStability500WithSleep would be the important test, as a delay of
* 30s is executed at each 100 (100, 200, 300, ...).
*/

using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.Test;
using Apache.NMS.Stomp;
using NUnit.Framework;
using System;

namespace Apache.NMS.Stomp.Test
{
[TestFixture]
public class NMSTester : NMSTestSupport
{
// TODO set proper configuration parameters
private const string destination = "test";

private static int numberOfMessages = 0;
private static IConnection producerConnection = null;
private static IConnection consumerConnection = null;
private static Thread consumerThread = null;
private static Thread producerThread = null;
private static long consumerMessageCounter = 0;
private static long producerMessageCounter = 0;
private static string possibleConsumerException = "";
private static string possibleProducerException = "";
private static bool consumerReady = false;

[SetUp]
public void Init()
{
if (producerConnection != null)
{
producerConnection.Close();
}
if (consumerConnection != null)
{
consumerConnection.Close();
}
if (consumerThread != null)
{
consumerThread.Abort();
}
if (producerThread != null)
{
producerThread.Abort();
}

producerConnection = null;
consumerConnection = null;
consumerThread = null;
producerThread = null;

producerConnection = CreateConnection();
consumerConnection = CreateConnection();

numberOfMessages = 0;
consumerMessageCounter = 0;
producerMessageCounter = 0;
possibleConsumerException = "";
possibleProducerException = "";
consumerReady = false;
//Giving time for the topic to clear out
Thread.Sleep(2500);
}

[TearDown]
public void Dispose()
{
if (producerConnection != null)
{
producerConnection.Close();
}
if (consumerConnection != null)
{
consumerConnection.Close();
}

if (consumerThread != null)
{
consumerThread.Abort();
}
if (producerThread != null)
{
producerThread.Abort();
}

producerConnection = null;
consumerConnection = null;
consumerThread = null;
producerThread = null;
}

[Test]
public void TestStability5Continuous()
{
numberOfMessages = 5;

consumerThread = new Thread(new ThreadStart(NMSTester.ConsumerThread));
consumerThread.Start();

producerThread = new Thread(new ThreadStart(NMSTester.ProducerThreadContinuous));
producerThread.Start();

Thread.Sleep(100);

Assert.IsTrue(consumerThread.IsAlive && producerThread.IsAlive);

while (consumerThread.IsAlive && producerThread.IsAlive)
{
Thread.Sleep(100);
}

Assert.AreEqual("", possibleConsumerException);
Assert.AreEqual("", possibleProducerException);
Assert.AreEqual(numberOfMessages, producerMessageCounter);
Assert.AreEqual(numberOfMessages, consumerMessageCounter);
}

[Test]
public void TestStability50Continuous()
{
numberOfMessages = 50;

consumerThread = new Thread(new ThreadStart(NMSTester.ConsumerThread));
consumerThread.Start();

producerThread = new Thread(new ThreadStart(NMSTester.ProducerThreadContinuous));
producerThread.Start();

Thread.Sleep(100);

Assert.IsTrue(consumerThread.IsAlive && producerThread.IsAlive);

while (consumerThread.IsAlive && producerThread.IsAlive)
{
Thread.Sleep(100);
}

Assert.AreEqual("", possibleConsumerException);
Assert.AreEqual("", possibleProducerException);
Assert.AreEqual(numberOfMessages, producerMessageCounter);
Assert.AreEqual(numberOfMessages, consumerMessageCounter);
}

[Test]
public void TestStability500Continuous()
{
numberOfMessages = 500; //At 100,200,300, ... a delay of 30 seconds is executed in the producer to cause an unexpected disconnect, due to a malformed ACK?

consumerThread = new Thread(new ThreadStart(NMSTester.ConsumerThread));
consumerThread.Start();

producerThread = new Thread(new ThreadStart(NMSTester.ProducerThreadContinuous));
producerThread.Start();

Thread.Sleep(100);

Assert.IsTrue(consumerThread.IsAlive && producerThread.IsAlive);

while (consumerThread.IsAlive && producerThread.IsAlive)
{
Thread.Sleep(100);
}

Assert.AreEqual("", possibleConsumerException);
Assert.AreEqual("", possibleProducerException);
Assert.AreEqual(numberOfMessages, producerMessageCounter);
Assert.AreEqual(numberOfMessages, consumerMessageCounter);
}

[Test]
public void TestStability500WithSleep()
{
numberOfMessages = 500; //At 100,200,300, ... a delay of 30 seconds is executed in the producer to cause an unexpected disconnect, due to a malformed ACK?

consumerThread = new Thread(new ThreadStart(NMSTester.ConsumerThread));
consumerThread.Start();

producerThread = new Thread(new ThreadStart(NMSTester.ProducerThreadWithSleep));
producerThread.Start();

Thread.Sleep(100);

Assert.IsTrue(consumerThread.IsAlive && producerThread.IsAlive);

while (consumerThread.IsAlive && producerThread.IsAlive)
{
Thread.Sleep(100);
}

Assert.AreEqual("", possibleConsumerException);
Assert.AreEqual("", possibleProducerException);
Assert.AreEqual(numberOfMessages, producerMessageCounter);
Assert.AreEqual(numberOfMessages, consumerMessageCounter);
}

public static void Main(string[] args)
{

}

#region Consumer
private static void ConsumerThread()
{
ISession session = consumerConnection.CreateSession();
IMessageConsumer consumer;
IDestination dest = session.GetTopic(destination);
consumer = session.CreateConsumer(dest);
consumer.Listener += new MessageListener(OnMessage);
consumerConnection.ExceptionListener += new ExceptionListener(OnConsumerExceptionListener);
consumerConnection.Start();

consumerReady = true;

while (true)
{
Thread.Sleep(100);
}
}

private static void OnMessage(IMessage receivedMsg)
{
consumerMessageCounter++;
}

public static void OnConsumerExceptionListener(Exception ex)
{
possibleConsumerException = ex.Message;
Thread.CurrentThread.Abort();
}
#endregion Consumer

#region Producer
private static void ProducerThreadWithSleep()
{
ISession session = producerConnection.CreateSession();
IMessageProducer producer;

IDestination dest = session.GetTopic(destination);
producer = session.CreateProducer(dest);
producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
producerConnection.ExceptionListener += new ExceptionListener(OnProducerExceptionListener);
producerConnection.Start();

ITextMessage message;

while (!consumerReady)
{
Thread.Sleep(100);
}

for (int c = 0; c < numberOfMessages; c++)
{
message = session.CreateTextMessage(c.ToString());
message.NMSType = "testType";
producer.Send(message);
producerMessageCounter++;
//Focal point of this test; induce a "long" delay between two messages without any other communication on the topic, Note that thse delays occure only at each 100, and that messages can be sent after the delay before a possible disconnect
if ((c + 1) % 100 == 0)
{
Thread.Sleep(30000);
}
else
{
Thread.Sleep(10);
}
}
}

private static void ProducerThreadContinuous()
{
ISession session = producerConnection.CreateSession();
IMessageProducer producer;

IDestination dest = session.GetTopic(destination);
producer = session.CreateProducer(dest);
producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
producerConnection.ExceptionListener += new ExceptionListener(OnProducerExceptionListener);
producerConnection.Start();

ITextMessage message;

while (!consumerReady)
{
Thread.Sleep(100);
}

for (int c = 0; c < numberOfMessages; c++)
{
message = session.CreateTextMessage(c.ToString());
message.NMSType = "testType";
producer.Send(message);
producerMessageCounter++;
Thread.Sleep(10);
}
}

public static void OnProducerExceptionListener(Exception ex)
{
possibleProducerException = ex.Message;
Thread.CurrentThread.Abort();
}
#endregion Producer

}
}

0 comments on commit 4f88b4f

Please sign in to comment.