Skip to content
Permalink
Browse files
Refactored unit tests to support replacement environment variables in…
… the configuration file.

Refactored durable consumer tests.
Expanded URI tests.
Fixed broken URI parameter parsing.
Fixes [AMQNET-150]. (See https://issues.apache.org/activemq/browse/AMQNET-150)
  • Loading branch information
Jim Gomes committed Jun 1, 2009
1 parent 93103b5 commit aa087fb10854e5cd74fb190a99b9ba2d9d9135d8
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 135 deletions.
@@ -305,11 +305,6 @@ public static CompositeData parseComposite(Uri uri)
ssp = stripPrefix(ssp, rc.Scheme).Trim();
ssp = stripPrefix(ssp, ":").Trim();
}
else
{
// Fake a composite URL with parenthesis
ssp = "(" + ssp + ")";
}

// Handle the composite components
parseComposite(uri, rc, ssp);
@@ -26,84 +26,24 @@ namespace Apache.NMS.Test
[TestFixture]
public class ConsumerTest : NMSTestSupport
{
protected static string TEST_CLIENT_ID = "ConsumerTestClientId";
protected static string TOPIC = "TestTopicConsumerTest";
protected static string CONSUMER_ID = "ConsumerTestConsumerId";

#if !NET_1_1
[RowTest]
[Row(MsgDeliveryMode.Persistent)]
[Row(MsgDeliveryMode.NonPersistent)]
#endif
public void TestDurableConsumerSelectorChange(MsgDeliveryMode deliveryMode)
{
try
{
using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
ITopic topic = SessionUtil.GetTopic(session, TOPIC);
IMessageProducer producer = session.CreateProducer(topic);
IMessageConsumer consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, "color='red'", false);

producer.DeliveryMode = deliveryMode;

// Send the messages
ITextMessage sendMessage = session.CreateTextMessage("1st");
sendMessage.Properties["color"] = "red";
producer.Send(sendMessage);

ITextMessage receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
Assert.IsNotNull(receiveMsg, "Failed to retrieve 1st durable message.");
Assert.AreEqual("1st", receiveMsg.Text);
Assert.AreEqual(deliveryMode, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");

// Change the subscription.
consumer.Dispose();
consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, "color='blue'", false);

sendMessage = session.CreateTextMessage("2nd");
sendMessage.Properties["color"] = "red";
producer.Send(sendMessage);
sendMessage = session.CreateTextMessage("3rd");
sendMessage.Properties["color"] = "blue";
producer.Send(sendMessage);

// Selector should skip the 2nd message.
receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
Assert.IsNotNull(receiveMsg, "Failed to retrieve durable message.");
Assert.AreEqual("3rd", receiveMsg.Text, "Retrieved the wrong durable message.");
Assert.AreEqual(deliveryMode, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");

// Make sure there are no pending messages.
Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable subscription.");
}
}
}
catch(Exception ex)
{
Assert.Fail(ex.Message);
}
finally
{
UnregisterDurableConsumer(TEST_CLIENT_ID, CONSUMER_ID);
}
}
protected static string TEST_CLIENT_ID = "TestConsumerClientId";

// The .NET CF does not have the ability to interrupt threads, so this test is impossible.
#if !NETCF
[Test]
public void TestNoTimeoutConsumer()
[RowTest]
[Row(AcknowledgementMode.AutoAcknowledge)]
[Row(AcknowledgementMode.ClientAcknowledge)]
[Row(AcknowledgementMode.DupsOkAcknowledge)]
[Row(AcknowledgementMode.Transactional)]
public void TestNoTimeoutConsumer(AcknowledgementMode ackMode)
{
// Launch a thread to perform IMessageConsumer.Receive().
// If it doesn't fail in less than three seconds, no exception was thrown.
Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(ISession session = connection.CreateSession(ackMode))
{
ITemporaryQueue queue = session.CreateTemporaryQueue();
using(this.timeoutConsumer = session.CreateConsumer(queue))
@@ -147,16 +87,20 @@ protected void TimeoutConsumerThreadProc()
}
}

