Skip to content

Commit

Permalink
Fix another race condition where a synchronizer was locked but not un…
Browse files Browse the repository at this point in the history
…locked in some situations
  • Loading branch information
Vampire authored and felldo committed Sep 21, 2023
1 parent c6fc6f1 commit f8eedf6
Showing 1 changed file with 43 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,42 +304,52 @@ private void checkRunningListenersAndStartIfPossible(DispatchQueueSelector queue
}
DispatchQueueSelector finalQueueSelector = queueSelector;
Queue<Runnable> taskQueue = queue;
// if there is something to execute and there is task running already
if (runningListeners.add(finalQueueSelector) && !queue.isEmpty()) {
AtomicReference<Future<?>> activeListener = new AtomicReference<>();
activeListener.set(api.getThreadPool().getExecutorService().submit(() -> {
if (finalQueueSelector instanceof ServerImpl) {
Object serverReadyNotifier = new Object();
((ServerImpl) finalQueueSelector)
.addServerReadyConsumer(s -> {
// if there is no task running already for the queue
if (runningListeners.add(finalQueueSelector)) {
// check whether there is something to do for the queue
// if not, free the runningListeners synchronizer
// otherwise schedule the task to do
if (queue.isEmpty()) {
runningListeners.remove(finalQueueSelector);
} else {
AtomicReference<Future<?>> activeListener = new AtomicReference<>();
activeListener.set(api.getThreadPool().getExecutorService().submit(() -> {
if (finalQueueSelector instanceof ServerImpl) {
Object serverReadyNotifier = new Object();
((ServerImpl) finalQueueSelector)
.addServerReadyConsumer(s -> {
synchronized (serverReadyNotifier) {
serverReadyNotifier.notifyAll();
}
});
while (!((ServerImpl) finalQueueSelector).isReady()) {
try {
synchronized (serverReadyNotifier) {
serverReadyNotifier.notifyAll();
serverReadyNotifier.wait(5000);
}
});
while (!((ServerImpl) finalQueueSelector).isReady()) {
try {
synchronized (serverReadyNotifier) {
serverReadyNotifier.wait(5000);
}
} catch (InterruptedException ignored) { }
} catch (InterruptedException ignored) { }
}
}
}
// Add the future to the list of active listeners
activeListeners.put(activeListener, new Object[]{System.nanoTime(), finalQueueSelector});
try {
taskQueue.poll().run();
} catch (Throwable t) {
logger.error("Unhandled exception in {}!", () -> getThreadType(finalQueueSelector), () -> t);
}
activeListeners.remove(activeListener);
alreadyCanceledListeners.remove(activeListener);
runningListeners.remove(finalQueueSelector);
// Inform the dispatchEvent method that it maybe can queue new listeners now
synchronized (queuedListenerTasks) {
queuedListenerTasks.notifyAll();
}
checkRunningListenersAndStartIfPossible(finalQueueSelector);
}));
// Add the future to the list of active listeners
activeListeners.put(activeListener, new Object[]{System.nanoTime(), finalQueueSelector});
try {
taskQueue.poll().run();
} catch (Throwable t) {
logger.error(
"Unhandled exception in {}!",
() -> getThreadType(finalQueueSelector),
() -> t);
}
activeListeners.remove(activeListener);
alreadyCanceledListeners.remove(activeListener);
runningListeners.remove(finalQueueSelector);
// Inform the dispatchEvent method that it maybe can queue new listeners now
synchronized (queuedListenerTasks) {
queuedListenerTasks.notifyAll();
}
checkRunningListenersAndStartIfPossible(finalQueueSelector);
}));
}
}
}
}
Expand Down

0 comments on commit f8eedf6

Please sign in to comment.