Skip to content

Commit

Permalink
Merge b94c763 into 094cc13
Browse files Browse the repository at this point in the history
  • Loading branch information
psteitz committed Nov 13, 2021
2 parents 094cc13 + b94c763 commit 1bf45d0
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 9 deletions.
6 changes: 6 additions & 0 deletions src/changes/changes.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ The <action> type attribute can be add,update,fix,remove.
<body>
<release version="2.12.0" date="2021-MM-DD" description="This is a maintenance release (Java 8).">
<!-- ADD -->
<action dev="psteitz" type="fix" issue="POOL-401">
Ensure that capacity freed by invalidateObject is available to all keyed pools.
</action>
<action dev="psteitz" type="fix" due-to="Codievilky August" issue="POOL-391">
Ensure capacity freed by clear is made available to GKOP borrowers.
</action>
<action dev="ggregory" type="add" due-to="Gary Gregory">
Add PooledObject.getFullDuration().
</action>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,6 @@ private int calculateDeficit(final ObjectDeque<T> objectDeque) {
* method on each idle instance.
* <p>
* Implementation notes:
* </p>
* <ul>
* <li>This method does not destroy or effect in any way instances that are
* checked out when it is invoked.</li>
Expand All @@ -570,23 +569,43 @@ private int calculateDeficit(final ObjectDeque<T> objectDeque) {
*/
@Override
public void clear() {
poolMap.keySet().forEach(this::clear);
poolMap.keySet().forEach(key -> clear(key,false));
}


/**
* Clears the specified sub-pool, removing all pooled instances
* corresponding to the given {@code key}. Exceptions encountered
* destroying idle instances are swallowed but notified via a
* {@link SwallowedExceptionListener}.
* <p>
* If there are clients waiting to borrow objects, this method will
* attempt to reuse the capacity freed by this operation, adding
* instances to the most loaded keyed pools. To avoid triggering
* possible object creation, use {@link #clear(Object, boolean)}.
*
* @param key the key to clear
*/
@Override
public void clear(final K key) {
clear(key, true);
}

/**
* Clears the specified sub-pool, removing all pooled instances
* corresponding to the given {@code key}. Exceptions encountered
* destroying idle instances are swallowed but notified via a
* {@link SwallowedExceptionListener}.
* <p>
* If reuseCapacity is true and there are clients waiting to
* borrow objects, this method will attempt to reuse the capacity freed
* by this operation, adding instances to the most loaded keyed pools.
*
* @param key the key to clear
* @param reuseCapacity whether or not to reuse freed capacity
*/
public void clear(final K key, boolean reuseCapacity) {
final ObjectDeque<T> objectDeque = register(key);

int freedCapacity = 0;
try {
final LinkedBlockingDeque<PooledObject<T>> idleObjects =
objectDeque.getIdleObjects();
Expand All @@ -595,7 +614,9 @@ public void clear(final K key) {

while (p != null) {
try {
destroy(key, p, true, DestroyMode.NORMAL);
if (destroy(key, p, true, DestroyMode.NORMAL)) {
freedCapacity++;
}
} catch (final Exception e) {
swallowException(e);
}
Expand All @@ -604,8 +625,11 @@ public void clear(final K key) {
} finally {
deregister(key);
}
}
if (reuseCapacity) {
reuseCapacity(freedCapacity);
}

}

/**
* Clears oldest 15% of objects in pool. The method sorts the objects into
Expand Down Expand Up @@ -1306,11 +1330,9 @@ public void invalidateObject(final K key, final T obj, final DestroyMode destroy
synchronized (p) {
if (p.getState() != PooledObjectState.INVALID) {
destroy(key, p, true, destroyMode);
reuseCapacity();
}
}
if (objectDeque.idleObjects.hasTakeWaiters()) {
addObject(key);
}
}

/**
Expand Down Expand Up @@ -1575,6 +1597,17 @@ private void reuseCapacity() {
}
}

/**
* Call {@link #reuseCapacity()} repeatedly.
*
* @param newCapacity number of new instances to attempt to create.
*/
private void reuseCapacity(int newCapacity) {
for (int i = 0; i < newCapacity; i++) {
reuseCapacity();
}
}

/**
* Sets the configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2474,6 +2474,140 @@ public void testWhenExhaustedBlockClosePool() throws Exception {
assertTrue(wtt.thrown instanceof InterruptedException);
}

/**
* POOL-391 Adapted from code in the JIRA ticket.
*
* @throws Exception May occur in some failure modes
*/
@Test
@Timeout(value = 2000, unit = TimeUnit.MILLISECONDS)
public void testClearUnblocksWaiters() {
final GenericKeyedObjectPoolConfig<Integer> config = new GenericKeyedObjectPoolConfig<>();
config.setMaxTotalPerKey(1);
config.setMinIdlePerKey(0);
config.setMaxIdlePerKey(-1);
config.setMaxTotal(-1);
config.setMaxWait(Duration.ofMillis(5));
GenericKeyedObjectPool<Integer, Integer> testPool = new GenericKeyedObjectPool<>(
new KeyedPooledObjectFactory<Integer, Integer>() {
@Override
public PooledObject<Integer> makeObject(Integer key) throws Exception {
return new DefaultPooledObject<>(10);
}

@Override
public void destroyObject(Integer key, PooledObject<Integer> p) throws Exception {
Thread.sleep(500);
}

@Override
public boolean validateObject(Integer key, PooledObject<Integer> p) {
return true;
}

@Override
public void activateObject(Integer key, PooledObject<Integer> p) throws Exception {
// do nothing
}

@Override
public void passivateObject(Integer key, PooledObject<Integer> p) throws Exception {
// do nothing
}
}, config);
final int borrowKey = 10;
Thread t = new Thread(() -> {
try {
while (true) {
Integer integer = testPool.borrowObject(borrowKey);
testPool.returnObject(borrowKey, integer);
Thread.sleep(10);
}
} catch (Exception e) {
fail();
}
});
Thread t2 = new Thread(() -> {
try {
while (true) {
testPool.clear(borrowKey);
Thread.sleep(10);
}
} catch (Exception e) {
fail();
}
});
t.start();
t2.start();
}

/**
* POOL-391 Verify that when clear(key) is called with reuseCapacity true,
* capacity freed is reused and allocated to most loaded pools.
*
* @throws Exception May occur in some failure modes
*/
@Test
public void testClearReuseCapacity() throws Exception {
gkoPool.setMaxTotalPerKey(6);
gkoPool.setMaxTotal(6);
gkoPool.setMaxWait(Duration.ofSeconds(5));
// Create one thread to wait on "one", two on "two", three on "three"
final ArrayList<Thread> testThreads = new ArrayList<>();
testThreads.add(new Thread(new SimpleTestThread<>(gkoPool, "one")));
testThreads.add(new Thread(new SimpleTestThread<>(gkoPool, "two")));
testThreads.add(new Thread(new SimpleTestThread<>(gkoPool, "two")));
testThreads.add(new Thread(new SimpleTestThread<>(gkoPool, "three")));
testThreads.add(new Thread(new SimpleTestThread<>(gkoPool, "three")));
testThreads.add(new Thread(new SimpleTestThread<>(gkoPool, "three")));
// Borrow two each from "four", "five", "six" - using all capacity
final String four = gkoPool.borrowObject("four");
final String four2 = gkoPool.borrowObject("four");
final String five = gkoPool.borrowObject("five");
final String five2 = gkoPool.borrowObject("five");
final String six = gkoPool.borrowObject("six");
final String six2 = gkoPool.borrowObject("six");
Thread.sleep(100);
// Launch the waiters - all will be blocked waiting
for (Thread t : testThreads) {
t.start();
}
Thread.sleep(100);
// Return and clear the fours - at least one "three" should get served
// Other should be a two or a three (three was most loaded)
gkoPool.returnObject("four", four);
gkoPool.returnObject("four", four2);
gkoPool.clear("four");
Thread.sleep(20);
assertTrue(!testThreads.get(3).isAlive() || !testThreads.get(4).isAlive() || !testThreads.get(5).isAlive());
// Now clear the fives
gkoPool.returnObject("five", five);
gkoPool.returnObject("five", five2);
gkoPool.clear("five");
Thread.sleep(20);
// Clear the sixes
gkoPool.returnObject("six", six);
gkoPool.returnObject("six", six2);
gkoPool.clear("six");
Thread.sleep(20);
for (Thread t : testThreads) {
assertTrue(!t.isAlive());
}
}

@Test
public void testInvalidateFreesCapacityForOtherKeys() throws Exception {
gkoPool.setMaxTotal(1);
gkoPool.setMaxWait(Duration.ofMillis(500));
Thread borrower = new Thread(new SimpleTestThread<>(gkoPool, "two"));
String obj = gkoPool.borrowObject("one");
borrower.start(); // Will block
Thread.sleep(100); // Make sure borrower has started
gkoPool.invalidateObject("one", obj); // Should free capacity to serve the other key
Thread.sleep(20); // Should have been served by now
assertFalse(borrower.isAlive());
}

}


0 comments on commit 1bf45d0

Please sign in to comment.