[Test]
public void TestSyncReceiveConsumerClose()
[RowTest]
[Row(AcknowledgementMode.AutoAcknowledge)]
[Row(AcknowledgementMode.ClientAcknowledge)]
[Row(AcknowledgementMode.DupsOkAcknowledge)]
[Row(AcknowledgementMode.Transactional)]
public void TestSyncReceiveConsumerClose(AcknowledgementMode ackMode)
{
// Launch a thread to perform IMessageConsumer.Receive().
// If it doesn't fail in less than three seconds, no exception was thrown.
Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
using (IConnection connection = CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using (ISession session = connection.CreateSession(ackMode))
{
ITemporaryQueue queue = session.CreateTemporaryQueue();
using (this.timeoutConsumer = session.CreateConsumer(queue))
@@ -14,111 +14,161 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using NUnit.Framework;
using NUnit.Framework.Extensions;
using Apache.NMS.Util;

namespace Apache.NMS.Test
{
[TestFixture]
public class DurableTest : NMSTestSupport
{
protected static string TOPIC = "TestTopicDurableConsumer";
protected static string SEND_CLIENT_ID = "SendDurableTestClientId";
protected static string TEST_CLIENT_ID = "DurableTestClientId";
protected static string CONSUMER_ID = "DurableTestConsumerId";
protected static string TEST_CLIENT_ID = "TestDurableConsumerClientId";
protected static string SEND_CLIENT_ID = "TestDurableProducerClientId";
protected static string DURABLE_TOPIC = "TestDurableConsumerTopic";
protected static string CONSUMER_ID = "TestDurableConsumerConsumerId";
protected static string DURABLE_SELECTOR = "2 > 1";

protected void SendPersistentMessage()
{
using(IConnection connection = CreateConnection(SEND_CLIENT_ID))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.DupsOkAcknowledge))
{
ITopic topic = SessionUtil.GetTopic(session, TOPIC);
using(IMessageProducer producer = session.CreateProducer(topic))
{
ITextMessage message = session.CreateTextMessage("Persistent Hello");

producer.DeliveryMode = MsgDeliveryMode.Persistent;
producer.RequestTimeout = receiveTimeout;
producer.Send(message);
}
}
}
}

[Test]
public void TestDurableConsumer()
#if !NET_1_1
[RowTest]
[Row(MsgDeliveryMode.Persistent)]
[Row(MsgDeliveryMode.NonPersistent)]
#endif
public void TestDurableConsumerSelectorChange(MsgDeliveryMode deliveryMode)
{
try
{
RegisterDurableConsumer(TEST_CLIENT_ID, TOPIC, CONSUMER_ID, DURABLE_SELECTOR, false);
SendPersistentMessage();

using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.DupsOkAcknowledge))
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
ITopic topic = SessionUtil.GetTopic(session, TOPIC);
using(IMessageConsumer consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, DURABLE_SELECTOR, false))
{
IMessage msg = consumer.Receive(receiveTimeout);
Assert.IsNotNull(msg, "Did not receive first durable message.");
msg.Acknowledge();

SendPersistentMessage();
msg = consumer.Receive(receiveTimeout);
Assert.IsNotNull(msg, "Did not receive second durable message.");
msg.Acknowledge();
}
ITopic topic = SessionUtil.GetTopic(session, DURABLE_TOPIC);
IMessageProducer producer = session.CreateProducer(topic);
IMessageConsumer consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, "color='red'", false);

producer.DeliveryMode = deliveryMode;

// Send the messages
ITextMessage sendMessage = session.CreateTextMessage("1st");
sendMessage.Properties["color"] = "red";
producer.Send(sendMessage);

ITextMessage receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
Assert.IsNotNull(receiveMsg, "Failed to retrieve 1st durable message.");
Assert.AreEqual("1st", receiveMsg.Text);
Assert.AreEqual(deliveryMode, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
receiveMsg.Acknowledge();

// Change the subscription.
consumer.Dispose();
consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, "color='blue'", false);

sendMessage = session.CreateTextMessage("2nd");
sendMessage.Properties["color"] = "red";
producer.Send(sendMessage);
sendMessage = session.CreateTextMessage("3rd");
sendMessage.Properties["color"] = "blue";
producer.Send(sendMessage);

