Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Dec 21, 2011
1 parent ca75b96 commit d4b68c9
Showing 1 changed file with 26 additions and 19 deletions.
Expand Up @@ -13,6 +13,8 @@

package org.hornetq.tests.integration.client;

import java.util.concurrent.CountDownLatch;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
Expand Down Expand Up @@ -44,7 +46,7 @@ public class TestFlowControlOnIgnoreLargeMessageBodyTest extends JMSTestBase

private Topic topic;

private static int TOTAL_MESSAGES_COUNT = 50000;
private static int TOTAL_MESSAGES_COUNT = 20000;

private static int MSG_SIZE = 150 * 1024;

Expand Down Expand Up @@ -119,7 +121,7 @@ public void run()
connection = cf.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
prod = session.createProducer(topic);

prod.setDeliveryMode(DeliveryMode.PERSISTENT);

for (int i = 1; i <= messagesCount && !requestForStop; i++)
Expand Down Expand Up @@ -195,8 +197,11 @@ class LoadConsumer extends Thread
private final int numberOfMessages;

private int receiveTimeout = 0;

private final CountDownLatch consumerCreated;

LoadConsumer(final String name,
LoadConsumer(final CountDownLatch consumerCreated,
final String name,
final Topic topic,
final ConnectionFactory cf,
final int receiveTimeout,
Expand All @@ -207,6 +212,7 @@ class LoadConsumer extends Thread
this.topic = topic;
this.receiveTimeout = receiveTimeout;
this.numberOfMessages = numberOfMessages;
this.consumerCreated = consumerCreated;
}

public void sendStopRequest()
Expand All @@ -231,12 +237,19 @@ public void run()
try
{
connection = cf.createConnection();

connection.setClientID(getName());

connection.start();

session = connection.createSession(true, Session.SESSION_TRANSACTED);

TopicSubscriber subscriber = session.createDurableSubscriber(topic, getName());

consumerCreated.countDown();

int counter = 0;
int invalidOrderCounter = 0;

while (counter < numberOfMessages && !requestForStop && !error)
{
if (counter == 0)
Expand All @@ -252,21 +265,9 @@ public void run()
else
{
counter++;
// msg.readBytes(new byte[MSG_SIZE]);
if (msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) != counter)
{
if (invalidOrderCounter < 10)
{
error = true;
System.out.println("Invalid messages order! expected: " + counter +
", received " +
msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) +
" " +
topic +
" - " +
getName());
invalidOrderCounter++;
}
error = true;
}
}
if (counter % 10 == 0)
Expand Down Expand Up @@ -336,10 +337,13 @@ public void testFlowControl()
TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);

LoadConsumer consumers[] = new LoadConsumer[CONSUMERS_COUNT];

CountDownLatch latch = new CountDownLatch(CONSUMERS_COUNT);

for (int i = 0; i < consumers.length; i++)
{
consumers[i] = new LoadConsumer("consumer " + i,
consumers[i] = new LoadConsumer(latch,
"consumer " + i,
topic,
cf,
receiveTimeout,
Expand All @@ -350,6 +354,9 @@ public void testFlowControl()
{
consumer.start();
}

latch.await();

producer.start();
producer.join();
for (LoadConsumer consumer : consumers)
Expand Down Expand Up @@ -383,7 +390,7 @@ public void testFlowControl()
{
System.out.println(" OK ");
}

assertFalse(error);
}
catch (Exception e)
Expand Down

0 comments on commit d4b68c9

Please sign in to comment.