Skip to content

Commit

Permalink
HORNETQ-708 - Fixing consumeImmediate on paging as part of my current…
Browse files Browse the repository at this point in the history
… work on paging
  • Loading branch information
clebertsuconic committed May 31, 2011
1 parent b00f2e5 commit ff05bd2
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 8 deletions.
Expand Up @@ -87,7 +87,7 @@ public boolean runExample() throws Exception
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);


// Step 13. Send the message for about 1K, which should be over the memory limit imposed by the server // Step 13. Send the message for about 1K, which should be over the memory limit imposed by the server
for (int i = 0; i < 1000; i++) for (int i = 0; i < 10000; i++)
{ {
messageProducer.send(message); messageProducer.send(message);
} }
Expand All @@ -106,7 +106,7 @@ public boolean runExample() throws Exception
// paging // paging
// until messages are ACKed // until messages are ACKed


for (int i = 0; i < 1000; i++) for (int i = 0; i < 10000; i++)
{ {
message = (BytesMessage)messageConsumer.receive(3000); message = (BytesMessage)messageConsumer.receive(3000);


Expand Down
29 changes: 27 additions & 2 deletions src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
Expand Up @@ -17,7 +17,6 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;


import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientMessage;
Expand Down Expand Up @@ -48,6 +47,8 @@ public class ClientConsumerImpl implements ClientConsumerInternal
// ------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------


private static final Logger log = Logger.getLogger(ClientConsumerImpl.class); private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);

private static final boolean isTrace = log.isTraceEnabled();


private static final boolean trace = ClientConsumerImpl.log.isTraceEnabled(); private static final boolean trace = ClientConsumerImpl.log.isTraceEnabled();


Expand Down Expand Up @@ -223,6 +224,10 @@ private ClientMessage receive(long timeout, final boolean forcingDelivery) throw
// we only force delivery once per call to receive // we only force delivery once per call to receive
if (!deliveryForced) if (!deliveryForced)
{ {
if (isTrace)
{
log.trace("Forcing delivery");
}
session.forceDelivery(id, forceDeliveryCount++); session.forceDelivery(id, forceDeliveryCount++);


deliveryForced = true; deliveryForced = true;
Expand Down Expand Up @@ -258,15 +263,26 @@ private ClientMessage receive(long timeout, final boolean forcingDelivery) throw
{ {
long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE); long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);


if (forcingDelivery && seq == forceDeliveryCount - 1) // Need to check if forceDelivery was called at this call
// As we could be receiving a message that came from a previous call
if (forcingDelivery && deliveryForced && seq == forceDeliveryCount - 1)
{ {
// forced delivery messages are discarded, nothing has been delivered by the queue // forced delivery messages are discarded, nothing has been delivered by the queue
resetIfSlowConsumer(); resetIfSlowConsumer();

if (isTrace)
{
log.trace("There was nothing on the queue, leaving it now:: returning null");
}


return null; return null;
} }
else else
{ {
if (isTrace)
{
log.trace("Ignored force delivery answer as it belonged to another call");
}
// Ignore the message // Ignore the message
continue; continue;
} }
Expand Down Expand Up @@ -301,11 +317,20 @@ private ClientMessage receive(long timeout, final boolean forcingDelivery) throw
{ {
largeMessageReceived = m; largeMessageReceived = m;
} }

if (isTrace)
{
log.trace("Returning " + m);
}


return m; return m;
} }
else else
{ {
if (isTrace)
{
log.trace("Returning null");
}
resetIfSlowConsumer(); resetIfSlowConsumer();
return null; return null;
} }
Expand Down
3 changes: 3 additions & 0 deletions src/main/org/hornetq/core/server/Queue.java
Expand Up @@ -71,6 +71,9 @@ public interface Queue extends Bindable
void cancel(MessageReference reference, long timeBase) throws Exception; void cancel(MessageReference reference, long timeBase) throws Exception;


void deliverAsync(); void deliverAsync();

/** This method will make sure that any pending message (including paged message) will be delivered */
void forceDelivery();


long getMessageCount(); long getMessageCount();


Expand Down
24 changes: 24 additions & 0 deletions src/main/org/hornetq/core/server/impl/QueueImpl.java
Expand Up @@ -403,6 +403,25 @@ public void addTail(final MessageReference ref, final boolean direct)
executor.execute(concurrentPoller); executor.execute(concurrentPoller);
} }


public void forceDelivery()
{
if (pageSubscription != null && pageSubscription.isPaging())
{
if (isTrace)
{
log.trace("Force delivery scheduling depage");
}
scheduleDepage();
}

if (isTrace)
{
log.trace("Force delivery deliverying async");
}

deliverAsync();
}

