Skip to content

Commit

Permalink
ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 authored and clebertsuconic committed Jul 26, 2019
1 parent 8963cd9 commit 64ba930
Showing 1 changed file with 45 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -729,58 +730,68 @@ protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean f

@Override
public void setStarted(final boolean started) {
synchronized (lock) {
boolean locked = lockDelivery();

lockDelivery(locked -> {
// This is to make sure nothing would sneak to the client while started = false
// the client will stop the session and perform a rollback in certain cases.
// in case something sneaks to the client you could get to messaging delivering forever until
// you restart the server
try {
this.started = browseOnly || started;
} finally {
if (locked) {
lockDelivery.writeLock().unlock();
}
}
}

this.started = browseOnly || started;
});
// Outside the lock
if (started) {
promptDelivery();
}
}

private boolean lockDelivery() {
try {
if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
if (server != null) {
server.threadDump();
private static final long LOCK_DELIVERY_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
private static final long TRY_LOCK_NS = TimeUnit.MILLISECONDS.toNanos(100);

private boolean lockDelivery(java.util.function.Consumer<Boolean> task) {
final long startWait = System.nanoTime();
long now;
while (((now = System.nanoTime()) - startWait) < LOCK_DELIVERY_TIMEOUT_NS) {
try {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
synchronized (lock) {
task.accept(false);
}
return false;
}
return true;
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
return false;
synchronized (lock) {
if (lockDelivery.writeLock().tryLock()) {
try {
task.accept(true);
} finally {
lockDelivery.writeLock().unlock();
}
return true;
}
}
//entering the lock can take some time: discount that time from the
//time before attempting to lock delivery
final long timeToLock = System.nanoTime() - now;
if (timeToLock < TRY_LOCK_NS) {
final long timeToWait = TRY_LOCK_NS - timeToLock;
LockSupport.parkNanos(timeToWait);
}
}
ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
if (server != null) {
server.threadDump();
}
synchronized (lock) {
task.accept(false);
}
return false;
}

@Override
public void setTransferring(final boolean transferring) {
synchronized (lock) {
// This is to make sure that the delivery process has finished any pending delivery
// otherwise a message may sneak in on the client while we are trying to stop the consumer
boolean locked = lockDelivery();
try {
this.transferring = transferring;
} finally {
if (locked) {
lockDelivery.writeLock().unlock();
}
}
}
lockDelivery(locked -> this.transferring = transferring);

// Outside the lock
if (transferring) {
Expand Down

0 comments on commit 64ba930

Please sign in to comment.