Permalink
Browse files

bug20566: pool concurrency test

tests added: AGConnPoolSessionTest
tests run:   java test-bigger
performance: no impact

<release-notes>
bug20566: pool concurrency test

Fixed synchronization in pool.
Added httpSocketTimeout property for pool to help with debugging.
Added more javadocs for pool.
</release-notes>

Change-Id: I0caf75386d3f7c4129280255af2dbfd86169e012
Reviewed-on: https://gerrit.franz.com:9080/1751
Reviewed-by: John O'Rourke <jor@franz.com>
Reviewed-by: Ahmon Dancy <dancy@franz.com>
Tested-by: Kevin Layer <layer@franz.com>
  • Loading branch information...
1 parent cc12b99 commit 3736ad0f2db3314afffffadd81a43d87e27b03a9 Mike Hinchey committed with dklayer Nov 1, 2011
@@ -40,6 +40,8 @@ public String getErrorMessage() {
public String toString() {
if (errorType != null) {
StringBuilder sb = new StringBuilder(64);
+ sb.append(getClass().getName());
+ sb.append(": ");
sb.append(errorType);
sb.append(": ");
sb.append(getMessage());
@@ -18,6 +18,7 @@
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpClient;
@@ -320,6 +321,7 @@ protected final void releaseConnection(HttpMethod method) {
public void putRepository(String repositoryURL) throws IOException,
RepositoryException, UnauthorizedException, AGHttpException {
+ if (logger.isDebugEnabled()) logger.debug("putRepository: " + repositoryURL);
Header[] headers = new Header[0];
NameValuePair[] params = { new NameValuePair(OVERRIDE_PARAM_NAME, "false") };
put(repositoryURL,headers,params,null);
@@ -17,41 +17,72 @@
*/
public class AGConnConfig {
+
+ /**
+ * @see AGConnProp#serverUrl
+ */
public final String serverUrl;
+
+ /**
+ * @see AGConnProp#username
+ */
public final String username;
+
+ /**
+ * @see AGConnProp#password
+ */
public final String password;
+
+ /**
+ * @see AGConnProp#catalog
+ */
public final String catalog;
+
+ /**
+ * @see AGConnProp#repository
+ */
public final String repository;
+
+ /**
+ * @see AGConnProp#session
+ */
public final Session session;
+
+ /**
+ * @see AGConnProp#sessionLifetime
+ */
public final Integer sessionLifetime;
+ /**
+ * @see AGConnProp#httpSocketTimeout
+ * @since v4.4
+ */
+ public final Integer httpSocketTimeout;
+
public AGConnConfig(Map<AGConnProp, String> props) {
- if (props.containsKey(AGConnProp.serverUrl)) {
- serverUrl = props.get(AGConnProp.serverUrl);
- } else {
- throw new IllegalArgumentException("Property required for AGConn: " + AGConnProp.serverUrl);
- }
- if (props.containsKey(AGConnProp.username)) {
- username = props.get(AGConnProp.username);
- } else {
- throw new IllegalArgumentException("Property required for AGConn: " + AGConnProp.username);
- }
- if (props.containsKey(AGConnProp.password)) {
- password = props.get(AGConnProp.password);
- } else {
- throw new IllegalArgumentException("Property required for AGConn: " + AGConnProp.password);
- }
+ serverUrl = getStringRequired(props, AGConnProp.serverUrl);
+ username = getStringRequired(props, AGConnProp.username);
+ password = getStringRequired(props, AGConnProp.password);
catalog = props.get(AGConnProp.catalog);
- if (props.containsKey(AGConnProp.repository)) {
- repository = props.get(AGConnProp.repository);
+ repository = getStringRequired(props, AGConnProp.repository);
+ session = Session.valueOfCaseInsensitive(props.get(AGConnProp.session), Session.SHARED);
+ sessionLifetime = getInt(props, AGConnProp.sessionLifetime);
+ httpSocketTimeout = getInt(props, AGConnProp.httpSocketTimeout);
+ }
+
+ private Integer getInt(Map<AGConnProp, String> props, AGConnProp prop) {
+ if (props.containsKey(prop)) {
+ return Integer.parseInt( props.get(prop));
} else {
- throw new IllegalArgumentException("Property required for AGConn: " + AGConnProp.repository);
+ return null;
}
- session = Session.valueOfCaseInsensitive(props.get(AGConnProp.session), Session.SHARED);
- if (props.containsKey(AGConnProp.sessionLifetime)) {
- sessionLifetime = Integer.parseInt( props.get(AGConnProp.sessionLifetime));
+ }
+
+ private String getStringRequired(Map<AGConnProp, String> props, AGConnProp prop) {
+ if (props.containsKey(prop)) {
+ return props.get(prop);
} else {
- sessionLifetime = null;
+ throw new IllegalArgumentException("Property required for AGConn: " + prop);
}
}
@@ -8,11 +8,15 @@
package com.franz.agraph.pool;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
import org.apache.commons.pool.PoolableObjectFactory;
import org.openrdf.repository.RepositoryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.franz.agraph.http.AGHTTPClient;
import com.franz.agraph.repository.AGCatalog;
import com.franz.agraph.repository.AGRepository;
import com.franz.agraph.repository.AGRepositoryConnection;
@@ -46,7 +50,18 @@ public Object makeObject() throws Exception {
}
protected AGRepositoryConnection makeConnection() throws Exception {
- AGServer server = closeLater( new AGServer(props.serverUrl, props.username, props.password) );
+ HttpConnectionManagerParams params = new HttpConnectionManagerParams();
+ params.setDefaultMaxConnectionsPerHost(Integer.MAX_VALUE);
+ params.setMaxTotalConnections(Integer.MAX_VALUE);
+ //params.setConnectionTimeout((int) TimeUnit.SECONDS.toMillis(10));
+ if (props.httpSocketTimeout != null) {
+ params.setSoTimeout(props.httpSocketTimeout);
+ }
+
+ HttpConnectionManager manager = closeLater( new MultiThreadedHttpConnectionManager());
+ manager.setParams(params);
+ AGHTTPClient httpClient = new AGHTTPClient(props.serverUrl, manager);
+ AGServer server = closeLater( new AGServer(props.username, props.password, httpClient) );
AGCatalog cat;
if (props.catalog != null) {
cat = server.getCatalog(props.catalog);
@@ -66,7 +81,7 @@ protected AGRepositoryConnection makeConnection() throws Exception {
if (props.sessionLifetime != null) {
conn.setSessionLifetime(props.sessionLifetime);
}
- activateConnection(conn);
+ activateObject(conn);
return conn;
}
@@ -109,7 +124,7 @@ protected void activateConnection(AGRepositoryConnection conn) throws Repository
break;
}
}
-
+
@Override
public void activateObject(Object obj) throws Exception {
activateConnection( (AGRepositoryConnection) obj);
@@ -43,6 +43,8 @@
* AGConnProp.repository, "my_repo",
* AGConnProp.session, AGConnProp.Session.DEDICATED,
* AGPoolProp.shutdownHook, true,
+ * AGPoolProp.maxActive, 10,
+ * AGPoolProp.testOnBorrow, true,
* AGPoolProp.initialSize, 2);
* AGRepositoryConnection conn = pool.borrowConnection();
* try {
@@ -317,7 +319,14 @@ public void ensureIdle(int n) throws Exception {
addObject();
}
}
-
+
+ @Override
+ public void close() {
+ if (log.isDebugEnabled()) log.debug("close " + this);
+ close(delegate);
+ super.close();
+ }
+
protected void finalize() throws Throwable {
if (getNumActive() > 0) {
close();
@@ -8,6 +8,9 @@
package com.franz.agraph.pool;
+import org.apache.commons.httpclient.params.HttpConnectionParams;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+
import com.franz.agraph.repository.AGCatalog;
import com.franz.agraph.repository.AGRepositoryConnection;
import com.franz.agraph.repository.AGServer;
@@ -36,7 +39,7 @@
password,
/**
- * Catalog name or no value for {@link AGServer#getRootCatalog()}
+ * Catalog name - "/" or no value for {@link AGServer#getRootCatalog()}.
* @see AGServer#getCatalog(String)
*/
catalog,
@@ -52,9 +55,23 @@
session,
/**
+ * Sets the 'lifetime' (in seconds) for a dedicated session.
+ * Seconds a session can be idle before being collected.
* @see AGRepositoryConnection#setSessionLifetime(int)
*/
- sessionLifetime;
+ sessionLifetime,
+
+ /**
+ * Socket timeout (SO_TIMEOUT) in milliseconds to be used when executing the method.
+ * A timeout value of zero is interpreted as an infinite timeout.
+ *
+ * <p>WARNING: this may break long queries.</p>
+ *
+ * @see HttpConnectionParams#setSoTimeout(int)
+ * @see HttpMethodParams#SO_TIMEOUT
+ * @since v4.4
+ */
+ httpSocketTimeout;
/**
* Property values for {@link AGConnProp#session}.
@@ -52,19 +52,26 @@
/**
* @see GenericObjectPool#setMinIdle(int)
*/
-
minIdle,
+
/**
* @see GenericObjectPool#setMaxIdle(int)
*/
-
maxIdle,
+
/**
+ * Max number of connections that can be allocated by the pool.
+ * If multiple clients (or different pools), are using the same
+ * AllegroGraph Server, this value should be set to something
+ * less than the SessionPorts.
+ * See <a href="http://www.franz.com/agraph/support/documentation/current/server-installation.html#sessionport"
+ * target="_top">Session Port Setup</a>.
* @see GenericObjectPool#setMaxActive(int)
*/
maxActive,
/**
+ * milliseconds to wait to borrow before throwing {@link java.util.NoSuchElementException}
* @see GenericObjectPool#setMaxWait(long)
*/
maxWait,
@@ -56,6 +56,13 @@ public AGServer(String serverURL, String username, String password) {
rootCatalog = new AGCatalog(this,AGCatalog.ROOT_CATALOG);
}
+ public AGServer(String username, String password, AGHTTPClient httpClient) {
+ this.serverURL = httpClient.getServerURL();
+ this.httpClient = httpClient;
+ this.httpClient.setUsernameAndPassword(username, password);
+ rootCatalog = new AGCatalog(this,AGCatalog.ROOT_CATALOG);
+ }
+
/**
* Returns the URL of this AllegroGraph server.
*
@@ -11,8 +11,9 @@
import info.aduna.iteration.CloseableIteration;
import java.util.Collection;
-import java.util.Deque;
+import java.util.Collections;
import java.util.LinkedList;
+import java.util.List;
import javax.xml.stream.XMLStreamReader;
@@ -37,14 +38,14 @@
private final static Logger log = LoggerFactory.getLogger(Closer.class);
- private final Deque toClose = new LinkedList();
+ private final List toClose = Collections.synchronizedList(new LinkedList());
/**
* Add a resource to be closed with {@link #close()}.
*/
public <Obj extends Object>
Obj closeLater(Obj o) {
- toClose.push(o);
+ toClose.add(0, o);
return o;
}
@@ -66,8 +67,12 @@ public boolean remove(Object o) {
*/
@Override
public void close() {
- while (toClose.isEmpty() == false) {
- close( toClose.pop() );
+ try {
+ while (toClose.isEmpty() == false) {
+ close( toClose.get(0) );
+ }
+ } catch (IndexOutOfBoundsException e) {
+ // ignore, the list must be empty
}
}
@@ -15,30 +15,49 @@
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
/**
- * @deprecated in v4.4 use Closer
* @see Closer
*/
public class Util {
+ /**
+ * @deprecated in v4.4 use Closer
+ * @see Closer
+ */
public static <CloseableType extends Closeable>
CloseableType close(CloseableType o) {
return Closer.Close(o);
}
+ /**
+ * @deprecated in v4.4 use Closer
+ * @see Closer
+ */
public static <CloseableType extends java.io.Closeable>
CloseableType close(CloseableType o) {
return Closer.Close(o);
}
+ /**
+ * @deprecated in v4.4 use Closer
+ * @see Closer
+ */
public static MultiThreadedHttpConnectionManager close(MultiThreadedHttpConnectionManager o) {
return Closer.Close(o);
}
+ /**
+ * @deprecated in v4.4 use Closer
+ * @see Closer
+ */
public static <Elem extends Object, Exc extends Exception>
CloseableIteration<Elem, Exc> close(CloseableIteration<Elem, Exc> o) {
return Closer.Close(o);
}
+ /**
+ * @deprecated in v4.4 use Closer
+ * @see Closer
+ */
public static XMLStreamReader close(XMLStreamReader o) {
return Closer.Close(o);
}
@@ -115,6 +115,16 @@ public static void setUpOnce() {
throw new RuntimeException("server url: " + url, e);
}
}
+
+ public static void deleteRepository(String catalog, String repo) throws RepositoryException {
+ AGServer server = new AGServer(findServerUrl(), username(), password());
+ try {
+ AGCatalog cat = server.getCatalog(catalog);
+ cat.deleteRepository(repo);
+ } finally {
+ Closer.Close(server);
+ }
+ }
private static void ping() throws RepositoryException {
AGRepository repo = cat.createRepository(repoId);
Oops, something went wrong.

0 comments on commit 3736ad0

Please sign in to comment.