Skip to content

Commit

Permalink
HORNETQ-581 - implement FAIL address-full-policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Justin Bertram authored and clebertsuconic committed Aug 10, 2012
1 parent 79350ab commit 17f2ba0
Show file tree
Hide file tree
Showing 31 changed files with 3,032 additions and 2,620 deletions.
14 changes: 12 additions & 2 deletions docs/user-manual/en/paging.xml
Expand Up @@ -136,8 +136,10 @@
<entry>This must be set to PAGE for paging to enable. If the value
is PAGE then further messages will be paged to disk. If the
value is DROP then further messages will be silently dropped. If
the value is BLOCK then client message producers will block when
they try and send further messages.</entry>
the value is FAIL then the messages will be dropped and the client
message producers will receive an exception. If the value is
BLOCK then client message producers will block when they try and
send further messages.</entry>
<entry>PAGE</entry>
</row>
<row>
Expand All @@ -160,6 +162,14 @@
<para>To do this just set the <literal>address-full-policy</literal> to <literal
>DROP</literal> in the address settings</para>
</section>
<section>
<title>Dropping messages and throwing an exception to producers</title>
<para>Instead of paging messages when the max size is reached, an address can also be
configured to drop messages and also throw an exception on the client-side
when the address is full.</para>
<para>To do this just set the <literal>address-full-policy</literal> to <literal
>FAIL</literal> in the address settings</para>
</section>
<section>
<title>Blocking producers</title>
<para>Instead of paging messages when the max size is reached, an address can also be
Expand Down
5 changes: 3 additions & 2 deletions docs/user-manual/en/queue-attributes.xml
Expand Up @@ -137,9 +137,10 @@
for example, there might be no queues bound to that address, or none of the queues have filters that match, then normally that message
would be discarded. However if this parameter is set to true for that address, if the message is not routed to any queues it will instead
be sent to the dead letter address (DLA) for that address, if it exists.</para>
<para><literal>address-full-policy</literal>. This attribute can have one of the following values: PAGE, DROP or BLOCK and determines what happens when
<para><literal>address-full-policy</literal>. This attribute can have one of the following values: PAGE, DROP, FAIL or BLOCK and determines what happens when
an address where <literal>max-size-bytes</literal> is specified becomes full. The default value is PAGE. If the value is PAGE then further messages will be paged to disk.
If the value is DROP then further messages will be silently dropped.
If the value is DROP then further messages will be silently dropped.
If the value is FAIL then further messages will be dropped and an exception will be thrown on the client-side.
If the value is BLOCK then client message producers will block when they try and send further messages.

See the following chapters for more info <xref linkend="flow-control"/>, <xref linkend="paging"/>.
Expand Down
@@ -0,0 +1,44 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2010, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.hornetq.api.core;

import static org.hornetq.api.core.HornetQExceptionType.ADDRESS_FULL;

/**
* @author Justin Bertram
*
* An address is full.
*/
public class HornetQAddressFullException extends HornetQException
{
private static final long serialVersionUID = 0;

public HornetQAddressFullException(String message)
{
super(ADDRESS_FULL, message);
}

public HornetQAddressFullException()
{
super(ADDRESS_FULL);
}
}
Expand Up @@ -266,6 +266,14 @@ HornetQException createException(String msg)
{
return null;
}
},
ADDRESS_FULL(210)
{
@Override
HornetQException createException(String msg)
{
return null;
}
};

private final static Map<Integer, HornetQExceptionType> TYPE_MAP;
Expand Down
Expand Up @@ -30,6 +30,8 @@ public interface ClientProducerCreditManager

void receiveCredits(SimpleString address, int credits);

void receiveFailCredits(SimpleString address, int credits);

void reset();

void close();
Expand Down
Expand Up @@ -114,6 +114,16 @@ public synchronized void receiveCredits(final SimpleString address, final int cr
}
}

public synchronized void receiveFailCredits(final SimpleString address, int credits)
{
ClientProducerCredits cr = producerCredits.get(address);

if (cr != null)
{
cr.receiveFailCredits(credits);
}
}

