Skip to content

Commit

Permalink
Add new CreateConnectionAndStart() helper function to streamline test…
Browse files Browse the repository at this point in the history
… creation and syntax.

Update AsyncConsumeTest.cs with new API as example code.
  • Loading branch information
Jim Gomes committed Oct 8, 2014
1 parent da1b979 commit 664beca
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 104 deletions.
190 changes: 86 additions & 104 deletions src/test/csharp/AsyncConsumeTest.cs
Expand Up @@ -16,7 +16,6 @@
*/

using System.Threading;
using Apache.NMS.Util;
using NUnit.Framework;

namespace Apache.NMS.Test
Expand Down Expand Up @@ -52,25 +51,22 @@ public override void TearDown()
[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
MsgDeliveryMode deliveryMode)
{
using(IConnection connection = CreateConnection(GetTestClientId()))
using(IConnection connection = CreateConnectionAndStart(GetTestClientId()))
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(IDestination destination = CreateDestination(session, DestinationType.Queue))
using(IMessageConsumer consumer = session.CreateConsumer(destination))
using(IMessageProducer producer = session.CreateProducer(destination))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(IDestination destination = CreateDestination(session, DestinationType.Queue))
using(IMessageConsumer consumer = session.CreateConsumer(destination))
using(IMessageProducer producer = session.CreateProducer(destination))
{
producer.DeliveryMode = deliveryMode;
consumer.Listener += new MessageListener(OnMessage);
producer.DeliveryMode = deliveryMode;
consumer.Listener += new MessageListener(OnMessage);

IMessage request = session.CreateMessage();
request.NMSCorrelationID = "AsyncConsume";
request.NMSType = "Test";
producer.Send(request);
IMessage request = session.CreateMessage();
request.NMSCorrelationID = "AsyncConsume";
request.NMSType = "Test";
producer.Send(request);

WaitForMessageToArrive();
Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
}
WaitForMessageToArrive();
Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
}
}

Expand All @@ -79,26 +75,26 @@ public override void TearDown()
[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
MsgDeliveryMode deliveryMode)
{
using(IConnection connection = CreateConnection(GetTestClientId()))
using(IConnection connection = CreateConnectionAndStart(GetTestClientId()))
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(IDestination destination = CreateDestination(session, DestinationType.Queue))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(IDestination destination = CreateDestination(session, DestinationType.Queue))
string correlationId = "AsyncConsumeAfterSend";

using(IMessageProducer producer = session.CreateProducer(destination))
{
producer.DeliveryMode = deliveryMode;

IMessage request = session.CreateMessage();
request.NMSCorrelationID = "AsyncConsumeAfterSend";
request.NMSCorrelationID = correlationId;
request.NMSType = "Test";
producer.Send(request);
}

using(IMessageConsumer consumer = session.CreateConsumer(destination))
{
consumer.Listener += new MessageListener(OnMessage);
WaitForMessageToArrive();
Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
}
using(IMessageConsumer consumer = session.CreateConsumer(destination))
{
consumer.Listener += new MessageListener(OnMessage);
WaitForMessageToArrive();
Assert.AreEqual(correlationId, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
}
}
}
Expand All @@ -108,26 +104,23 @@ public override void TearDown()
[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
MsgDeliveryMode deliveryMode)
{
using(IConnection connection = CreateConnection(GetTestClientId()))
using(IConnection connection = CreateConnectionAndStart(GetTestClientId()))
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(IDestination destination = CreateDestination(session, DestinationType.Queue))
using(IMessageConsumer consumer = session.CreateConsumer(destination))
using(IMessageProducer producer = session.CreateProducer(destination))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(IDestination destination = CreateDestination(session, DestinationType.Queue))
using(IMessageConsumer consumer = session.CreateConsumer(destination))
using(IMessageProducer producer = session.CreateProducer(destination))
{
producer.DeliveryMode = deliveryMode;
producer.DeliveryMode = deliveryMode;

IMessage request = session.CreateMessage();
request.NMSCorrelationID = "AsyncConsumeAfterSendLateListener";
request.NMSType = "Test";
producer.Send(request);
IMessage request = session.CreateMessage();
request.NMSCorrelationID = "AsyncConsumeAfterSendLateListener";
request.NMSType = "Test";
producer.Send(request);

// now lets add the listener
consumer.Listener += new MessageListener(OnMessage);
WaitForMessageToArrive();
Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
}
// now lets add the listener
consumer.Listener += new MessageListener(OnMessage);
WaitForMessageToArrive();
Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
}
}

