Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

rfe10505: use session to get new blankNodeIds

In Events.java:
Instead of using shared backends for queries, and dedicated
for add and delete phases, use dedicated for all.
Ensure connections are closed.
Count errors from threads and exit app with error if any.

tests-added:   none
tests-run:     prepush

<release-note>
rfe10505: use session to get new blankNodeIds

AGValueFactory was using the main repository url to get
blank node ids.  Use connection session instead if available.
Using the session can reduce load on the primary server.

If the ValueFactory was created by Repository instead
of RepositoryConnection, the old behavior remains.
</release-note>

Change-Id: If39bdc52ed4804e1bfb7e772a14e74e0f3aeb08c
Reviewed-on: https://gerrit.franz.com:9080/1258
Tested-by: Kevin Layer <layer@franz.com>
Reviewed-by: Kevin Layer <layer@franz.com>
  • Loading branch information...
commit 98190966ee56a698e5cb8f69ff25072131d930b7 1 parent 2280c13
@mikehinchey mikehinchey authored dklayer committed
View
9 src/com/franz/agraph/http/AGHttpRepoClient.java
@@ -1783,4 +1783,13 @@ public void disableTripleCache() throws RepositoryException {
throw new RepositoryException(e);
}
}
+
+ public String[] getBlankNodes(int blankNodeAmount) throws RepositoryException {
+ try {
+ return getHTTPClient().getBlankNodes(getRoot(), blankNodeAmount);
+ } catch (IOException e) {
+ throw new RepositoryException(e);
+ }
+ }
+
}
View
14 src/com/franz/agraph/repository/AGRepositoryConnection.java
@@ -107,11 +107,20 @@
private final AGAbstractRepository repository;
private final AGHttpRepoClient repoclient;
+ private final AGValueFactory vf;
- public AGRepositoryConnection(AGAbstractRepository repository, AGHttpRepoClient client) {
+ public AGRepositoryConnection(AGRepository repository, AGHttpRepoClient client) {
super(repository);
this.repository = repository;
this.repoclient = client;
+ vf = new AGValueFactory(repository, this);
+ }
+
+ public AGRepositoryConnection(AGVirtualRepository repository, AGHttpRepoClient client) {
+ super(repository);
+ this.repository = repository;
+ this.repoclient = client;
+ vf = new AGValueFactory(repository.wrapped, this);
}
/*
@@ -130,7 +139,7 @@ public AGHttpRepoClient getHttpRepoClient() {
@Override
public AGValueFactory getValueFactory() {
- return getRepository().getValueFactory();
+ return vf;
}
@Override
@@ -1207,4 +1216,5 @@ public int getUploadCommitPeriod() throws RepositoryException {
return getHttpRepoClient().getUploadCommitPeriod();
}
+
}
View
19 src/com/franz/agraph/repository/AGValueFactory.java
@@ -29,6 +29,7 @@
public class AGValueFactory extends ValueFactoryImpl {
private final AGRepository repository;
+ private final AGRepositoryConnection conn;
private int blankNodeAmount = 100;
private String[] blankNodeIds;
@@ -37,7 +38,13 @@
public AGValueFactory(AGRepository repository) {
super();
this.repository = repository;
- blankNodeIds = new String[blankNodeAmount];
+ this.conn = null;
+ }
+
+ public AGValueFactory(AGRepository repository, AGRepositoryConnection conn) {
+ super();
+ this.repository = repository;
+ this.conn = conn;
}
public AGRepository getRepository() {
@@ -50,15 +57,19 @@ public AGHTTPClient getHTTPClient() {
private void getBlankNodeIds() {
try {
- blankNodeIds = getHTTPClient().getBlankNodes(getRepository().getRepositoryURL(),blankNodeAmount);
+ if (conn == null) {
+ blankNodeIds = getHTTPClient().getBlankNodes(getRepository().getRepositoryURL(), blankNodeAmount);
+ } else {
+ blankNodeIds = conn.getHttpRepoClient().getBlankNodes(blankNodeAmount);
+ }
index = blankNodeIds.length - 1;
} catch (UnauthorizedException e) {
// TODO: check on the proper exceptions to throw here
throw new IllegalStateException(e);
- } catch (IOException e) {
- throw new IllegalStateException(e);
} catch (RepositoryException e) {
throw new IllegalStateException(e);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
}
}
View
8 src/com/franz/agraph/repository/AGVirtualRepository.java
@@ -10,10 +10,10 @@
import com.franz.util.Closeable;
public class AGVirtualRepository implements AGAbstractRepository, Closeable {
- private AGServer server;
- private AGRepository wrapped;
- private String spec;
- private AGValueFactory vf;
+ private final AGServer server;
+ final AGRepository wrapped;
+ private final String spec;
+ private final AGValueFactory vf;
public AGVirtualRepository(AGServer server, String spec, AGRepository wrapped) {
this.server = server;
View
6 src/test/AGAbstractTest.java
@@ -151,7 +151,7 @@ public void tearDown() throws Exception {
@AfterClass
public static void tearDownOnce() throws Exception {
cat = null;
- server = Closer.close(server);
+ server = Util.close(server);
}
AGRepositoryConnection getConnection() throws RepositoryException {
@@ -290,7 +290,7 @@ public static void printRows(CloseableIteration rows) throws Exception {
while (rows.hasNext()) {
println(rows.next());
}
- close(rows);
+ Util.close(rows);
}
public static void printRows(String headerMsg, int limit, CloseableIteration rows) throws Exception {
@@ -301,7 +301,7 @@ public static void printRows(String headerMsg, int limit, CloseableIteration row
count++;
}
println("Number of results: " + count);
- close(rows);
+ Util.close(rows);
}
// static void close(RepositoryConnection conn) {
View
50 src/test/Closer.java
@@ -1,7 +1,6 @@
package test;
-import info.aduna.iteration.CloseableIteration;
-
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -29,50 +28,31 @@ Obj closeLater(Obj o) {
* Must be called in a finally block, to close all resources
* added with closeLater().
*/
+ @Override
public void close() {
while (toClose.isEmpty() == false) {
- Object o = toClose.get(0);
- close(o);
- while (toClose.remove(o)) {
- }
+ close( toClose.get(0) );
}
}
-
+
/**
- * TODO: move to com.franz.util.Util
+ * Close all objects immediately, will not be closed "later".
*/
- public static <Elem extends Object, Exc extends Exception>
- CloseableIteration<Elem, Exc> close(CloseableIteration<Elem, Exc> o) {
- if (o != null) {
- try {
- o.close();
- } catch (Exception e) {
- System.err.println("ignoring error with close:" + e);
- e.printStackTrace();
- }
- }
- return null;
+ public Collection closeAll(Collection objects) {
+ for (Object object : objects) {
+ close(object);
+ }
+ return null;
}
/**
- * TODO: move to com.franz.util.Util
+ * Close an object immediately, will not be closed "later".
*/
- public static <Obj extends Object>
+ public <Obj extends Object>
Obj close(Obj o) {
- if (o instanceof Closeable) {
- com.franz.util.Util.close((Closeable)o);
- } else if (o instanceof java.io.Closeable) {
- com.franz.util.Util.close((java.io.Closeable)o);
- } else if (o instanceof CloseableIteration) {
- close((CloseableIteration)o);
- } else if (o != null) {
- try {
- o.getClass().getMethod("close").invoke(o);
- } catch (Exception e) {
- System.err.println("ignoring error with close:" + e);
- e.printStackTrace();
- }
- }
+ test.Util.close(o);
+ while (toClose.remove(o)) {
+ }
return null;
}
View
33 src/test/Util.java
@@ -8,6 +8,8 @@
package test;
+import info.aduna.iteration.CloseableIteration;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -19,6 +21,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
import java.util.zip.GZIPOutputStream;
import com.franz.util.Closeable;
@@ -33,11 +36,33 @@ public static String get(String[] arr, int i, String defaultVal) {
return defaultVal;
}
- public static Object close(Object o) {
+ /**
+ * TODO: move to com.franz.util.Util
+ */
+ public static <Elem extends Object, Exc extends Exception>
+ CloseableIteration<Elem, Exc> close(CloseableIteration<Elem, Exc> o) {
+ if (o != null) {
+ try {
+ o.close();
+ } catch (Exception e) {
+ System.err.println("ignoring error with close:" + e);
+ e.printStackTrace();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * TODO: move to com.franz.util.Util
+ */
+ public static <Obj extends Object>
+ Obj close(Obj o) {
if (o instanceof Closeable) {
- close((Closeable)o);
+ com.franz.util.Util.close((Closeable)o);
} else if (o instanceof java.io.Closeable) {
- close((java.io.Closeable)o);
+ com.franz.util.Util.close((java.io.Closeable)o);
+ } else if (o instanceof CloseableIteration) {
+ close((CloseableIteration)o);
} else if (o != null) {
try {
o.getClass().getMethod("close").invoke(o);
@@ -200,5 +225,5 @@ public static List reverse(List list) {
Collections.reverse(list);
return list;
}
-
+
}
View
226 src/test/stress/Events.java
@@ -1,5 +1,5 @@
/******************************************************************************
-** Copyright (c) 2008-2010 Franz Inc.
+** Copyright (c) 2008-2011 Franz Inc.
** All rights reserved. This program and the accompanying materials
** are made available under the terms of the Eclipse Public License v1.0
** which accompanies this distribution, and is available at
@@ -30,6 +30,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
@@ -53,18 +54,23 @@
import org.openrdf.repository.RepositoryException;
import org.openrdf.rio.ntriples.NTriplesUtil;
+import test.Closer;
+
import com.franz.agraph.repository.AGCatalog;
import com.franz.agraph.repository.AGQueryLanguage;
import com.franz.agraph.repository.AGRepository;
import com.franz.agraph.repository.AGRepositoryConnection;
import com.franz.agraph.repository.AGServer;
import com.franz.agraph.repository.AGTupleQuery;
+import com.franz.util.Closeable;
import com.franz.util.Util;
-public class Events {
+public class Events extends Closer {
static private final Random RANDOM = new Random();
-
+
+ long errors = 0;
+
private static class Defaults {
private static CommandLine cmd;
@@ -348,25 +354,21 @@ public static void trace(String format, Object... values) {
System.out.println(sb.toString());
}
- public static AGRepositoryConnection connect(boolean shared) throws RepositoryException {
- AGServer server = new AGServer(findServerUrl(), username(), password());
- AGCatalog catalog = server.getCatalog(Defaults.CATALOG);
- AGRepository repository = catalog.createRepository(Defaults.REPOSITORY);
- repository.initialize();
- AGRepositoryConnection conn = repository.getConnection();
-
- if (!shared) {
- // Force an auto-committing non-shared backend
- conn.setAutoCommit(false);
- conn.setAutoCommit(true);
- }
-
- return conn;
+ public AGRepositoryConnection connect() throws RepositoryException {
+ AGServer server = closeLater( new AGServer(findServerUrl(), username(), password()) );
+ AGCatalog cat = server.getCatalog(Defaults.CATALOG);
+ AGRepository repo = closeLater( cat.createRepository(Defaults.REPOSITORY) );
+ repo.initialize();
+ AGRepositoryConnection conn = closeLater( repo.getConnection() );
+ // Force an auto-committing non-shared backend
+ conn.setAutoCommit(false);
+ conn.setAutoCommit(true);
+ trace("Dedicated backend: " + conn.getHttpRepoClient().getRoot());
+ return conn;
}
-
+
private static class ThreadVars {
private static ThreadLocal<ValueFactory> valueFactory = new ThreadLocal<ValueFactory>();
- private static ThreadLocal<AGRepositoryConnection> connection = new ThreadLocal<AGRepositoryConnection>();
private static ThreadLocal<RandomDate> dateMaker = new ThreadLocal<RandomDate>();
private static ThreadLocal<DatatypeFactory> datatypeFactory = new ThreadLocal<DatatypeFactory>() {
protected DatatypeFactory initialValue() {
@@ -425,7 +427,7 @@ public String toString() {
BaselineRange.end);
static private final RandomDate FullDateRange = new RandomDate(BaselineRange.start,
SmallCommitsRange.end);
-
+
private static interface RandomCallback {
public Value makeValue();
}
@@ -725,32 +727,26 @@ public static int makeEvent(Vector<Statement> statements, int index) {
}
}
- private static class Loader implements Callable<Object> {
+ class Loader implements Callable<Object>, Closeable {
private int id;
private int loopCount;
private int eventsPerCommit;
private int triplesPerCommit;
private final RandomDate dateMaker;
+ private AGRepositoryConnection conn;
- public Loader(int theId, int theTripleGoal, int theEventsPerCommit, RandomDate dateMaker) {
+ public Loader(int theId, int theTripleGoal, int theEventsPerCommit, RandomDate dateMaker) throws Exception {
id = theId;
this.dateMaker = dateMaker;
triplesPerCommit = theEventsPerCommit * Defaults.EVENT_SIZE;
loopCount = theTripleGoal / triplesPerCommit / Defaults.LOAD_WORKERS;
eventsPerCommit = theEventsPerCommit;
+ conn = connect();
}
public Integer call() {
Thread.currentThread().setName("loader(" + id + ")");
ThreadVars.dateMaker.set(dateMaker);
- AGRepositoryConnection conn;
- try {
- conn = connect(false);
- } catch (RepositoryException e) {
- e.printStackTrace();
- return -1;
- }
- ThreadVars.connection.set(conn);
ThreadVars.valueFactory.set(conn.getValueFactory());
final int statusSize = Defaults.STATUS;
@@ -782,6 +778,7 @@ public Integer call() {
conn.add(statements);
count += triplesPerCommit;
} catch (Exception e) {
+ errors++;
trace("Error adding statements...");
e.printStackTrace();
}
@@ -790,14 +787,13 @@ public Integer call() {
trace("Loading Done - %d triples at %d triples " +
"per commit, %d errors.", count, triplesPerCommit, errors);
- try {
- conn.close();
- } catch (RepositoryException e) {
- e.printStackTrace();
- }
-
return 0;
}
+
+ @Override
+ public void close() {
+ Events.this.close(conn);
+ }
}
private static class QueryResult {
@@ -810,16 +806,18 @@ public QueryResult(int theQueries, int theTriples) {
}
}
- private static class Querier implements Callable<Object> {
+ class Querier implements Callable<Object>, Closeable {
private int secondsToRun;
private int id;
private String timestamp;
private final RandomDate dateMaker;
+ AGRepositoryConnection conn;
- public Querier(int theId, int theSecondsToRun, RandomDate dateMaker) {
+ public Querier(int theId, int theSecondsToRun, RandomDate dateMaker) throws Exception {
id = theId;
secondsToRun = theSecondsToRun;
this.dateMaker = dateMaker;
+ conn = connect();
}
private int randomQuery(AGRepositoryConnection conn, ValueFactory vf, boolean trace) {
@@ -879,11 +877,12 @@ private int randomQuery(AGRepositoryConnection conn, ValueFactory vf, boolean tr
// AGAbstractTest.assertSetsEqual(queryString, stmts,
// Stmt.statementSet(tupleQuery2.evaluate()));
} catch (Exception e) {
+ errors++;
trace("Error executing query:\n%s\n", queryString);
e.printStackTrace();
count = -1;
} finally {
- Util.close(result);
+ Events.this.close(result);
}
return count;
@@ -901,15 +900,7 @@ private int count(TupleQueryResult result) throws Exception {
public QueryResult call() {
Thread.currentThread().setName("query(" + id + ")");
ThreadVars.dateMaker.set(dateMaker);
- AGRepositoryConnection conn;
ValueFactory vf;
- try {
- conn = connect(true);
- } catch (RepositoryException e) {
- e.printStackTrace();
- return null;
- }
- ThreadVars.connection.set(conn);
ThreadVars.valueFactory.set(vf = conn.getValueFactory());
timestamp = NTriplesUtil.toNTriplesString(vf.createURI(Defaults.NS, "EventTimeStamp"));
@@ -952,34 +943,28 @@ public QueryResult call() {
"(%f queries/second, %d triples per query), %d queries aborted.",
count, queries, logtime(seconds), logtime(queries/seconds), count/queries, restarts);
- try {
- conn.close();
- } catch (RepositoryException e) {
- e.printStackTrace();
- }
-
return new QueryResult(queries, count);
}
- }
+
+ @Override
+ public void close() {
+ Events.this.close(conn);
+ }
+ }
- private static class Deleter implements Callable<Object> {
+ class Deleter implements Callable<Object>, Closeable {
+
private final RandomDate range;
+ private AGRepositoryConnection conn;
- public Deleter(RandomDate range) {
+ public Deleter(RandomDate range) throws Exception {
this.range = range;
+ conn = connect();
}
public Integer call() {
Thread.currentThread().setName("deleter(" + range + ")");
- AGRepositoryConnection conn;
- try {
- conn = connect(false);
- } catch (RepositoryException e) {
- e.printStackTrace();
- return 0;
- }
ValueFactory vf;
- ThreadVars.connection.set(conn);
ThreadVars.valueFactory.set(vf = conn.getValueFactory());
String timestamp = NTriplesUtil.toNTriplesString(vf.createURI(Defaults.NS, "EventTimeStamp"));
@@ -1035,11 +1020,12 @@ public Integer call() {
trace("delete counts differ: size-diff: %d, query-count: %d", count, sizeDiff);
}
} catch (Exception e) {
+ errors++;
trace("Error executing query:\n%s\n", queryString);
e.printStackTrace();
count = -1;
} finally {
- Util.close(result);
+ Events.this.close(result);
}
} else {
queryString = String.format("(select0 (?event)" +
@@ -1058,6 +1044,7 @@ public Integer call() {
trace("delete counts differ: size-diff: %d, query-count: %d", count, count1);
}
} catch (Exception e) {
+ errors++;
trace("Error executing query:\n%s\n", queryString);
e.printStackTrace();
}
@@ -1076,10 +1063,13 @@ public Integer call() {
trace("Found %d events (%d triples) to delete.", events, events * Defaults.EVENT_SIZE);
- Util.close(conn);
-
return -1;
}
+
+ @Override
+ public void close() {
+ Events.this.close(conn);
+ }
}
public static class Monitor {
@@ -1123,11 +1113,29 @@ static public void stop() {
}
}
}
-
+
/**
* @param args Run with --help
*/
public static void main(String[] args) throws Exception {
+ Events events = new Events();
+ try {
+ events.run(args);
+ } catch (Exception e) {
+ System.err.println(e);
+ e.printStackTrace();
+ System.exit(-1);
+ } finally {
+ Util.close(events);
+ }
+ if (events.errors > 0) {
+ // exit with error
+ throw new Exception("Errors during execution: " + events.errors);
+ }
+ System.exit(0);
+ }
+
+ public void run(String[] args) throws Exception {
Defaults.init(args);
Thread.currentThread().setName("./events");
@@ -1140,15 +1148,12 @@ public static void main(String[] args) throws Exception {
AGServer server = new AGServer(Defaults.URL, Defaults.USERNAME, Defaults.PASSWORD);
AGCatalog catalog = server.getCatalog(Defaults.CATALOG);
-
if (false == Defaults.hasOption("open")) {
catalog.deleteRepository(Defaults.REPOSITORY);
}
+ server.close();
- AGRepository repository = catalog.createRepository(Defaults.REPOSITORY);
- repository.initialize();
- AGRepositoryConnection conn = repository.getConnection();
- ThreadVars.connection.set(conn);
+ AGRepositoryConnection conn = connect();
ThreadVars.valueFactory.set(conn.getValueFactory());
AllEvents.initialize();
@@ -1165,34 +1170,36 @@ public static void main(String[] args) throws Exception {
long start, end, triples;
double seconds;
ExecutorService executor = Executors.newFixedThreadPool(Defaults.LOAD_WORKERS);
- List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(Defaults.LOAD_WORKERS);
- for (int task = 0; task < Defaults.LOAD_WORKERS; task++) {
- tasks.add(new Loader(task, Defaults.SIZE / 10, 1, BaselineRange));
- }
/////////////////////////////////////////////////////////////////////// PHASE 1
if (Defaults.PHASE <= 1) {
+ List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(Defaults.LOAD_WORKERS);
+ for (int task = 0; task < Defaults.LOAD_WORKERS; task++) {
+ tasks.add(new Loader(task, Defaults.SIZE / 10, 1, BaselineRange));
+ }
trace("Phase 1 Begin: Baseline %d triple commits.", Defaults.EVENT_SIZE);
Monitor.start("phase-1");
- start = startTime;
+ start = System.currentTimeMillis();
invokeAndGetAll(executor, tasks);
end = System.currentTimeMillis();
triplesEnd = conn.size();
triples = triplesEnd - triplesStart;
seconds = (end - start) / 1000.0;
trace("Phase 1 End: %d total triples added in %.1f seconds " +
- "(%.2f triples/second, %.2f commits/second). " +
- "Store contains %d triples.", triples, logtime(seconds),
- logtime(triples/seconds),
- logtime(triples/Defaults.EVENT_SIZE/seconds), triplesEnd);
+ "(%.2f triples/second, %.2f commits/second). " +
+ "Store contains %d triples.", triples, logtime(seconds),
+ logtime(triples/seconds),
+ logtime(triples/Defaults.EVENT_SIZE/seconds), triplesEnd);
Monitor.stop(); // sync phase after phase-1 complete.
+ closeAll(tasks);
}
/////////////////////////////////////////////////////////////////////// PHASE 2
if (Defaults.PHASE <= 2) {
triplesStart = triplesEnd;
+ List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(Defaults.LOAD_WORKERS);
for (int task = 0; task < (Defaults.LOAD_WORKERS); task++) {
- tasks.set(task, new Loader(task, Defaults.SIZE*9/10, Defaults.BULK_EVENTS, BulkRange));
+ tasks.add(task, new Loader(task, Defaults.SIZE*9/10, Defaults.BULK_EVENTS, BulkRange));
}
trace("Phase 2 Begin: Grow store by about %d triples.", (Defaults.SIZE*9/10));
Monitor.start("phase-2");
@@ -1203,17 +1210,19 @@ public static void main(String[] args) throws Exception {
triples = triplesEnd - triplesStart;
seconds = (end - start) / 1000.0;
trace("Phase 2 End: %d total triples bulk-loaded in %.1f seconds " +
- "(%.2f triples/second, %.2f commits/second). " +
- "Store contains %d triples.", triples, seconds, triples/seconds,
- triples/Defaults.BULK_EVENTS/Defaults.EVENT_SIZE/seconds, triplesEnd);
+ "(%.2f triples/second, %.2f commits/second). " +
+ "Store contains %d triples.", triples, seconds, triples/seconds,
+ triples/Defaults.BULK_EVENTS/Defaults.EVENT_SIZE/seconds, triplesEnd);
Monitor.stop();
+ closeAll(tasks);
}
/////////////////////////////////////////////////////////////////////// PHASE 3
if (Defaults.PHASE <= 3) {
triplesStart = triplesEnd;
+ List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(Defaults.LOAD_WORKERS);
for (int task = 0; task < Defaults.LOAD_WORKERS; task++) {
- tasks.set(task, new Loader(task, Defaults.SIZE/10, 1, SmallCommitsRange));
+ tasks.add(task, new Loader(task, Defaults.SIZE/10, 1, SmallCommitsRange));
}
trace("Phase 3 Begin: Perform %d triple commits.", Defaults.EVENT_SIZE);
Monitor.start("phase-3");
@@ -1224,11 +1233,12 @@ public static void main(String[] args) throws Exception {
triples = triplesEnd - triplesStart;
seconds = (end - start) / 1000.0;
trace("Phase 3 End: %d total triples added in %.1f seconds " +
- "(%.2f triples/second, %.2f commits/second). " +
- "Store contains %d triples.", triples, seconds, triples/seconds,
- triples/Defaults.EVENT_SIZE/seconds, triplesEnd);
- Monitor.stop();
+ "(%.2f triples/second, %.2f commits/second). " +
+ "Store contains %d triples.", triples, seconds, triples/seconds,
+ triples/Defaults.EVENT_SIZE/seconds, triplesEnd);
+ Monitor.stop();
executor.shutdown();
+ closeAll(tasks);
}
}
@@ -1246,7 +1256,7 @@ public static void main(String[] args) throws Exception {
Monitor.start("phase-4");
int queries = 0;
long triples = 0;
- Calendar start = GregorianCalendar.getInstance();
+ long start = System.currentTimeMillis();
try {
List<Future<Object>> fs = executor.invokeAll(queriers);
for (Future<Object> f : fs) {
@@ -1255,18 +1265,21 @@ public static void main(String[] args) throws Exception {
triples += queryResult.triples;
}
} catch (InterruptedException e) {
+ errors++;
e.printStackTrace();
} catch (ExecutionException e) {
+ errors++;
e.printStackTrace();
}
- Calendar end = GregorianCalendar.getInstance();
- double seconds = (end.getTimeInMillis() - start.getTimeInMillis()) / 1000.0;
+ long end = System.currentTimeMillis();
+ double seconds = (end - start) / 1000.0;
trace("Phase 4 End: %d total triples returned over %d queries in " +
"%.1f seconds (%.2f triples/second, %.2f queries/second, " +
"%d triples/query).", triples, queries, logtime(seconds),
logtime(triples/seconds), logtime(queries/seconds), triples/queries);
Monitor.stop();
executor.shutdown();
+ closeAll(queriers);
}
}
@@ -1284,6 +1297,7 @@ public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
invokeAndGetAll(executor, tasks);
long end = System.currentTimeMillis();
+ closeAll(tasks);
executor.shutdown();
long triplesEnd = conn.size();
long triples = triplesEnd - triplesStart;
@@ -1307,7 +1321,6 @@ public static void main(String[] args) throws Exception {
long triples = 0;
long added = 0;
long deleted = 0;
- Calendar start = GregorianCalendar.getInstance();
smallCommitsRange = smallCommitsRange.next(Calendar.DAY_OF_YEAR, 30);
fullDateRange = new RandomDate(deleteRangeTwo.end, smallCommitsRange.end);
deleteRangeOne = deleteRangeTwo.next(Calendar.DAY_OF_YEAR, 15);
@@ -1329,6 +1342,7 @@ public static void main(String[] args) throws Exception {
}
}
+ long start = System.currentTimeMillis();
try {
List<Future<Object>> fs = executor.invokeAll(tasks);
for (Future f : fs) {
@@ -1347,17 +1361,20 @@ public static void main(String[] args) throws Exception {
}
}
} catch (InterruptedException e) {
+ errors++;
e.printStackTrace();
} catch (ExecutionException e) {
+ errors++;
e.printStackTrace();
}
- Calendar end = GregorianCalendar.getInstance();
- double seconds = (end.getTimeInMillis() - start.getTimeInMillis()) / 1000.0;
+ long end = System.currentTimeMillis();
+ double seconds = (end - start) / 1000.0;
trace("Phase 6 End: %d total triples returned over %d queries in " +
- "%.1f seconds (%.2f triples/second, %.2f queries/second, " +
- "%d triples/query, %d triples added, %d deletes).", triples, queries,
- logtime(seconds), logtime(triples/seconds), logtime(queries/seconds),
- (queries==0 ? 0 : triples/queries), added, deleted);
+ "%.1f seconds (%.2f triples/second, %.2f queries/second, " +
+ "%d triples/query, %d triples added, %d deletes).", triples, queries,
+ logtime(seconds), logtime(triples/seconds), logtime(queries/seconds),
+ (queries==0 ? 0 : triples/queries), added, deleted);
+ closeAll(tasks);
}
executor.shutdown();
Monitor.stop();
@@ -1370,12 +1387,9 @@ public static void main(String[] args) throws Exception {
trace("Test completed in %.1f total seconds - store contains %d triples (%d triples added/removed).",
logtime(totalSeconds), triplesEnd, triples);
-
- conn.close();
- repository.shutDown();
}
- private static <Type> void invokeAndGetAll(ExecutorService executor,
+ private <Type> void invokeAndGetAll(ExecutorService executor,
List<Callable<Type>> tasks) {
try {
List<Future<Type>> fs = executor.invokeAll(tasks);
@@ -1383,8 +1397,10 @@ public static void main(String[] args) throws Exception {
f.get();
}
} catch (InterruptedException e) {
+ errors++;
e.printStackTrace();
} catch (ExecutionException e) {
+ errors++;
e.printStackTrace();
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.