Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package org.apache.camel.support.cache;

import java.util.ArrayList;
import java.util.List;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

Expand Down Expand Up @@ -53,8 +54,6 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
private final ConcurrentMap<Endpoint, Pool<S>> singlePoolEvicted = new ConcurrentHashMap<>();
private final int capacity;
private final Map<S, S> cache;
// synchronizes access only to cache
private final Object cacheLock;

private interface Pool<S> {
S acquire() throws Exception;
Expand All @@ -75,11 +74,10 @@ public ServicePool(ThrowingFunction<Endpoint, S, Exception> creator, Function<S,
this.getEndpoint = getEndpoint;
this.capacity = capacity;
this.cache = capacity > 0 ? LRUCacheFactory.newLRUCache(capacity, this::onEvict) : null;
this.cacheLock = capacity > 0 ? new Object() : null;
}

/**
* This callback is invoked by LRUCache from a separate background cleanup thread. Therefore we mark the entries to
* This callback is invoked by LRUCache from a separate background cleanup thread. Therefore, we mark the entries to
* be evicted from this thread only, and then let SinglePool and MultiPool handle the evictions (stop the
* producer/consumer safely) when they are acquiring/releases producers/consumers. If we stop the producer/consumer
* from the LRUCache background thread we can have a race condition with a pooled producer may have been acquired at
Expand Down Expand Up @@ -117,15 +115,7 @@ public S acquire(Endpoint endpoint) throws Exception {
}
S s = getOrCreatePool(endpoint).acquire();
if (s != null && cache != null) {
if (isStoppingOrStopped()) {
// during stopping then access to the cache is synchronized
synchronized (cacheLock) {
cache.putIfAbsent(s, s);
}
} else {
// optimize for normal operation
cache.putIfAbsent(s, s);
}
cache.putIfAbsent(s, s);
}
return s;
}
Expand Down Expand Up @@ -182,10 +172,8 @@ protected void doStop() throws Exception {
pool.values().forEach(Pool::stop);
pool.clear();
if (cache != null) {
synchronized (cacheLock) {
cache.values().forEach(ServicePool::stop);
cache.clear();
}
cache.values().forEach(ServicePool::stop);
cache.clear();
}
singlePoolEvicted.values().forEach(Pool::stop);
singlePoolEvicted.clear();
Expand Down Expand Up @@ -295,43 +283,30 @@ void doStop(Service s) {
* thread at any given time.
*/
private class MultiplePool implements Pool<S> {
private final Object lock = new Object();
private final Endpoint endpoint;
private final BlockingQueue<S> queue;
private final List<S> evicts;
private final Deque<S> evicts;

MultiplePool(Endpoint endpoint) {
this.endpoint = endpoint;
this.queue = new ArrayBlockingQueue<>(capacity);
this.evicts = new ArrayList<>();
this.evicts = new ConcurrentLinkedDeque<>();
}

private void cleanupEvicts() {
if (!evicts.isEmpty()) {
synchronized (lock) {
if (!evicts.isEmpty()) {
for (S evict : evicts) {
queue.remove(evict);
// stop the service after having removed it from queue
doStop(evict);
}
evicts.clear();
}
}
for (S evict = evicts.pollFirst(); evict != null; evict = evicts.pollFirst()) {
doStop(evict);
}
}

@Override
public S acquire() throws Exception {
cleanupEvicts();

S s;
synchronized (lock) {
s = queue.poll();
if (s == null) {
s = creator.apply(endpoint);
s.start();
}
S s = queue.poll();
if (s == null) {
s = creator.apply(endpoint);
s.start();
}
return s;
}
Expand All @@ -340,11 +315,9 @@ public S acquire() throws Exception {
public void release(S s) {
cleanupEvicts();

synchronized (lock) {
if (!queue.offer(s)) {
// there is no room so lets just stop and discard this
doStop(s);
}
if (!queue.offer(s)) {
// there is no room so let's just stop and discard this
doStop(s);
}
}

Expand All @@ -355,19 +328,16 @@ public int size() {

@Override
public void stop() {
synchronized (lock) {
queue.forEach(this::doStop);
queue.clear();
pool.remove(endpoint);
}
ArrayList<S> list = new ArrayList<>();
queue.drainTo(list);
pool.remove(endpoint);
list.forEach(this::doStop);
}

@Override
public void evict(S s) {
// to be evicted
synchronized (lock) {
evicts.add(s);
}
evicts.add(s);
}

@Override
Expand Down