Skip to content

Commit

Permalink
Separate prolog and sparql query phases in the events test.
Browse files Browse the repository at this point in the history
Expand the query phase into two phases, one for sparql queries, one
for prolog. Initialization of the workers for each phase still occurs
prior to the first of these phases.

make test-suite run? no

release notes: non-user-visible code change.

Change-Id: I00e76e39fc6810c5636507a6b90bf18814cf5c45
  • Loading branch information
Mikel Bancroft committed Aug 10, 2011
1 parent 3b0976c commit 7a4b85a
Showing 1 changed file with 142 additions and 39 deletions.
181 changes: 142 additions & 39 deletions src/test/stress/Events.java
Expand Up @@ -49,6 +49,8 @@
import org.openrdf.model.ValueFactory;
import org.openrdf.model.vocabulary.RDF;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryLanguage;
import org.openrdf.query.TupleQuery;
import org.openrdf.query.TupleQueryResult;
import org.openrdf.query.TupleQueryResultHandler;
import org.openrdf.query.TupleQueryResultHandlerException;
Expand Down Expand Up @@ -820,16 +822,18 @@ class Querier implements Callable<Object>, Closeable {
private int id;
private String timestamp;
private final RandomDate dateMaker;
private final QueryLanguage language;
AGRepositoryConnection conn;

public Querier(int theId, int theSecondsToRun, RandomDate dateMaker) throws Exception {
public Querier(int theId, int theSecondsToRun, RandomDate dateMaker, QueryLanguage lang) throws Exception {
id = theId;
secondsToRun = theSecondsToRun;
this.dateMaker = dateMaker;
language = lang;
conn = connect();
}

private long randomQuery(AGRepositoryConnection conn, ValueFactory vf, boolean trace) {
private long sparqlQuery(AGRepositoryConnection conn, ValueFactory vf, boolean trace) {
// Pick a random customer
String customerNT = NTriplesUtil.toNTriplesString(new RandomCustomer().makeValue());

Expand All @@ -850,28 +854,81 @@ private long randomQuery(AGRepositoryConnection conn, ValueFactory vf, boolean t

String queryString;
AGTupleQuery tupleQuery;
if (Defaults.hasOption("sparql")) {
queryString = String.format(
"select ?s ?p ?o " +
"from %s " +
"where { " +
" ?s %s ?date . " +
" filter ( ( ?date >= %s ) && ( ?date <= %s ) ) " +
" ?s ?p ?o " +
"}",
customerNT, timestamp, startNT, endNT);
tupleQuery = conn.prepareTupleQuery(AGQueryLanguage.SPARQL, queryString);
} else {
queryString = String.format(
"(select (?s ?p ?o)" +
"(:use-planner nil)" +
"(q- ?s !%s (? !%s !%s) !%s)" +
"(q- ?s ?p ?o !%s))",
timestamp, startNT, endNT, customerNT, customerNT);
tupleQuery = conn.prepareTupleQuery(AGQueryLanguage.PROLOG, queryString);
}
queryString = String.format(
"select ?s ?p ?o " +
"from %s " +
"where { " +
" ?s %s ?date . " +
" filter ( ( ?date >= %s ) && ( ?date <= %s ) ) " +
" ?s ?p ?o " +
"}",
customerNT, timestamp, startNT, endNT);
tupleQuery = conn.prepareTupleQuery(AGQueryLanguage.SPARQL, queryString);
tupleQuery = streamQuery(tupleQuery);

// Actually pull the full results to the client, then just count them
TupleQueryResult result = null;
long count = 0;
try {
if (Defaults.VERBOSE > 0 && trace) {
trace("query: %s", queryString);
}
if (Defaults.stream == Defaults.STREAM.HAND || Defaults.stream == Defaults.STREAM.PULH) {
CountingHandler handler = new CountingHandler();
tupleQuery.evaluate(handler);
count = handler.count;
} else {
result = tupleQuery.evaluate();
count = count(result);
}
// test sparql and prolog return same results:
// Set<Stmt> stmts = Stmt.statementSet(result);
// count = stmts.size();
// AGAbstractTest.assertSetsEqual(queryString, stmts,
// Stmt.statementSet(tupleQuery2.evaluate()));
} catch (Exception e) {
errors++;
trace("Error executing query:\n%s\n", queryString);
e.printStackTrace();
count = -1;
} finally {
Events.this.close(result);
}

return count;
}

private long prologQuery(AGRepositoryConnection conn, ValueFactory vf, boolean trace) {
// Pick a random customer
String customerNT = NTriplesUtil.toNTriplesString(new RandomCustomer().makeValue());

// Pick a random date range
GregorianCalendar start, end;
start = FullDateRange.getRandom();
end = FullDateRange.getRandom();

if (start.after(end)) {
GregorianCalendar swap = end;
end = start;
start = swap;
}

String startNT, endNT;
startNT = NTriplesUtil.toNTriplesString(CalendarToValue(start));
endNT = NTriplesUtil.toNTriplesString(CalendarToValue(end));

String queryString;
AGTupleQuery tupleQuery;

queryString = String.format(
"(select (?s ?p ?o)" +
"(:use-planner nil)" +
"(q- ?s !%s (? !%s !%s) !%s)" +
"(q- ?s ?p ?o !%s))",
timestamp, startNT, endNT, customerNT, customerNT);
tupleQuery = conn.prepareTupleQuery(AGQueryLanguage.PROLOG, queryString);
tupleQuery = streamQuery(tupleQuery);

// Actually pull the full results to the client, then just count them
TupleQueryResult result = null;
long count = 0;
Expand Down Expand Up @@ -902,10 +959,19 @@ private long randomQuery(AGRepositoryConnection conn, ValueFactory vf, boolean t
}

return count;
}

private long randomQuery(AGRepositoryConnection conn, ValueFactory vf, boolean trace) {

if(this.language == AGQueryLanguage.PROLOG) {
return prologQuery(conn, vf, trace);
} else {
return sparqlQuery(conn, vf, trace);
}
}

class CountingHandler implements TupleQueryResultHandler {
long count = 0;
long count = 0;
public void startQueryResult(List<String> bindingNames) throws TupleQueryResultHandlerException {
}
public void handleSolution(BindingSet bindingSet) throws TupleQueryResultHandlerException {
Expand All @@ -915,8 +981,8 @@ public void endQueryResult() throws TupleQueryResultHandlerException {
}
}

private int count(TupleQueryResult result) throws Exception {
int count = 0;
private int count(TupleQueryResult result) throws Exception {
int count = 0;
while (result.hasNext()) {
result.next();
count++;
Expand Down Expand Up @@ -1383,21 +1449,26 @@ public void run(String[] args) throws Exception {
trace("Phase 0 Begin: Launching child query workers.");

ExecutorService executor = Executors.newFixedThreadPool(Defaults.QUERY_WORKERS);
List<Callable<Object>> queriers = new ArrayList<Callable<Object>>(Defaults.QUERY_WORKERS);
List<Callable<Object>> sparqlQueriers = new ArrayList<Callable<Object>>(Defaults.QUERY_WORKERS);
List<Callable<Object>> prologQueriers = new ArrayList<Callable<Object>>(Defaults.QUERY_WORKERS);
for (int task = 0; task < Defaults.QUERY_WORKERS; task++) {
queriers.add(new Querier(task, Defaults.QUERY_TIME*60, FullDateRange));
sparqlQueriers.add(new Querier(task, Defaults.QUERY_TIME*60, FullDateRange,
AGQueryLanguage.SPARQL));
prologQueriers.add(new Querier(task, Defaults.QUERY_TIME*60, FullDateRange,
AGQueryLanguage.PROLOG));
}
end = System.currentTimeMillis();
seconds = (end - start) / 1000.0;
trace("Phase 0 End: Initial query_workers took " + seconds + " seconds.");
trace("Phase 4 Begin: Perform customer/date range queries with %d processes for %d minutes.",
trace("Phase 4 Begin: Perform SPARQL queries with %d processes for %d minutes.",
Defaults.QUERY_WORKERS, Defaults.QUERY_TIME);
Monitor.start(4);
int queries = 0;
long triples = 0;
start = System.currentTimeMillis();
// sparql first, then prolog second
try {
List<Future<Object>> fs = executor.invokeAll(queriers);
List<Future<Object>> fs = executor.invokeAll(sparqlQueriers);
for (Future<Object> f : fs) {
QueryResult queryResult = (QueryResult) f.get();
queries += queryResult.queries;
Expand All @@ -1418,13 +1489,45 @@ public void run(String[] args) throws Exception {
logtime(triples/seconds), logtime(queries/seconds), triples/queries,
ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed());
Monitor.stop(4, conn);

closeAll(sparqlQueriers);

trace("Phase 5 Begin: Perform PROLOG queries with %d processes for %d minutes.",
Defaults.QUERY_WORKERS, Defaults.QUERY_TIME);
Monitor.start(5);
queries = 0;
triples = 0;
start = System.currentTimeMillis();
// sparql first, then prolog second
try {
List<Future<Object>> fs = executor.invokeAll(prologQueriers);
for (Future<Object> f : fs) {
QueryResult queryResult = (QueryResult) f.get();
queries += queryResult.queries;
triples += queryResult.triples;
}
} catch (InterruptedException e) {
errors++;
e.printStackTrace();
} catch (ExecutionException e) {
errors++;
e.printStackTrace();
}
end = System.currentTimeMillis();
seconds = (end - start) / 1000.0;
trace("Phase 5 End: %d total triples returned over %d queries in " +
"%.1f seconds (%.2f triples/second, %.2f queries/second, " +
"%d triples/query).", triples, queries, logtime(seconds),
logtime(triples/seconds), logtime(queries/seconds), triples/queries);
Monitor.stop(5, conn);

executor.shutdown();
closeAll(queriers);
closeAll(prologQueriers);
}
}

/////////////////////////////////////////////////////////////////////// PHASE 5
if (Defaults.PHASE <= 5 && Defaults.DELETE_WORKERS > 0) {
if (Defaults.PHASE <= 6 && Defaults.DELETE_WORKERS > 0) {
long triplesStart = conn.size();
long start = System.currentTimeMillis(), end;
double seconds;
Expand All @@ -1440,8 +1543,8 @@ public void run(String[] args) throws Exception {
end = System.currentTimeMillis();
seconds = (end - start) / 1000.0;
trace("Phase 0 End: Initial delete_workers took " + seconds + " seconds.");
trace("Phase 5 Begin: Shrink store by 1 month.");
Monitor.start(5);
trace("Phase 6 Begin: Shrink store by 1 month.");
Monitor.start(6);
start = System.currentTimeMillis();
invokeAndGetAll(executor, tasks);
end = System.currentTimeMillis();
Expand All @@ -1450,14 +1553,14 @@ public void run(String[] args) throws Exception {
long triplesEnd = conn.size();
long triples = triplesEnd - triplesStart;
seconds = (end - start) / 1000.0;
trace("Phase 5 End: %d total triples deleted in %.1f seconds " +
trace("Phase 6 End: %d total triples deleted in %.1f seconds " +
"(%.2f triples/second). Store contains %d triples.", triples,
logtime(seconds), logtime(triples/seconds), triplesEnd);
Monitor.stop(5, conn);
Monitor.stop(6, conn);
}

if (Defaults.PHASE <= 6 && Defaults.MIXED_RUNS != 0) {
Monitor.start(6);
if (Defaults.PHASE <= 7 && Defaults.MIXED_RUNS != 0) {
Monitor.start(7);
RandomDate smallCommitsRange = SmallCommitsRange;
RandomDate fullDateRange = FullDateRange;
RandomDate deleteRangeOne = DeleteRangeOne;
Expand All @@ -1481,7 +1584,7 @@ public void run(String[] args) throws Exception {
tasks.add(new Loader(task, Defaults.SIZE/10, 1, smallCommitsRange));
}
for (int task = 0; task < Defaults.QUERY_WORKERS; task++) {
tasks.add(new Querier(task, Defaults.QUERY_TIME*60, fullDateRange));
tasks.add(new Querier(task, Defaults.QUERY_TIME*60, fullDateRange, AGQueryLanguage.SPARQL));
}
if (Defaults.DELETE_WORKERS > 0) {
tasks.add(new Deleter(deleteRangeOne));
Expand Down Expand Up @@ -1525,7 +1628,7 @@ public void run(String[] args) throws Exception {
closeAll(tasks);
}
executor.shutdown();
Monitor.stop(6, conn);
Monitor.stop(7, conn);
}

/////////////////////////////////////////////////////////////////////// END
Expand Down

0 comments on commit 7a4b85a

Please sign in to comment.