Skip to content

Commit

Permalink
https://issues.jboss.org/browse/HORNETQ-1190 forced delivery doesn't …
Browse files Browse the repository at this point in the history
…work well with slow Consumers
  • Loading branch information
clebertsuconic committed May 6, 2013
1 parent c9acbf1 commit 95b10e8
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 1 deletion.
Expand Up @@ -91,6 +91,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
// Which is a OrderedExecutor
private final Executor flowControlExecutor;

// Number of pending calls on flow control
private final ReusableLatch pendingFlowControl = new ReusableLatch(0);

private final int clientWindowSize;

private final int ackBatchSize;
Expand Down Expand Up @@ -920,6 +923,18 @@ private void startSlowConsumer()
HornetQClientLogger.LOGGER.trace("Sending 1 credit to start delivering of one message to slow consumer");
}
sendCredits(1);
try
{
// We use an executor here to guarantee the messages will arrive in order.
// However when starting a slow consumer, we have to guarantee the credit was sent before we can perform any
// operations like forceDelivery
pendingFlowControl.await(10, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{
// will just ignore and forward the ignored
Thread.currentThread().interrupt();
}
}

private void resetIfSlowConsumer()
Expand Down Expand Up @@ -972,11 +987,19 @@ private void queueExecutor()
*/
private void sendCredits(final int credits)
{
pendingFlowControl.countUp();
flowControlExecutor.execute(new Runnable()
{
public void run()
{
channel.send(new SessionConsumerFlowCreditMessage(id, credits));
try
{
channel.send(new SessionConsumerFlowCreditMessage(id, credits));
}
finally
{
pendingFlowControl.countDown();
}
}
});
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import junit.framework.Assert;

Expand Down Expand Up @@ -301,6 +302,156 @@ public void testReceiveImmediateWithZeroWindow4() throws Exception
senderSession.close();
}

public void testMultipleImmediate() throws Exception
{

final int NUMBER_OF_MESSAGES = 200;
HornetQServer server = createServer(false, isNetty());

server.start();

locator.setConsumerWindowSize(0);

final ClientSessionFactory sf = createSessionFactory(locator);

{
ClientSession session = sf.createSession(false, false, false);
session.createQueue("testWindow", "testWindow", true);
session.close();
}

Thread threads[] = new Thread[10];
final AtomicInteger errors = new AtomicInteger(0);
final CountDownLatch latchStart = new CountDownLatch(1);
final AtomicInteger received = new AtomicInteger(0);

for (int i = 0 ; i < threads.length; i++)
{
threads[i] = new Thread()
{
public void run()
{
try
{
ClientSession session = sf.createSession(false, false);
ClientConsumer consumer = session.createConsumer("testWindow");
session.start();
latchStart.await(10, TimeUnit.SECONDS);

while (true)
{

ClientMessage msg = consumer.receiveImmediate();
if (msg == null)
{
break;
}
msg.acknowledge();

session.commit();

received.incrementAndGet();

}

}
catch (Throwable e)
{
e.printStackTrace();
errors.incrementAndGet();
}
}
};


threads[i].start();
}


ClientSession senderSession = sf.createSession(false, false);

ClientProducer producer = senderSession.createProducer("testWindow");

ClientMessage sent = senderSession.createMessage(true);
sent.putStringProperty("hello", "world");
for (int i = 0 ; i < NUMBER_OF_MESSAGES; i++)
{
producer.send(sent);
senderSession.commit();
}

latchStart.countDown();

for (Thread t : threads)
{
t.join();
}

assertEquals(0, errors.get());

assertEquals(NUMBER_OF_MESSAGES, received.get());
}

public void testSingleImmediate() throws Exception
{

final int NUMBER_OF_MESSAGES = 200;
HornetQServer server = createServer(false, isNetty());

server.start();

locator.setConsumerWindowSize(0);

final ClientSessionFactory sf = createSessionFactory(locator);

{
ClientSession session = sf.createSession(false, false, false);
session.createQueue("testWindow", "testWindow", true);
session.close();
}

final AtomicInteger received = new AtomicInteger(0);



ClientSession senderSession = sf.createSession(false, false);

ClientProducer producer = senderSession.createProducer("testWindow");

ClientMessage sent = senderSession.createMessage(true);
sent.putStringProperty("hello", "world");
for (int i = 0 ; i < NUMBER_OF_MESSAGES; i++)
{
producer.send(sent);
}

senderSession.commit();

ClientSession session = sf.createSession(false, false);
ClientConsumer consumer = session.createConsumer("testWindow");
session.start();

while (true)
{

ClientMessage msg = consumer.receiveImmediate();
if (msg == null)
{
System.out.println("Returning null");
break;
}
msg.acknowledge();

session.commit();

received.incrementAndGet();

}

assertEquals(NUMBER_OF_MESSAGES, received.get());
}


/*
* tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
* know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
Expand Down

0 comments on commit 95b10e8

Please sign in to comment.