public void deliverAsync() public void deliverAsync()
{ {
getExecutor().execute(deliverRunner); getExecutor().execute(deliverRunner);
Expand Down Expand Up @@ -1670,6 +1689,11 @@ private synchronized void depage()


if (isTrace) if (isTrace)
{ {
if (depaged == 0 && queueMemorySize.get() >= maxSize)
{
log.trace("Couldn't depage any message as the maxSize on the queue was achieved. There are too many pending messages to be acked in reference to the page configuration");
}

log.trace("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages"); log.trace("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
} }


Expand Down
Expand Up @@ -673,7 +673,7 @@ private void promptDelivery()
} }
else else
{ {
messageQueue.deliverAsync(); messageQueue.forceDelivery();
} }
} }
} }
Expand Down
156 changes: 153 additions & 3 deletions tests/src/org/hornetq/tests/integration/client/PagingTest.java
Expand Up @@ -354,6 +354,149 @@ public void testPreparePersistent() throws Exception


} }


public void testReceiveImmediate() throws Exception
{
clearData();

Configuration config = createDefaultConfig();

config.setJournalSyncNonTransactional(false);

HornetQServer server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());

server.start();

final int messageSize = 1024;

final int numberOfMessages = 1000;

try
{
ServerLocator locator = createInVMNonHALocator();

locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);

ClientSessionFactory sf = locator.createSessionFactory();

ClientSession session = sf.createSession(false, false, false);

session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);

ClientProducer producer = session.createProducer(PagingTest.ADDRESS);

ClientMessage message = null;

byte[] body = new byte[messageSize];

ByteBuffer bb = ByteBuffer.wrap(body);

for (int j = 1; j <= messageSize; j++)
{
bb.put(getSamplebyte(j));
}

for (int i = 0; i < numberOfMessages; i++)
{
message = session.createMessage(true);

HornetQBuffer bodyLocal = message.getBodyBuffer();

bodyLocal.writeBytes(body);

message.putIntProperty(new SimpleString("id"), i);

producer.send(message);
if (i % 1000 == 0)
{
session.commit();
}
}
session.commit();
session.close();

session = null;

sf.close();
locator.close();

server.stop();

server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();

locator = createInVMNonHALocator();
sf = locator.createSessionFactory();

Queue queue = server.locateQueue(ADDRESS);

assertEquals(numberOfMessages, queue.getMessageCount());

LinkedList<Xid> xids = new LinkedList<Xid>();

int msgReceived = 0;
ClientSession sessionConsumer = sf.createSession(false, false, false);
sessionConsumer.start();
ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
{
log.info("Received " + msgCount);
msgReceived++;
ClientMessage msg = consumer.receiveImmediate();
if (msg == null)
{
log.info("It's null. leaving now");
sessionConsumer.commit();
fail("Didn't receive a message");
}
msg.acknowledge();

if (msgCount % 5 == 0)
{
log.info("commit");
sessionConsumer.commit();
}
}

sessionConsumer.commit();

sessionConsumer.close();

sf.close();

locator.close();

assertEquals(0, queue.getMessageCount());

long timeout = System.currentTimeMillis() + 5000;
while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
{
Thread.sleep(100);
}
assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
}
finally
{
try
{
server.stop();
}
catch (Throwable ignored)
{
}
}

}

public void testMissingTXEverythingAcked() throws Exception public void testMissingTXEverythingAcked() throws Exception
{ {
clearData(); clearData();
Expand Down Expand Up @@ -3642,10 +3785,17 @@ public void write(int b) throws IOException
} }
}); });


if (!message.waitOutputStreamCompletion(5000)) try
{
if (!message.waitOutputStreamCompletion(5000))
{
log.info(threadDump("dump"));
fail("Couldn't finish large message receiving");
}
}
catch (Throwable e)
{ {
log.info(threadDump("dump")); fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + " with messageID=" + message.getMessageID());
fail("Couldn't finish large message sending");
} }


} }
Expand Down
Expand Up @@ -656,4 +656,13 @@ public int moveReferences(Filter filter, SimpleString toAddress, boolean rejectD
return 0; return 0;
} }


/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#forceDelivery()
*/
public void forceDelivery()
{
// TODO Auto-generated method stub

}

} }
Expand Up @@ -650,6 +650,15 @@ public SimpleString getAddress()
return null; return null;
} }


/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#forceDelivery()
*/
public void forceDelivery()
{
// TODO Auto-generated method stub

}

} }


} }

0 comments on commit ff05bd2

Please sign in to comment.