// Selector should skip the 2nd message.
receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
Assert.IsNotNull(receiveMsg, "Failed to retrieve durable message.");
Assert.AreEqual("3rd", receiveMsg.Text, "Retrieved the wrong durable message.");
Assert.AreEqual(deliveryMode, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
receiveMsg.Acknowledge();

// Make sure there are no pending messages.
Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable subscription.");
}
}
}
catch(Exception ex)
{
Assert.Fail(ex.Message);
}
finally
{
UnregisterDurableConsumer(TEST_CLIENT_ID, CONSUMER_ID);
}
}

[Test]
public void TestDurableConsumerTransactional()
#if !NET_1_1
[RowTest]
[Row(MsgDeliveryMode.Persistent, AcknowledgementMode.AutoAcknowledge)]
[Row(MsgDeliveryMode.Persistent, AcknowledgementMode.ClientAcknowledge)]
[Row(MsgDeliveryMode.Persistent, AcknowledgementMode.DupsOkAcknowledge)]
[Row(MsgDeliveryMode.Persistent, AcknowledgementMode.Transactional)]

[Row(MsgDeliveryMode.NonPersistent, AcknowledgementMode.AutoAcknowledge)]
[Row(MsgDeliveryMode.NonPersistent, AcknowledgementMode.ClientAcknowledge)]
[Row(MsgDeliveryMode.NonPersistent, AcknowledgementMode.DupsOkAcknowledge)]
[Row(MsgDeliveryMode.NonPersistent, AcknowledgementMode.Transactional)]
#endif
public void TestDurableConsumer(MsgDeliveryMode deliveryMode, AcknowledgementMode ackMode)
{
try
{
RegisterDurableConsumer(TEST_CLIENT_ID, TOPIC, CONSUMER_ID, DURABLE_SELECTOR, false);
RunTestDurableConsumerTransactional();
RunTestDurableConsumerTransactional();
RegisterDurableConsumer(TEST_CLIENT_ID, DURABLE_TOPIC, CONSUMER_ID, DURABLE_SELECTOR, false);
RunTestDurableConsumer(deliveryMode, ackMode);
if(AcknowledgementMode.Transactional == ackMode)
{
RunTestDurableConsumer(deliveryMode, ackMode);
}
}
finally
{
UnregisterDurableConsumer(TEST_CLIENT_ID, CONSUMER_ID);
}
}

protected void RunTestDurableConsumerTransactional()
protected void RunTestDurableConsumer(MsgDeliveryMode deliveryMode, AcknowledgementMode ackMode)
{
SendPersistentMessage();
SendDurableMessage(deliveryMode);

using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
{
ITopic topic = SessionUtil.GetTopic(session, TOPIC);
ITopic topic = SessionUtil.GetTopic(session, DURABLE_TOPIC);
using(IMessageConsumer consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, DURABLE_SELECTOR, false))
{
IMessage msg = consumer.Receive(receiveTimeout);
Assert.IsNotNull(msg, "Did not receive first durable transactional message.");
Assert.IsNotNull(msg, "Did not receive first durable message.");
msg.Acknowledge();
SendPersistentMessage();
SendDurableMessage(deliveryMode);

msg = consumer.Receive(receiveTimeout);
Assert.IsNotNull(msg, "Did not receive second durable transactional message.");
Assert.IsNotNull(msg, "Did not receive second durable message.");
msg.Acknowledge();
session.Commit();

if(AcknowledgementMode.Transactional == ackMode)
{
session.Commit();
}
}
}
}
}

protected void SendDurableMessage(MsgDeliveryMode deliveryMode)
{
using(IConnection connection = CreateConnection(SEND_CLIENT_ID))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.DupsOkAcknowledge))
{
ITopic topic = SessionUtil.GetTopic(session, DURABLE_TOPIC);
using(IMessageProducer producer = session.CreateProducer(topic))
{
ITextMessage message = session.CreateTextMessage("Durable Hello");

producer.DeliveryMode = deliveryMode;
producer.RequestTimeout = receiveTimeout;
producer.Send(message);
}
}
}

0 comments on commit aa087fb

Please sign in to comment.