diff --git a/src/test/stress/Events.java b/src/test/stress/Events.java
index 8329dd7f..6207ba73 100644
--- a/src/test/stress/Events.java
+++ b/src/test/stress/Events.java
@@ -49,6 +49,8 @@
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.vocabulary.RDF;
 import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
 import org.openrdf.query.TupleQueryResult;
 import org.openrdf.query.TupleQueryResultHandler;
 import org.openrdf.query.TupleQueryResultHandlerException;
@@ -820,16 +822,18 @@ class Querier implements Callable<Object>, Closeable {
         private int id;
         private String timestamp;
         private final RandomDate dateMaker;
+        private final QueryLanguage language;
         AGRepositoryConnection conn;
 
-        public Querier(int theId, int theSecondsToRun, RandomDate dateMaker) throws Exception {
+        public Querier(int theId, int theSecondsToRun, RandomDate dateMaker, QueryLanguage lang) throws Exception {
             id = theId;
             secondsToRun = theSecondsToRun;
             this.dateMaker = dateMaker;
+            language = lang;
             conn = connect();
         }
 
-        private long randomQuery(AGRepositoryConnection conn, ValueFactory vf, boolean trace) {
+        private long sparqlQuery(AGRepositoryConnection conn, ValueFactory vf, boolean trace) {
             // Pick a random customer
             String customerNT = NTriplesUtil.toNTriplesString(new RandomCustomer().makeValue());
 
@@ -850,28 +854,81 @@ private long randomQuery(AGRepositoryConnection conn, ValueFactory vf, boolean t
             String queryString;
             AGTupleQuery tupleQuery;
 
-            if (Defaults.hasOption("sparql")) {
-                queryString = String.format(
-                        "select ?s ?p ?o " +
-                        "from %s " +
-                        "where { " +
-                        " ?s %s ?date . " +
-                        " filter ( ( ?date >= %s ) && ( ?date <= %s ) ) " +
-                        " ?s ?p ?o " +
-                        "}",
-                        customerNT, timestamp, startNT, endNT);
-                tupleQuery = conn.prepareTupleQuery(AGQueryLanguage.SPARQL, queryString);
-            } else {
-                queryString = String.format(
-                        "(select (?s ?p ?o)" +
-                        "(:use-planner nil)" +
-                        "(q- ?s !%s (? !%s !%s) !%s)" +
-                        "(q- ?s ?p ?o !%s))",
-                        timestamp, startNT, endNT, customerNT, customerNT);
-                tupleQuery = conn.prepareTupleQuery(AGQueryLanguage.PROLOG, queryString);
-            }
+            queryString = String.format(
+                    "select ?s ?p ?o " +
+                    "from %s " +
+                    "where { " +
+                    " ?s %s ?date . " +
+                    " filter ( ( ?date >= %s ) && ( ?date <= %s ) ) " +
+                    " ?s ?p ?o " +
+                    "}",
+                    customerNT, timestamp, startNT, endNT);
+            tupleQuery = conn.prepareTupleQuery(AGQueryLanguage.SPARQL, queryString);
 
             tupleQuery = streamQuery(tupleQuery);
+
+            // Actually pull the full results to the client, then just count them
+            TupleQueryResult result = null;
+            long count = 0;
+            try {
+                if (Defaults.VERBOSE > 0 && trace) {
+                    trace("query: %s", queryString);
+                }
+                if (Defaults.stream == Defaults.STREAM.HAND || Defaults.stream == Defaults.STREAM.PULH) {
+                    CountingHandler handler = new CountingHandler();
+                    tupleQuery.evaluate(handler);
+                    count = handler.count;
+                } else {
+                    result = tupleQuery.evaluate();
+                    count = count(result);
+                }
+                // test sparql and prolog return same results:
+                // Set<Stmt> stmts = Stmt.statementSet(result);
+                // count = stmts.size();
+                // AGAbstractTest.assertSetsEqual(queryString, stmts,
+                // Stmt.statementSet(tupleQuery2.evaluate()));
+            } catch (Exception e) {
+                errors++;
+                trace("Error executing query:\n%s\n", queryString);
+                e.printStackTrace();
+                count = -1;
+            } finally {
+                Events.this.close(result);
+            }
+
+            return count;
+        }
+
+        private long prologQuery(AGRepositoryConnection conn, ValueFactory vf, boolean trace) {
+            // Pick a random customer
+            String customerNT = NTriplesUtil.toNTriplesString(new RandomCustomer().makeValue());
+            // Pick a random date range
+            GregorianCalendar start, end;
+            start = FullDateRange.getRandom();
+            end = FullDateRange.getRandom();
+
+            if (start.after(end)) {
+                GregorianCalendar swap = end;
+                end = start;
+                start = swap;
+            }
+
+            String startNT, endNT;
+            startNT = NTriplesUtil.toNTriplesString(CalendarToValue(start));
+            endNT = NTriplesUtil.toNTriplesString(CalendarToValue(end));
+
+            String queryString;
+            AGTupleQuery tupleQuery;
+
+            queryString = String.format(
+                    "(select (?s ?p ?o)" +
+                    "(:use-planner nil)" +
+                    "(q- ?s !%s (? !%s !%s) !%s)" +
+                    "(q- ?s ?p ?o !%s))",
+                    timestamp, startNT, endNT, customerNT, customerNT);
+            tupleQuery = conn.prepareTupleQuery(AGQueryLanguage.PROLOG, queryString);
+            tupleQuery = streamQuery(tupleQuery);
+
             // Actually pull the full results to the client, then just count them
             TupleQueryResult result = null;
             long count = 0;
@@ -902,10 +959,19 @@ private long randomQuery(AGRepositoryConnection conn, ValueFactory vf, boolean t
             }
 
             return count;
+        }
+
+        private long randomQuery(AGRepositoryConnection conn, ValueFactory vf, boolean trace) {
+
+            if(this.language == AGQueryLanguage.PROLOG) {
+                return prologQuery(conn, vf, trace);
+            } else {
+                return sparqlQuery(conn, vf, trace);
+            }
         }
 
         class CountingHandler implements TupleQueryResultHandler {
-                long count = 0;
+            long count = 0;
             public void startQueryResult(List<String> bindingNames) throws TupleQueryResultHandlerException {
             }
             public void handleSolution(BindingSet bindingSet) throws TupleQueryResultHandlerException {
@@ -915,8 +981,8 @@ public void endQueryResult() throws TupleQueryResultHandlerException {
             }
         }
 
-            private int count(TupleQueryResult result) throws Exception {
-                int count = 0;
+        private int count(TupleQueryResult result) throws Exception {
+            int count = 0;
             while (result.hasNext()) {
                 result.next();
                 count++;
@@ -1383,21 +1449,26 @@ public void run(String[] args) throws Exception {
 
                 trace("Phase 0 Begin: Launching child query workers.");
                 ExecutorService executor = Executors.newFixedThreadPool(Defaults.QUERY_WORKERS);
-                List<Callable<Object>> queriers = new ArrayList<Callable<Object>>(Defaults.QUERY_WORKERS);
+                List<Callable<Object>> sparqlQueriers = new ArrayList<Callable<Object>>(Defaults.QUERY_WORKERS);
+                List<Callable<Object>> prologQueriers = new ArrayList<Callable<Object>>(Defaults.QUERY_WORKERS);
                 for (int task = 0; task < Defaults.QUERY_WORKERS; task++) {
-                    queriers.add(new Querier(task, Defaults.QUERY_TIME*60, FullDateRange));
+                    sparqlQueriers.add(new Querier(task, Defaults.QUERY_TIME*60, FullDateRange,
+                            AGQueryLanguage.SPARQL));
+                    prologQueriers.add(new Querier(task, Defaults.QUERY_TIME*60, FullDateRange,
+                            AGQueryLanguage.PROLOG));
                 }
                 end = System.currentTimeMillis();
                 seconds = (end - start) / 1000.0;
                 trace("Phase 0 End: Initial query_workers took " + seconds + " seconds.");
-                trace("Phase 4 Begin: Perform customer/date range queries with %d processes for %d minutes.",
+                trace("Phase 4 Begin: Perform SPARQL queries with %d processes for %d minutes.",
                       Defaults.QUERY_WORKERS, Defaults.QUERY_TIME);
                 Monitor.start(4);
                 int queries = 0;
                 long triples = 0;
                 start = System.currentTimeMillis();
+                // sparql first, then prolog second
                 try {
-                    List<Future<Object>> fs = executor.invokeAll(queriers);
+                    List<Future<Object>> fs = executor.invokeAll(sparqlQueriers);
                     for (Future<Object> f : fs) {
                         QueryResult queryResult = (QueryResult) f.get();
                         queries += queryResult.queries;
@@ -1418,13 +1489,45 @@ public void run(String[] args) throws Exception {
                       logtime(triples/seconds), logtime(queries/seconds), triples/queries,
                       ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed());
                 Monitor.stop(4, conn);
+
+                closeAll(sparqlQueriers);
+
+                trace("Phase 5 Begin: Perform PROLOG queries with %d processes for %d minutes.",
+                      Defaults.QUERY_WORKERS, Defaults.QUERY_TIME);
+                Monitor.start(5);
+                queries = 0;
+                triples = 0;
+                start = System.currentTimeMillis();
+                // sparql first, then prolog second
+                try {
+                    List<Future<Object>> fs = executor.invokeAll(prologQueriers);
+                    for (Future<Object> f : fs) {
+                        QueryResult queryResult = (QueryResult) f.get();
+                        queries += queryResult.queries;
+                        triples += queryResult.triples;
+                    }
+                } catch (InterruptedException e) {
+                    errors++;
+                    e.printStackTrace();
+                } catch (ExecutionException e) {
+                    errors++;
+                    e.printStackTrace();
+                }
+                end = System.currentTimeMillis();
+                seconds = (end - start) / 1000.0;
+                trace("Phase 5 End: %d total triples returned over %d queries in " +
+                      "%.1f seconds (%.2f triples/second, %.2f queries/second, " +
+                      "%d triples/query).", triples, queries, logtime(seconds),
+                      logtime(triples/seconds), logtime(queries/seconds), triples/queries);
+                Monitor.stop(5, conn);
+
+
                 executor.shutdown();
-                closeAll(queriers);
+                closeAll(prologQueriers);
             }
         }
 
         /////////////////////////////////////////////////////////////////////// PHASE 5
-        if (Defaults.PHASE <= 5 && Defaults.DELETE_WORKERS > 0) {
+        if (Defaults.PHASE <= 6 && Defaults.DELETE_WORKERS > 0) {
             long triplesStart = conn.size();
             long start = System.currentTimeMillis(), end;
             double seconds;
@@ -1440,8 +1543,8 @@ public void run(String[] args) throws Exception {
             end = System.currentTimeMillis();
             seconds = (end - start) / 1000.0;
             trace("Phase 0 End: Initial delete_workers took " + seconds + " seconds.");
-            trace("Phase 5 Begin: Shrink store by 1 month.");
-            Monitor.start(5);
+            trace("Phase 6 Begin: Shrink store by 1 month.");
+            Monitor.start(6);
             start = System.currentTimeMillis();
             invokeAndGetAll(executor, tasks);
             end = System.currentTimeMillis();
@@ -1450,14 +1553,14 @@ public void run(String[] args) throws Exception {
             long triplesEnd = conn.size();
             long triples = triplesEnd - triplesStart;
             seconds = (end - start) / 1000.0;
-            trace("Phase 5 End: %d total triples deleted in %.1f seconds " +
+            trace("Phase 6 End: %d total triples deleted in %.1f seconds " +
                   "(%.2f triples/second). Store contains %d triples.", triples,
                   logtime(seconds), logtime(triples/seconds), triplesEnd);
-            Monitor.stop(5, conn);
+            Monitor.stop(6, conn);
         }
 
-        if (Defaults.PHASE <= 6 && Defaults.MIXED_RUNS != 0) {
-            Monitor.start(6);
+        if (Defaults.PHASE <= 7 && Defaults.MIXED_RUNS != 0) {
+            Monitor.start(7);
             RandomDate smallCommitsRange = SmallCommitsRange;
             RandomDate fullDateRange = FullDateRange;
             RandomDate deleteRangeOne = DeleteRangeOne;
@@ -1481,7 +1584,7 @@ public void run(String[] args) throws Exception {
                     tasks.add(new Loader(task, Defaults.SIZE/10, 1, smallCommitsRange));
                 }
                 for (int task = 0; task < Defaults.QUERY_WORKERS; task++) {
-                    tasks.add(new Querier(task, Defaults.QUERY_TIME*60, fullDateRange));
+                    tasks.add(new Querier(task, Defaults.QUERY_TIME*60, fullDateRange, AGQueryLanguage.SPARQL));
                 }
                 if (Defaults.DELETE_WORKERS > 0) {
                     tasks.add(new Deleter(deleteRangeOne));
@@ -1525,7 +1628,7 @@ public void run(String[] args) throws Exception {
                 closeAll(tasks);
            }
            executor.shutdown();
-           Monitor.stop(6, conn);
+           Monitor.stop(7, conn);
        }
 
        /////////////////////////////////////////////////////////////////////// END
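
Not part of the patch itself: the sketch below is a minimal, self-contained illustration of the SPARQL/Prolog split that the reworked randomQuery() now dispatches on. The class QuerySplitSketch, its Lang enum, and the sample N-Triples terms in main() are hypothetical stand-ins for AGQueryLanguage and the randomly generated customer and date-range values used in Events.java; only the two query format strings and the dispatch shape are taken from the diff above.

// Standalone sketch (not part of the patch): mirrors the SPARQL/Prolog split that
// Querier.randomQuery() now dispatches on. Lang and the hard-coded sample values
// are hypothetical stand-ins for AGQueryLanguage and the random customer/date terms.
public class QuerySplitSketch {
    enum Lang { SPARQL, PROLOG }

    // Builds the same customer/date-range SPARQL query string as sparqlQuery().
    static String sparqlQuery(String customerNT, String timestamp, String startNT, String endNT) {
        return String.format(
                "select ?s ?p ?o " +
                "from %s " +
                "where { " +
                " ?s %s ?date . " +
                " filter ( ( ?date >= %s ) && ( ?date <= %s ) ) " +
                " ?s ?p ?o " +
                "}",
                customerNT, timestamp, startNT, endNT);
    }

    // Builds the same Prolog select query string as prologQuery().
    static String prologQuery(String customerNT, String timestamp, String startNT, String endNT) {
        return String.format(
                "(select (?s ?p ?o)" +
                "(:use-planner nil)" +
                "(q- ?s !%s (? !%s !%s) !%s)" +
                "(q- ?s ?p ?o !%s))",
                timestamp, startNT, endNT, customerNT, customerNT);
    }

    // Same shape as the new randomQuery(): pick the query text by language.
    static String randomQuery(Lang language, String customerNT, String timestamp,
                              String startNT, String endNT) {
        if (language == Lang.PROLOG) {
            return prologQuery(customerNT, timestamp, startNT, endNT);
        } else {
            return sparqlQuery(customerNT, timestamp, startNT, endNT);
        }
    }

    public static void main(String[] args) {
        // Hypothetical N-Triples terms, standing in for the randomly generated ones.
        String customerNT = "<http://example.org/customer/42>";
        String timestamp = "<http://example.org/ontology/timestamp>";
        String startNT = "\"2024-01-01T00:00:00Z\"^^<http://www.w3.org/2001/XMLSchema#dateTime>";
        String endNT = "\"2024-02-01T00:00:00Z\"^^<http://www.w3.org/2001/XMLSchema#dateTime>";

        System.out.println(randomQuery(Lang.SPARQL, customerNT, timestamp, startNT, endNT));
        System.out.println(randomQuery(Lang.PROLOG, customerNT, timestamp, startNT, endNT));
    }
}

Keeping the two builders as separate methods mirrors the intent of the patch: each query language gets its own code path and its own timed phase, instead of being chosen per run via Defaults.hasOption("sparql").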