Skip to content

Commit

Permalink
ARTEMIS-1898 Proper fix on AtomicRunnables and avoiding leaks
Browse files Browse the repository at this point in the history
GlobalDiskFullTest was broken before this fix.
Basically when using multiple addresses over a session you would miss flow credits on all your producers except to the first one
that ran out of credit.

(cherry picked from commit fceb9ea)
  • Loading branch information
clebertsuconic committed Oct 11, 2018
1 parent 5e415a8 commit dddff16
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 67 deletions.
Expand Up @@ -22,9 +22,9 @@
public abstract class AtomicRunnable implements Runnable {


public static Runnable checkAtomic(Runnable run) {
public static AtomicRunnable checkAtomic(Runnable run) {
if (run instanceof AtomicRunnable) {
return run;
return (AtomicRunnable)run;
} else {
return new AtomicRunnableWithDelegate(run);
}
Expand All @@ -35,6 +35,20 @@ public static Runnable checkAtomic(Runnable run) {
private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE =
AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran");

public AtomicRunnable reset() {
RAN_UPDATE.set(this, 0);
return this;
}

public AtomicRunnable setRan() {
RAN_UPDATE.set(this, 1);
return this;
}

public boolean isRun() {
return RAN_UPDATE.get(this) == 1;
}

@Override
public void run() {
if (RAN_UPDATE.compareAndSet(this, 0, 1)) {
Expand Down
Expand Up @@ -109,8 +109,6 @@ public class AMQPSessionCallback implements SessionCallback {

private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();

private CreditRunnable creditRunnable;

public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
Expand Down Expand Up @@ -577,49 +575,17 @@ public void onError(int errorCode, String errorMessage) {
});
}

public void offerProducerCredit(final SimpleString address,
final int credits,
final int threshold,
final Receiver receiver) {
/** Will execute a Runnable on an Address when there's space in memory*/
public void flow(final SimpleString address,
Runnable runnable) {
try {
/*
* The credit runnable will always be run in this thread unless the address or disc is full. If this is the case the
* runnable is run once the memory or disc is free, if this happens we don't want to keep adding runnables as this
* may cause a memory leak, one is enough.
* */
if (creditRunnable != null && !creditRunnable.isRun())
return;
PagingManager pagingManager = manager.getServer().getPagingManager();
creditRunnable = new CreditRunnable() {
boolean isRun = false;
@Override
public boolean isRun() {
return isRun;
}

@Override
public void run() {
connection.lock();
try {
if (receiver.getCredit() <= threshold) {
int topUp = credits - receiver.getCredit();
if (topUp > 0) {
receiver.flow(topUp);
}
}
} finally {
isRun = true;
connection.unlock();
}
connection.flush();
}
};

if (address == null) {
pagingManager.checkMemory(creditRunnable);
pagingManager.checkMemory(runnable);
} else {
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
store.checkMemory(creditRunnable);
store.checkMemory(runnable);
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -791,7 +757,4 @@ public synchronized void setResult(SimpleString parameterAddress, T result) {
}

}
interface CreditRunnable extends Runnable {
boolean isRun();
}
}
Expand Up @@ -34,6 +34,7 @@
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
Expand All @@ -60,6 +61,35 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements

protected final AMQPSessionCallback sessionSPI;

/** We create this AtomicRunnable with setRan.
* This is because we always reuse the same instance.
* In case the creditRunnable was run, we reset and send it over.
* We set it as ran as the first one should always go through */
protected final AtomicRunnable creditRunnable;


/** This Credit Runnable may be used in Mock tests to simulate the credit semantic here */
public static AtomicRunnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection) {
return new AtomicRunnable() {
@Override
public void atomicRun() {
connection.lock();
try {
if (receiver.getCredit() <= threshold) {
int topUp = refill - receiver.getCredit();
if (topUp > 0) {
receiver.flow(topUp);
}
}
} finally {
connection.unlock();
}
connection.flush();
}
};
}


/*
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
Expand All @@ -68,7 +98,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements

// Used by the broker to decide when to refresh clients credit. This is not used when client requests credit.
private final int minCreditRefresh;
private TerminusExpiryPolicy expiryPolicy;

public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
Expand All @@ -80,11 +109,12 @@ public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
this.sessionSPI = sessionSPI;
this.amqpCredits = connection.getAmqpCredits();
this.minCreditRefresh = connection.getAmqpLowCredits();
this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan();
}

@Override
public void onFlow(int credits, boolean drain) {
flow(Math.min(credits, amqpCredits), amqpCredits);
flow();
}

@Override
Expand Down Expand Up @@ -116,7 +146,6 @@ public void initialise() throws Exception {
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
target.setAddress(address.toString());
} else {
// the target will have an address unless the remote is requesting an anonymous
Expand Down Expand Up @@ -182,7 +211,7 @@ public RemotingConnection getRemotingConnection() {
}
}
}
flow(amqpCredits, minCreditRefresh);
flow();
}

public RoutingType getRoutingType(Receiver receiver, SimpleString address) {
Expand Down Expand Up @@ -245,7 +274,7 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {

sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data);

flow(amqpCredits, minCreditRefresh);
flow();
} catch (Exception e) {
log.warn(e.getMessage(), e);
Rejected rejected = new Rejected();
Expand All @@ -262,7 +291,7 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {

delivery.disposition(rejected);
delivery.settle();
flow(amqpCredits, minCreditRefresh);
flow();
}
}

Expand All @@ -285,20 +314,18 @@ public void close(ErrorCondition condition) throws ActiveMQAMQPException {
close(false);
}

public void flow(int credits, int threshold) {
public void flow() {
if (!creditRunnable.isRun()) {
return; // nothing to be done as the previous one did not run yet
}

creditRunnable.reset();

// Use the SessionSPI to allocate producer credits, or default, always allocate credit.
if (sessionSPI != null) {
if (receiver.getCredit() <= threshold) {
sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
}
sessionSPI.flow(address, creditRunnable);
} else {
connection.lock();
try {
receiver.flow(credits);
} finally {
connection.unlock();
}
connection.flush();
creditRunnable.run();
}
}

Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Rule;
Expand Down Expand Up @@ -74,7 +75,7 @@ public void testOfferProducerWithNoAddressDoesNotTopOffCreditAboveThreshold() {
// Credit is above threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);

session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(null, ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));

// Run the credit refill code.
Mockito.verify(pagingManager).checkMemory(argument.capture());
Expand Down Expand Up @@ -105,7 +106,7 @@ public void testOfferProducerWithNoAddressTopsOffCreditAtThreshold() {
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);

session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(null, ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));

// Run the credit refill code.
Mockito.verify(pagingManager).checkMemory(argument.capture());
Expand Down Expand Up @@ -137,7 +138,7 @@ public void testOfferProducerWithAddressDoesNotTopOffCreditAboveThreshold() thro
// Credit is above threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);

session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));

// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());
Expand Down Expand Up @@ -169,7 +170,7 @@ public void testOfferProducerWithAddressTopsOffCreditAtThreshold() throws Except
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);

session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));

// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());
Expand Down Expand Up @@ -200,7 +201,7 @@ public void testOfferProducerWithNoAddressDoesNotGrantNegativeCredit() {
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);

session.offerProducerCredit(null, 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(null, ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));

// Run the credit refill code.
Mockito.verify(pagingManager).checkMemory(argument.capture());
Expand Down Expand Up @@ -232,7 +233,7 @@ public void testOfferProducerWithAddressDoesNotGrantNegativeCredit() throws Exce
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);

session.offerProducerCredit(new SimpleString("test"), 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));

// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());
Expand Down

0 comments on commit dddff16

Please sign in to comment.