public synchronized void reset()
{
for (ClientProducerCredits credits : producerCredits.values())
Expand Down Expand Up @@ -186,6 +196,10 @@ public void receiveCredits(int credits)
{
}

public void receiveFailCredits(int credits)
{
}

public boolean isBlocked()
{
return false;
Expand Down
Expand Up @@ -13,6 +13,8 @@

package org.hornetq.core.client.impl;

import org.hornetq.api.core.HornetQException;

/**
* A ClientProducerCredits
*
Expand All @@ -22,10 +24,12 @@
*/
public interface ClientProducerCredits
{
void acquireCredits(int credits) throws InterruptedException;
void acquireCredits(int credits) throws InterruptedException, HornetQException;

void receiveCredits(int credits);


void receiveFailCredits(int credits);

boolean isBlocked();

void init();
Expand Down
Expand Up @@ -13,16 +13,16 @@

package org.hornetq.core.client.impl;

import java.util.concurrent.Semaphore;

import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.HornetQMessageBundle;

import java.util.concurrent.Semaphore;

/**
* A ClientProducerCreditsImpl
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*
*/
public class ClientProducerCreditsImpl implements ClientProducerCredits
{
Expand All @@ -31,17 +31,21 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits
private final int windowSize;

private volatile boolean closed;

private boolean blocked;

private final SimpleString address;

private final ClientSessionInternal session;

private int pendingCredits;

private int arriving;

private int refCount;

private boolean serverRespondedWithFail;

public ClientProducerCreditsImpl(final ClientSessionInternal session,
final SimpleString address,
final int windowSize)
Expand All @@ -56,19 +60,27 @@ public ClientProducerCreditsImpl(final ClientSessionInternal session,

semaphore = new Semaphore(0, false);
}

public void init()
{
// We initial request twice as many credits as we request in subsequent requests
// This allows the producer to keep sending as more arrive, minimising pauses
checkCredits(windowSize);
}

public void acquireCredits(final int credits) throws InterruptedException
public void acquireCredits(final int credits) throws InterruptedException, HornetQException
{
checkCredits(credits);

if (!semaphore.tryAcquire(credits))

boolean tryAcquire;

synchronized (this)
{
tryAcquire = semaphore.tryAcquire(credits);
}

if (!tryAcquire)
{
if (!closed)
{
Expand All @@ -83,6 +95,28 @@ public void acquireCredits(final int credits) throws InterruptedException
}
}
}


synchronized (this)
{
pendingCredits -= credits;
}

// check to see if the blocking mode is FAIL on the server
synchronized (this)
{
if (serverRespondedWithFail)
{
serverRespondedWithFail = false;

// remove existing credits to force the client to ask the server for more on the next send
semaphore.drainPermits();
pendingCredits = 0;
arriving = 0;

throw HornetQMessageBundle.BUNDLE.addressIsFull(address.toString(), credits);
}
}
}

public boolean isBlocked()
Expand All @@ -105,14 +139,22 @@ public void receiveCredits(final int credits)
semaphore.release(credits);
}

public void receiveFailCredits(final int credits)
{
serverRespondedWithFail = true;
// receive credits like normal to keep the sender from blocking
receiveCredits(credits);
}

public synchronized void reset()
{
// Any arriving credits from before failover won't arrive, so we re-initialise
// Any pendingCredits credits from before failover won't arrive, so we re-initialise

semaphore.drainPermits();

int beforeFailure = arriving;
int beforeFailure = pendingCredits;

pendingCredits = 0;
arriving = 0;

// If we are waiting for more credits than what's configured, then we need to use what we tried before
Expand Down Expand Up @@ -155,6 +197,7 @@ private void checkCredits(final int credits)
{
toRequest = needed - arriving;

pendingCredits += toRequest;
arriving += toRequest;
}
}
Expand All @@ -169,5 +212,4 @@ private void requestCredits(final int credits)
{
session.sendProducerCreditsMessage(credits, address);
}

}
}
Expand Up @@ -271,6 +271,23 @@ private void doSend(final SimpleString address, final Message msg) throws Hornet

session.workDone();

try
{
// This will block if credits are not available

// Note, that for a large message, the encode size only includes the properties + headers
// Not the continuations, but this is ok since we are only interested in limiting the amount of
// data in *memory* and continuations go straight to the disk

if (!isLarge)
{
theCredits.acquireCredits(msgI.getEncodeSize());
}
}
catch (InterruptedException e)
{
}

if (isLarge)
{
largeMessageSend(sendBlocking, msgI, theCredits);
Expand All @@ -288,23 +305,6 @@ private void doSend(final SimpleString address, final Message msg) throws Hornet
channel.sendBatched(packet);
}
}

try
{
// This will block if credits are not available

// Note, that for a large message, the encode size only includes the properties + headers
// Not the continuations, but this is ok since we are only interested in limiting the amount of
// data in *memory* and continuations go straight to the disk

if (!isLarge)
{
theCredits.acquireCredits(msgI.getEncodeSize());
}
}
catch (InterruptedException e)
{
}
}

private void checkClosed() throws HornetQException
Expand Down
Expand Up @@ -1262,6 +1262,11 @@ public void handleReceiveProducerCredits(final SimpleString address, final int c
producerCreditManager.receiveCredits(address, credits);
}

public void handleReceiveProducerFailCredits(final SimpleString address, int credits)
{
producerCreditManager.receiveFailCredits(address, credits);
}

public ClientProducerCreditManager getProducerCreditManager()
{
return producerCreditManager;
Expand Down
Expand Up @@ -87,6 +87,8 @@ public interface ClientSessionInternal extends ClientSession

void handleReceiveProducerCredits(SimpleString address, int credits);

void handleReceiveProducerFailCredits(SimpleString address, int credits);

ClientProducerCreditManager getProducerCreditManager();

void setAddress(Message message, SimpleString address);
Expand Down

0 comments on commit 17f2ba0

Please sign in to comment.