Expand All @@ -136,32 +129,27 @@ public override void TearDown()
[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
MsgDeliveryMode deliveryMode)
{
using(IConnection connection = CreateConnection(GetTestClientId()))
using(IConnection connection = CreateConnectionAndStart(GetTestClientId()))
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(IDestination destination = CreateDestination(session, DestinationType.Queue))
using(IMessageConsumer consumer = session.CreateConsumer(destination))
using(IMessageProducer producer = session.CreateProducer(destination))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(IDestination destination = CreateDestination(session, DestinationType.Queue))
using(IMessageConsumer consumer = session.CreateConsumer(destination))
{
consumer.Listener += new MessageListener(OnMessage);
using(IMessageProducer producer = session.CreateProducer(destination))
{
producer.DeliveryMode = deliveryMode;

ITextMessage request = session.CreateTextMessage("Hello, World!");
request.NMSCorrelationID = "AsyncConsumeTextMessage";
request.Properties["NMSXGroupID"] = "cheese";
request.Properties["myHeader"] = "James";

producer.Send(request);

WaitForMessageToArrive();
Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
Assert.AreEqual(request.Properties["NMSXGroupID"], receivedMsg.Properties["NMSXGroupID"], "Invalid NMSXGroupID.");
Assert.AreEqual(request.Properties["myHeader"], receivedMsg.Properties["myHeader"], "Invalid myHeader.");
Assert.AreEqual(request.Text, ((ITextMessage) receivedMsg).Text, "Invalid text body.");
}
}
consumer.Listener += new MessageListener(OnMessage);
producer.DeliveryMode = deliveryMode;

ITextMessage request = session.CreateTextMessage("Hello, World!");
request.NMSCorrelationID = "AsyncConsumeTextMessage";
request.Properties["NMSXGroupID"] = "cheese";
request.Properties["myHeader"] = "James";

producer.Send(request);

WaitForMessageToArrive();
Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
Assert.AreEqual(request.Properties["NMSXGroupID"], receivedMsg.Properties["NMSXGroupID"], "Invalid NMSXGroupID.");
Assert.AreEqual(request.Properties["myHeader"], receivedMsg.Properties["myHeader"], "Invalid myHeader.");
Assert.AreEqual(request.Text, ((ITextMessage) receivedMsg).Text, "Invalid text body.");
}
}

Expand All @@ -170,48 +158,42 @@ public override void TearDown()
[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
MsgDeliveryMode deliveryMode)
{
using(IConnection connection = CreateConnection(GetTestClientId()))
using(IConnection connection = CreateConnectionAndStart(GetTestClientId()))
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(ITemporaryQueue tempReplyDestination = session.CreateTemporaryQueue())
using(IDestination destination = CreateDestination(session, DestinationType.Queue))
using(IMessageConsumer consumer = session.CreateConsumer(destination))
using(IMessageConsumer tempConsumer = session.CreateConsumer(tempReplyDestination))
using(IMessageProducer producer = session.CreateProducer(destination))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(ITemporaryQueue tempReplyDestination = session.CreateTemporaryQueue())
using(IDestination destination = CreateDestination(session, DestinationType.Queue))
using(IMessageConsumer consumer = session.CreateConsumer(destination))
using(IMessageConsumer tempConsumer = session.CreateConsumer(tempReplyDestination))
using(IMessageProducer producer = session.CreateProducer(destination))
{
producer.DeliveryMode = deliveryMode;
tempConsumer.Listener += new MessageListener(OnMessage);
consumer.Listener += new MessageListener(OnQueueMessage);

IMessage request = session.CreateMessage();
request.NMSCorrelationID = "TemqQueueAsyncConsume";
request.NMSType = "Test";
request.NMSReplyTo = tempReplyDestination;
producer.Send(request);

WaitForMessageToArrive();
Assert.AreEqual("TempQueueAsyncResponse", receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
}
producer.DeliveryMode = deliveryMode;
tempConsumer.Listener += new MessageListener(OnMessage);
consumer.Listener += new MessageListener(OnQueueMessage);

IMessage request = session.CreateMessage();
request.NMSCorrelationID = "TemqQueueAsyncConsume";
request.NMSType = "Test";
request.NMSReplyTo = tempReplyDestination;
producer.Send(request);

WaitForMessageToArrive();
Assert.AreEqual("TempQueueAsyncResponse", receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
}
}

protected void OnQueueMessage(IMessage message)
{
Assert.AreEqual("TemqQueueAsyncConsume", message.NMSCorrelationID, "Invalid correlation ID.");
using(IConnection connection = CreateConnection(RESPONSE_CLIENT_ID))
using(IConnection connection = CreateConnectionAndStart(RESPONSE_CLIENT_ID))
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(IMessageProducer producer = session.CreateProducer(message.NMSReplyTo))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(IMessageProducer producer = session.CreateProducer(message.NMSReplyTo))
{
producer.DeliveryMode = message.NMSDeliveryMode;
producer.DeliveryMode = message.NMSDeliveryMode;

ITextMessage response = session.CreateTextMessage("Asynchronous Response Message Text");
response.NMSCorrelationID = "TempQueueAsyncResponse";
response.NMSType = message.NMSType;
producer.Send(response);
}
ITextMessage response = session.CreateTextMessage("Asynchronous Response Message Text");
response.NMSCorrelationID = "TempQueueAsyncResponse";
response.NMSType = message.NMSType;
producer.Send(response);
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/test/csharp/NMSTestSupport.cs
Expand Up @@ -389,6 +389,18 @@ public virtual IConnection CreateConnection(string newClientId)
return newConnection;
}

/// <summary>
/// Create a new connection to the broker, and start it.
/// </summary>
/// <param name="newClientId">Client ID of the new connection.</param>
/// <returns></returns>
public virtual IConnection CreateConnectionAndStart(string newClientId = null)
{
IConnection newConnection = CreateConnection(newClientId);
newConnection.Start();
return newConnection;
}

public IDestination CreateDestination(ISession session, DestinationType type)
{
return CreateDestination(session, type, "");
Expand Down

0 comments on commit 664beca

Please sign in to comment.