/
ConnectionPoolWithConcurrentValidation.java
100 lines (86 loc) · 3.14 KB
/
ConnectionPoolWithConcurrentValidation.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* Copyright (C) 2000-2024 Atomikos <info@atomikos.com>
*
* LICENSE CONDITIONS
*
* See http://www.atomikos.com/Main/WhichLicenseApplies for details.
*/
package com.atomikos.datasource.pool;
import java.util.Iterator;
import com.atomikos.logging.Logger;
import com.atomikos.logging.LoggerFactory;
public class ConnectionPoolWithConcurrentValidation<ConnectionType> extends ConnectionPool<ConnectionType>
{
private static final Logger LOGGER = LoggerFactory.createLogger(ConnectionPoolWithConcurrentValidation.class);
public ConnectionPoolWithConcurrentValidation ( ConnectionFactory<ConnectionType> connectionFactory , ConnectionPoolProperties properties ) throws ConnectionPoolException
{
super(connectionFactory, properties);
}
@Override
protected ConnectionType recycleConnectionIfPossible() throws Exception {
ConnectionType ret = null;
XPooledConnection<ConnectionType> xpc = findFirstRecyclablePooledConnectionForCallingThread();
if (xpc != null) {
ret = concurrentlyTryToRecycle(xpc);
}
return ret;
}
@Override
protected ConnectionType retrieveFirstAvailableConnection() {
ConnectionType ret = null;
XPooledConnection<ConnectionType> xpc = claimFirstAvailablePooledConnection();
if (xpc != null) {
ret = concurrentlyTryToUse(xpc);
}
return ret;
}
private ConnectionType concurrentlyTryToRecycle(XPooledConnection<ConnectionType> xpc) throws Exception {
ConnectionType ret = null;
synchronized(xpc) { // just to be sure, although concurrent threads should not happen
if (xpc.canBeRecycledForCallingThread()) {
ret = xpc.createConnectionProxy();
}
}
return ret;
}
private ConnectionType concurrentlyTryToUse(XPooledConnection<ConnectionType> xpc) {
ConnectionType ret = null;
try {
ret = xpc.createConnectionProxy();
// here, connection is no longer available for other threads
} catch ( CreateConnectionException ex ) {
String msg = this + ": error creating proxy of connection " + xpc;
LOGGER.logDebug( msg , ex);
removePooledConnection(xpc);
} finally {
logCurrentPoolSize();
}
return ret;
}
private synchronized XPooledConnection<ConnectionType> claimFirstAvailablePooledConnection() {
XPooledConnection<ConnectionType> ret = null;
Iterator<XPooledConnection<ConnectionType>> it = connections.iterator();
while ( it.hasNext() && ret == null ) {
XPooledConnection<ConnectionType> xpc = it.next();
if (xpc.markAsBeingAcquiredIfAvailable()) {
ret = xpc;
}
}
return ret;
}
private synchronized XPooledConnection<ConnectionType> findFirstRecyclablePooledConnectionForCallingThread() {
XPooledConnection<ConnectionType> ret = null;
Iterator<XPooledConnection<ConnectionType>> it = connections.iterator();
while ( it.hasNext() && ret == null ) {
XPooledConnection<ConnectionType> xpc = it.next();
if (xpc.canBeRecycledForCallingThread()) {
ret = xpc;
}
}
return ret;
}
private synchronized void removePooledConnection(XPooledConnection<ConnectionType> xpc) {
connections.remove(xpc);
destroyPooledConnection(xpc);
}
}