From 1fdf3477102d0030a3d06db3349b82336be1c9ea Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Mon, 3 Jun 2024 17:56:23 +0200 Subject: [PATCH] Streamline ServicePool synchronization --- .../camel/support/cache/ServicePool.java | 74 ++++++------------- 1 file changed, 22 insertions(+), 52 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java index de7b5b513e111..72373c96031ec 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java @@ -17,11 +17,12 @@ package org.apache.camel.support.cache; import java.util.ArrayList; -import java.util.List; +import java.util.Deque; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; @@ -53,8 +54,6 @@ abstract class ServicePool extends ServiceSupport implements private final ConcurrentMap> singlePoolEvicted = new ConcurrentHashMap<>(); private final int capacity; private final Map cache; - // synchronizes access only to cache - private final Object cacheLock; private interface Pool { S acquire() throws Exception; @@ -75,11 +74,10 @@ public ServicePool(ThrowingFunction creator, Function 0 ? LRUCacheFactory.newLRUCache(capacity, this::onEvict) : null; - this.cacheLock = capacity > 0 ? new Object() : null; } /** - * This callback is invoked by LRUCache from a separate background cleanup thread. Therefore we mark the entries to + * This callback is invoked by LRUCache from a separate background cleanup thread. Therefore, we mark the entries to * be evicted from this thread only, and then let SinglePool and MultiPool handle the evictions (stop the * producer/consumer safely) when they are acquiring/releases producers/consumers. If we stop the producer/consumer * from the LRUCache background thread we can have a race condition with a pooled producer may have been acquired at @@ -117,15 +115,7 @@ public S acquire(Endpoint endpoint) throws Exception { } S s = getOrCreatePool(endpoint).acquire(); if (s != null && cache != null) { - if (isStoppingOrStopped()) { - // during stopping then access to the cache is synchronized - synchronized (cacheLock) { - cache.putIfAbsent(s, s); - } - } else { - // optimize for normal operation - cache.putIfAbsent(s, s); - } + cache.putIfAbsent(s, s); } return s; } @@ -182,10 +172,8 @@ protected void doStop() throws Exception { pool.values().forEach(Pool::stop); pool.clear(); if (cache != null) { - synchronized (cacheLock) { - cache.values().forEach(ServicePool::stop); - cache.clear(); - } + cache.values().forEach(ServicePool::stop); + cache.clear(); } singlePoolEvicted.values().forEach(Pool::stop); singlePoolEvicted.clear(); @@ -295,29 +283,19 @@ void doStop(Service s) { * thread at any given time. */ private class MultiplePool implements Pool { - private final Object lock = new Object(); private final Endpoint endpoint; private final BlockingQueue queue; - private final List evicts; + private final Deque evicts; MultiplePool(Endpoint endpoint) { this.endpoint = endpoint; this.queue = new ArrayBlockingQueue<>(capacity); - this.evicts = new ArrayList<>(); + this.evicts = new ConcurrentLinkedDeque<>(); } private void cleanupEvicts() { - if (!evicts.isEmpty()) { - synchronized (lock) { - if (!evicts.isEmpty()) { - for (S evict : evicts) { - queue.remove(evict); - // stop the service after having removed it from queue - doStop(evict); - } - evicts.clear(); - } - } + for (S evict = evicts.pollFirst(); evict != null; evict = evicts.pollFirst()) { + doStop(evict); } } @@ -325,13 +303,10 @@ private void cleanupEvicts() { public S acquire() throws Exception { cleanupEvicts(); - S s; - synchronized (lock) { - s = queue.poll(); - if (s == null) { - s = creator.apply(endpoint); - s.start(); - } + S s = queue.poll(); + if (s == null) { + s = creator.apply(endpoint); + s.start(); } return s; } @@ -340,11 +315,9 @@ public S acquire() throws Exception { public void release(S s) { cleanupEvicts(); - synchronized (lock) { - if (!queue.offer(s)) { - // there is no room so lets just stop and discard this - doStop(s); - } + if (!queue.offer(s)) { + // there is no room so let's just stop and discard this + doStop(s); } } @@ -355,19 +328,16 @@ public int size() { @Override public void stop() { - synchronized (lock) { - queue.forEach(this::doStop); - queue.clear(); - pool.remove(endpoint); - } + ArrayList list = new ArrayList<>(); + queue.drainTo(list); + pool.remove(endpoint); + list.forEach(this::doStop); } @Override public void evict(S s) { // to be evicted - synchronized (lock) { - evicts.add(s); - } + evicts.add(s); } @Override