Skip to content

Commit

Permalink
https://jira.jboss.org/browse/HORNETQ-410
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Jun 8, 2010
1 parent 71c1fa3 commit 07e3ac6
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 37 deletions.
7 changes: 5 additions & 2 deletions src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Expand Up @@ -453,13 +453,16 @@ public void setTransferring(final boolean transferring)
}

public void receiveCredits(final int credits) throws Exception
{
{
if (credits == -1)
{
// No flow control
availableCredits = null;

//There may be messages already in the queue
promptDelivery();
}
else if(credits == 0)
else if (credits == 0)
{
//reset, used on slow consumers
availableCredits.set(0);
Expand Down
73 changes: 38 additions & 35 deletions tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
Expand Up @@ -66,41 +66,7 @@ protected void tearDown() throws Exception
super.tearDown();
}

// public void testQueueSpin() throws Exception
// {
// ClientSessionFactory sf = createInVMFactory();
//
// ClientSession session1 = sf.createSession();
//
// ClientSession session2 = sf.createSession();
//
// session1.createQueue(QUEUE, QUEUE, null, false);
//
// ClientProducer producer = session1.createProducer(QUEUE);
//
// final int numMessages = 100;
//
// for (int i = 0; i < numMessages; i++)
// {
// ClientMessage message = createTextMessage("m" + i, session1);
// producer.send(message);
// }
//
// ClientConsumer consumer1 = session1.createConsumer(QUEUE);
//
// ClientConsumer consumer2 = session2.createConsumer(QUEUE, new SimpleString("foo=wibble"));
//
// session2.start();
//
// consumer2.receive();
//
// Thread.sleep(30000);
//
// session1.close();
//
// session2.close();
// }


public void testConsumerAckImmediateAutoCommitTrue() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
Expand Down Expand Up @@ -301,6 +267,43 @@ public void onMessage(final ClientMessage message)
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
sessionRec.close();
}

// https://jira.jboss.org/browse/HORNETQ-410
public void testConsumeWithNoConsumerFlowControl() throws Exception
{
ClientSessionFactory sf = createInVMFactory();

sf.setConsumerWindowSize(-1);

ClientSession session = sf.createSession(false, true, true);

session.createQueue(QUEUE, QUEUE, null, false);

session.start();

ClientProducer producer = session.createProducer(QUEUE);

final int numMessages = 100;

for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}

ClientConsumer consumer = session.createConsumer(QUEUE);

for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer.receive(10000);
assertNotNull(message);
message.acknowledge();
}

session.close();
sf.close();

}

public void testClearListener() throws Exception
{
Expand Down

0 comments on commit 07e3ac6

Please sign in to comment.