Merge branch 'm1' of git:/repo/git/agraph-java-client into m1

commit 32d04b0d0127b10fb2c9f23fd52020741c851aa2 (2 parents: d1a1af8 + 79fff8a)
dklayer authored November 13, 2009

Showing 1 changed file with 176 additions and 411 deletions.

587  src/test/stress/Events.java
@@ -11,12 +11,16 @@
 import org.openrdf.model.vocabulary.RDF;
 import org.openrdf.repository.RepositoryException;
 import org.openrdf.repository.RepositoryResult;
+import org.openrdf.rio.ntriples.NTriplesUtil;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.TupleQuery;
 import org.openrdf.query.TupleQueryResult;
 import org.openrdf.query.QueryLanguage;
 import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.QueryEvaluationException;

 import com.franz.agraph.repository.AGCatalog;
+import com.franz.agraph.repository.AGQueryLanguage;
 import com.franz.agraph.repository.AGRepository;
 import com.franz.agraph.repository.AGRepositoryConnection;
 import com.franz.agraph.repository.AGServer;
@@ -43,7 +47,7 @@ private static String with_default(String theValue, String theDefault) {
 	}
 	
 	static private final Random RANDOM = new Random();
-	static private final String LOCALHOST = "10.211.55.5";
+	static private final String LOCALHOST = "localhost";
 	static private final String AG_HOST = with_default(System.getenv("AGRAPH_HOST"), LOCALHOST);
 	static private final int AG_PORT = Integer.parseInt(with_default(System.getenv("AGRAPH_PORT"), "10035"));
 	static private final String AG_USER = with_default(System.getenv("AGRAPH_USER"), "test");
@@ -96,11 +100,11 @@ public static AGRepositoryConnection connect(boolean shared) throws RepositoryEx
         repository.initialize();
         AGRepositoryConnection conn = repository.getConnection();
         
-        //if (!shared) {
+        if (!shared) {
         	// Force an auto-committing non-shared backend 
-        //	conn.setAutoCommit(false);
-        //	conn.setAutoCommit(true);
-        //}
+        	conn.setAutoCommit(false);
+        	conn.setAutoCommit(true);
+        }
         
         return conn;
     }
@@ -440,10 +444,10 @@ public static int makeEvent(Vector<Statement> statements, int index) {
 		    }

 		    BNode bnode = ThreadVars.valueFactory.get().createBNode();
-		    Resource graph = (Resource) new RandomCustomer().makeValue();
-
+		    Resource context = (Resource) new RandomCustomer().makeValue();
+		    
 		    for (int i = 0; i < Defaults.EVENT_SIZE; index++, i++) {
-		    	statements.set(index, event[i].makeStatement(bnode, graph));
+		    	statements.set(index, event[i].makeStatement(bnode, context));
 		    }
 			
 			return index;
@@ -475,11 +479,10 @@ public Integer call() {
 	        ThreadVars.connection.set(conn);
 	        ThreadVars.valueFactory.set(conn.getValueFactory());

-	        int statusSize = 50;
-	        int count = 0, errors = 0;
-	        Calendar start = GregorianCalendar.getInstance();
-	        Calendar end = null;
+	        int statusSize = 50, count = 0, errors = 0;
 	        Vector<Statement> statements = new Vector<Statement>(triplesPerCommit);
+	        Calendar start = GregorianCalendar.getInstance(), end;
+
 	        statements.setSize(triplesPerCommit);

 	        for (int loop = 0; loop < loopCount; loop++) {
@@ -521,6 +524,134 @@ public Integer call() {
 		}
 	}

+	private static class QueryResult {
+		public int queries;
+		public int triples;
+		
+		public QueryResult(int theQueries, int theTriples) {
+			queries = theQueries;
+			triples = theTriples;
+		}
+	}
+	
+	private static class Querier implements Callable<QueryResult> {
+		private int secondsToRun;
+		private int id;
+		private String timestamp;
+		
+		public Querier(int theId, int theSecondsToRun) {
+			id = theId;
+			secondsToRun = theSecondsToRun;
+		}
+		
+		private int randomQuery(AGRepositoryConnection conn, ValueFactory vf) {
+			// Pick a random customer
+			String customerNT = NTriplesUtil.toNTriplesString(new RandomCustomer().makeValue());
+			
+			// Pick a random date range
+			GregorianCalendar start, end;
+			start = FullDateRange.getRandom();
+			end = FullDateRange.getRandom();
+
+			if (start.after(end)) {
+				GregorianCalendar swap = end;
+				end = start;
+				start = swap;
+			}
+
+			String startNT, endNT;
+			startNT = NTriplesUtil.toNTriplesString(vf.createLiteral(ThreadVars.datatypeFactory.get().newXMLGregorianCalendar(start)));
+			endNT = NTriplesUtil.toNTriplesString(vf.createLiteral(ThreadVars.datatypeFactory.get().newXMLGregorianCalendar(end)));
+			
+			// Perform the query in prolog
+			String queryString = String.format(
+					"(select (?event ?pred ?obj)" +
+					"(:use-planner nil)" +
+					"(q- ?event !%s (? !%s !%s) !%s)" +
+					"(q- ?event ?pred ?obj))", timestamp, startNT, endNT, customerNT);
+
+			int count = 0;
+
+			// Actually pull the full results to the client, then just count them
+		    TupleQuery tupleQuery = conn.prepareTupleQuery(AGQueryLanguage.PROLOG, queryString);
+	        TupleQueryResult result;
+			try {
+				result = tupleQuery.evaluate();
+				while (result.hasNext()) {
+		            result.next();
+		            count++;
+		        }
+		        result.close();
+			} catch (QueryEvaluationException e) {
+	            trace("Error executing query:\n%s\n", queryString);
+				e.printStackTrace();
+	            count = -1;
+			}
+
+			return count;
+		}
+
+		public QueryResult call() {
+			Thread.currentThread().setName("query(" + id + ")");
+			AGRepositoryConnection conn;
+			ValueFactory vf;
+			try {
+				conn = connect(true);
+			} catch (RepositoryException e) {
+				e.printStackTrace();
+				return null;
+			}
+	        ThreadVars.connection.set(conn);
+	        ThreadVars.valueFactory.set(vf = conn.getValueFactory());
+
+	        timestamp = NTriplesUtil.toNTriplesString(vf.createURI(Defaults.NS, "EventTimeStamp"));
+
+	        int statusSize = 10, count = 0, subcount = 0, queries = 0, restarts = 0;
+	        Calendar startTime, start, end;
+	        startTime = start= GregorianCalendar.getInstance();
+
+	        while (true) {
+	        	// Do the query
+	        	int result = randomQuery(conn, vf);
+	        	if (result < 0) {
+	        		restarts++;
+	        	} else {
+	        		queries++;
+	        		count += result;
+	        	}
+
+	        	if (queries % statusSize == 0) {
+		            end = GregorianCalendar.getInstance();
+	        		double seconds = (end.getTimeInMillis() - start.getTimeInMillis()) / 1000.0;
+		            subcount = count - subcount;
+		            trace("Querying status - %d triple results returned for %d queries in %f seconds (%f queries/second, " +
+		                "%d triples per query), %d queries aborted.", subcount, statusSize,
+		                seconds, statusSize/seconds, subcount/statusSize, restarts);
+		            start = end;
+		            subcount = count;
+	        	
+	        		seconds = (end.getTimeInMillis() - startTime.getTimeInMillis()) / 1000.0;
+	        		if (seconds > secondsToRun) {
+	        			break;
+	        		}
+	        	}
+	        }
+	        
+	        double seconds = (GregorianCalendar.getInstance().getTimeInMillis() - startTime.getTimeInMillis())/1000.0;
+	        trace("Querying done - %d triple results returned for %d queries in %f seconds " +
+		    	"(%f queries/second, %d triples per query), %d queries aborted.",
+		    	count, queries, seconds, queries/seconds, count/queries, restarts);
+			
+		 	try {
+				conn.close();
+			} catch (RepositoryException e) {
+				e.printStackTrace();
+			}
+			
+			return new QueryResult(queries, count);
+		}
+	}
+	
 	public static class Monitor {
 		static public void start(String phase) {
 			try {
@@ -593,12 +724,11 @@ public static void main(String[] args) throws RepositoryException {
 			List<Future<Integer>> futures = executor.invokeAll(tasks);
 			
 			for (Future<Integer> furture : futures) {
-				System.out.println(furture.get());
+				furture.get();
 			}
 		} catch (InterruptedException e) {
 			e.printStackTrace();
 		} catch (ExecutionException e) {
-			// TODO Auto-generated catch block
 			e.printStackTrace();
 		}
         Monitor.stop();
@@ -622,12 +752,11 @@ public static void main(String[] args) throws RepositoryException {
 			List<Future<Integer>> futures = executor.invokeAll(tasks);

 			for (Future<Integer> furture : futures) {
-				System.out.println(furture.get());
+				furture.get();
 			}
 		} catch (InterruptedException e) {
 			e.printStackTrace();
 		} catch (ExecutionException e) {
-			// TODO Auto-generated catch block
 			e.printStackTrace();
 		}
         Monitor.stop();
@@ -651,12 +780,11 @@ public static void main(String[] args) throws RepositoryException {
 			List<Future<Integer>> futures = executor.invokeAll(tasks);

 			for (Future<Integer> furture : futures) {
-				System.out.println(furture.get());
+				furture.get();
 			}
 		} catch (InterruptedException e) {
 			e.printStackTrace();
 		} catch (ExecutionException e) {
-			// TODO Auto-generated catch block
 			e.printStackTrace();
 		}
         Monitor.stop();
@@ -671,11 +799,40 @@ public static void main(String[] args) throws RepositoryException {

 	    executor.shutdown();

+	    executor = Executors.newFixedThreadPool(Defaults.QUERY_WORKERS);
+	    
+	    RandomCalendar.dateMaker = FullDateRange;
+        start = GregorianCalendar.getInstance();
+	    List<Callable<QueryResult>> queriers = new ArrayList<Callable<QueryResult>>(Defaults.QUERY_WORKERS);
+	    for (int task = 0; task < Defaults.QUERY_WORKERS; task++) {
+	    	queriers.add(new Querier(task, Defaults.QUERY_TIME*60));
+	    }
 	    trace("Phase 4: Perform customer/date range queries with %d processes for %d minutes.",
         	Defaults.QUERY_WORKERS, Defaults.QUERY_TIME);
         Monitor.start("phase-4");
+        int queries = 0;
+        triples = 0;
+        try {
+			List<Future<QueryResult>> futures = executor.invokeAll(queriers);
+
+			for (Future<QueryResult> furture : futures) {
+				QueryResult queryResult = furture.get();
+				queries += queryResult.queries;
+				triples += queryResult.triples;
+			}
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} catch (ExecutionException e) {
+			e.printStackTrace();
+		}
         Monitor.stop();
-
+		end = GregorianCalendar.getInstance();
+		seconds = (end.getTimeInMillis() - start.getTimeInMillis()) / 1000.0;
+        trace("%d total triples returned over %d queries in " +
+        	"%f seconds (%f triples/second, %f queries/second, " +
+        	"%d triples/query).", triples, queries, seconds, triples/seconds,
+        	queries/seconds, triples/queries);
+        
         trace("Phase 5: Shrink store by 1 month.");
         Monitor.start("phase-5");
         Monitor.stop();
@@ -692,94 +849,6 @@ public static void main(String[] args) throws RepositoryException {
 }

 /**
-
-# The program options
-OPT = None
-
-PROG = sys.argv[0]
-
-class LoadPhase:
-    start, baseline, bulk, small_commits, query, delete_one,\
-        delete_two, die = range(8)
-    last = delete_one
-
-class PhaseParameters(object):
-    def __init__(self, date_range, events_in_commit, triples):
-        object.__init__(self)
-        self.date_range = date_range
-        self.events_in_commit = events_in_commit
-        self.triples = triples
-
-    @property
-    def commits(self):
-        return int(self.triples / (self.events_in_commit * OPT.EVENT_SIZE))
-
-    @property
-    def commits_per_worker(self):
-        return int(self.commits / OPT.LOAD_WORKERS)
-
-# The Phase Parameters
-PHASE_PARAMS = None
-
-# The work queues for loading and querying
-loadq = None
-queryq = None
-
-def load_events(proc_num):
-    """
-    load_files does the work of the child processes.
-    """
-    conn = None
-    
-    def dequeue():
-        try:
-            return loadq.get()
-        except Empty:
-            return None
-
-    def load_phase(phase):
-        params = PHASE_PARAMS[phase]
-        random_datetime.range = params.date_range
-        quads = [None]*(OPT.EVENT_SIZE*params.events_in_commit)
-
-        status_size = 50 if params.events_in_commit == 1 else 25
-        start_time = time.time()
-
-        count = 0
-        errors = 0
-        for commit in range(params.commits_per_worker):
-            index = 0
-            for event in range(params.events_in_commit):
-                index = random_event(conn, quads, index)
-
-            if commit > 0 and commit % status_size == 0:
-                end_time = time.time()
-                tpc = OPT.EVENT_SIZE*params.events_in_commit
-                trace('loader(%d) [%s]: Loading Status - %d triples loaded so '
-                    'far at %s triples per commit (%f commits/sec, %f triples/'
-                    'sec over last %d commits), %d loading errors.', (
-                    proc_num, datetime.now(), count, tpc,
-                    status_size/(end_time - start_time),
-                    tpc*status_size/(end_time - start_time),
-                    status_size,
-                    errors))
-                start_time = end_time
-
-            try:
-                conn.mini_repository.addStatements(quads)
-                count += len(quads)
-            except Exception:
-                trace('loader(%d) [%s]: Error adding quads...', (
-                    proc_num, datetime.now()))
-                errors += 1
-                traceback.print_exc()
-            
-        trace('loader(%d) [%s]: Loading done - %d triples at %s triples '
-            'per commit, %d loading errors.', (proc_num, datetime.now(),
-            count, OPT.EVENT_SIZE*params.events_in_commit, errors))
-        sys.stdout.flush()
-        loadq.task_done()
-
     def delete_phase(phase):
         params = PHASE_PARAMS[phase]
         timestamp = URI(namespace=OPT.NS, localname='EventTimeStamp'
@@ -825,308 +894,4 @@ def delete_phase(phase):
             proc_num, datetime.now(), events, events * OPT.EVENT_SIZE))
         
         loadq.task_done()
-
-    def get_phase(expected):
-        phase = dequeue()
-
-        while phase not in expected:
-            # Put it back
-            loadq.put(phase)
-            loadq.task_done()
-            time.sleep(1)
-            phase = dequeue()
-
-        return phase
-
-    with connect().session(True, max(3600, OPT.QUERY_TIME*60)) as conn:
-        if OPT.PHASE <= LoadPhase.baseline:
-            phase = get_phase([LoadPhase.baseline])
-            load_phase(phase)
-        
-        if OPT.PHASE <= LoadPhase.bulk:
-            phase = get_phase([LoadPhase.bulk])
-            load_phase(phase)
-
-        if OPT.PHASE <= LoadPhase.small_commits:
-            phase = get_phase([LoadPhase.small_commits])
-            load_phase(phase)
-
-        if OPT.PHASE <= LoadPhase.die:
-            phase = get_phase([LoadPhase.delete_one, LoadPhase.delete_two,
-                LoadPhase.die])
-            if phase in [LoadPhase.delete_one, LoadPhase.delete_two]:
-                delete_phase(phase)
-                phase = get_phase([LoadPhase.die])
-
-        loadq.task_done()
-
-    conn.close()
-
-def query_events(proc_num, resultq):
-    conn = connect()
-
-    def dequeue():
-        try:
-            return queryq.get_nowait()
-        except Empty:
-            return None
-
-    timestamp = URI(namespace=OPT.NS, localname='EventTimeStamp').toNTriples()
-
-    def random_query():
-        # Pick a random customer
-        customer = random_customer()
-
-        # Pick a random date range
-        start, end = FullDateRange.random(), FullDateRange.random()
-        if start > end:
-            start, end = end, start
-
-        start_nt = Literal(start).toNTriples()
-        end_nt = Literal(end).toNTriples()
-
-        # Perform the query in prolog or sparql
-        #language = QueryLanguage.PROLOG if proc_num % 2 == 1 \
-        #    else QueryLanguage.SPARQL
-
-        language = QueryLanguage.PROLOG
-
-        if language is QueryLanguage.PROLOG:
-            queryString = """
-                  (select (?event ?pred ?obj)
-                    (:use-planner nil)
-                    (q- ?event !%s (? !%s !%s) !%s)
-                    (q- ?event ?pred ?obj))""" % (
-                        timestamp, start_nt, end_nt, customer)
-        else:
-            queryString = """
-                SELECT ?event ?pred ?obj {
-                 GRAPH %s {
-                   ?event %s ?time .
-                   FILTER (%s <= ?time && ?time <= %s)
-                 }
-                 {?event ?pred ?obj}
-                }""" % (customer, timestamp, start_nt, end_nt)

-        try:
-            # Actually pull the full results to the client, then just count them
-            count = len(conn.prepareTupleQuery(language, queryString
-                ).evaluate())
-        except Exception:
-            # During huge bulk deletions, some queries may be invalidated
-            # and a error returned to indicate they should be rerun. Keep
-            # track of it if this happens.
-            trace('query(%d) [%s]: Error executing query:\n%s\n', (
-                proc_num, datetime.now(), queryString))
-            traceback.print_exc()
-            count = -1
-
-        return count
-
-    status_size = 10
-    start_time = the_time = time.time()
-    sub_count = 0
-    queries = 0
-    count = 0
-    restarts = 0
-
-    # Run the query at least once
-    stop = None
-    while stop is None:
-        result = random_query()
-        if result >= 0:
-            count += result
-        else:
-            restarts += 1
-
-        queries += 1
-        stop = dequeue()
-
-        if queries % status_size == 0:
-            end_time = time.time()
-            sub_count = count - sub_count
-            trace('query(%d) [%s]: Querying status - %d triple results '
-                'returned for %d queries in %f seconds (%f queries/second, '
-                '%f triples per query), %d queries aborted.', (proc_num,
-                datetime.now(), sub_count, status_size,
-                end_time-start_time, status_size/(end_time-start_time),
-                sub_count/status_size, restarts))
-            start_time = end_time
-            sub_count = count
-            
-    the_time = time.time() - the_time
-    trace('query(%d) [%s]: Querying done - %d triple results returned for %d '
-        'queries in %f seconds (%f queries/second, %f triples per query), '
-        ' %d queries aborted.', (proc_num, datetime.now(), count, queries,
-        the_time, queries/the_time, count/queries, restarts))
-
-    conn.close()
-    resultq.put((queries, count, the_time))
-    queryq.task_done()
-
-
-@contextmanager
-def monitor(phase):
-    """
-    Start and end the monitor for a phase.
-    """
-    try:
-        subprocess.call(['./monitor.sh', 'start', phase])
-    except OSError:
-        pass
-    yield
-    try:
-        subprocess.call(['./monitor.sh', 'end'])
-    except OSError:
-        pass
-
-def main():
-    """
-    The parent main process.
-    """
-    global loadq, PHASE_PARAMS
-
-    # Reduce the number of times we need to round-trip to the server
-    # for blank nodes
-    ValueFactory.BLANK_NODE_AMOUNT = OPT.BULK_EVENTS * 4
-
-    # Initialize the Phase Parameters
-    PHASE_PARAMS = [
-        None,
-        PhaseParameters(BaselineRange, 1, OPT.SIZE/10),
-        PhaseParameters(BulkRange, OPT.BULK_EVENTS, (OPT.SIZE*9)/10),
-        PhaseParameters(SmallCommitsRange, 1, OPT.SIZE/10),
-        None,
-        PhaseParameters(DeleteRangeOne, OPT.BULK_EVENTS, OPT.SIZE/10),
-        PhaseParameters(DeleteRangeTwo, OPT.BULK_EVENTS, OPT.SIZE/10)]
-
-    # Renew/Open the repository
-    trace('%s [%s]: %s %s:%s.', (PROG, datetime.now(),
-        "Opening" if OPT.OPEN else "Renewing", OPT.CATALOG, OPT.REPOSITORY))
-    conn = connect(Repository.OPEN if OPT.OPEN else Repository.RENEW)
-    triples = conn.size()
-
-    trace('%s [%s]: Testing with %d loading, %d querying processes. '
-        'Repository contains %d triples.', (
-        PROG, datetime.now(), OPT.LOAD_WORKERS, OPT.QUERY_WORKERS, triples))
-    
-    # Create the work queue
-    loadq = JoinableQueue(OPT.LOAD_WORKERS)
-
-    # Start the loading processes
-    for proc_num in range(OPT.LOAD_WORKERS):
-        p = Process(target=load_events, args=(proc_num,))
-        p.start()
-
-    def load_phase(phase):
-        params = PHASE_PARAMS[phase]
-        triples_start = conn.size()
-        phase_time = time.time()
-
-        # Tell the processes what to do (We only need one deletion process)
-        if phase != LoadPhase.delete_one:
-            for proc_num in range(OPT.LOAD_WORKERS):
-                loadq.put(phase)
-        else:
-            loadq.put(LoadPhase.delete_one)
-            loadq.put(LoadPhase.delete_two)
-
-        if phase == LoadPhase.last:
-            for proc_num in range(OPT.LOAD_WORKERS):
-                loadq.put(LoadPhase.die)
-
-            # Signal that there is no more work for the queue
-            loadq.close()
-
-        # Wait for all the work to be completed
-        loadq.join()
-
-        triples_end = conn.size()
-        triples = triples_end - triples_start
-        phase_time = time.time() - phase_time
-        commits = abs(triples/(params.events_in_commit*OPT.EVENT_SIZE))
-        trace('%s [%s]: %d total triples processed in %f seconds '
-            '(%f triples/second, %f commits/second). '
-            'Store contains %d triples.', (
-            PROG, datetime.now(), triples, phase_time, triples/phase_time,
-            commits/phase_time, triples_end))
-
-    @contextmanager
-    def run_queries():
-        global queryq
-        queryq = JoinableQueue(OPT.QUERY_WORKERS)
-        resultq = Queue(OPT.QUERY_WORKERS)
-
-        # Start the query processes
-        for proc_num in range(OPT.QUERY_WORKERS):
-            p = Process(target=query_events, args=(proc_num, resultq))
-            p.start()
-
-        yield
-
-        for proc_num in range(OPT.QUERY_WORKERS):
-            queryq.put('Stop')
-
-        # Signal that there is no more work for the queue
-        queryq.close()
-        queryq.join()
-
-        queries = 0
-        triples = 0
-        phase_time = 0
-
-        for proc_num in range(OPT.QUERY_WORKERS):
-            result = resultq.get()
-            queries += result[0]
-            triples += result[1]
-            phase_time = max(phase_time, result[2])
-
-        trace('%s [%s]: %d total triples returned over %d queries in '
-            '%f seconds (%f triples/second, %f queries/second, '
-            '%f triples/query). ', (PROG, datetime.now(), triples, queries,
-            phase_time, triples/phase_time, queries/phase_time,
-            triples/queries))
-
-    total_time = time.time()
-    if OPT.PHASE <= LoadPhase.baseline:
-        with monitor('phase-1'):
-            trace('%s [%s]: Phase 1: Baseline %d triple commits.', (
-                PROG, datetime.now(), OPT.EVENT_SIZE))
-            load_phase(LoadPhase.baseline)
-
-    if OPT.PHASE <= LoadPhase.bulk:
-        with monitor('phase-2'):
-            trace('%s [%s]: Phase 2: Grow store to about %d triples.', (
-                PROG, datetime.now(), OPT.SIZE))
-            load_phase(LoadPhase.bulk)
-
-    if OPT.PHASE <= LoadPhase.small_commits:
-        with monitor('phase-3'):
-            trace('%s [%s]: Phase 3: Perform %d triple commits.',
-                (PROG, datetime.now(), OPT.EVENT_SIZE))
-            load_phase(LoadPhase.small_commits)
-
-    if OPT.PHASE <= LoadPhase.query:
-        with monitor('phase-4'):
-            trace('%s [%s]: Phase 4: Perform customer/date range queries '
-                'with %d processes for %d minutes.', (PROG, datetime.now(),
-                OPT.QUERY_WORKERS, OPT.QUERY_TIME))
-            with run_queries():
-                time.sleep(OPT.QUERY_TIME*60)
-
-    if OPT.PHASE <= LoadPhase.delete_one:
-        with monitor('phase-5'):
-            trace('%s [%s]: Phase 5: Shrink store by 1 month.', (
-                PROG, datetime.now()))
-            load_phase(LoadPhase.delete_one)
-    
-    # Display the results
-    total_time = time.time() - total_time
-    triples_end = conn.size()
-    triples = triples_end - triples
-    conn.close()
-
-    trace('%s [%s]: Test completed in %f total seconds - '
-        'store contains %d triples (%d triples added/removed).',
-        (PROG, datetime.now(), total_time, triples_end, triples))
 **/

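For readers skimming the diff, the heart of this commit is the Prolog query path added in Querier.randomQuery and driven in Phase 4 of main. The sketch below restates that path as a standalone helper against an already-open connection. It is illustrative only: the class name PrologQuerySketch, the method countEventTriples, and the choice to pass pre-formatted N-Triples strings as parameters are assumptions introduced here, while the AGQueryLanguage.PROLOG, prepareTupleQuery, evaluate, hasNext/next, and close calls are the ones the new code above uses.

    // Illustrative sketch, not part of the commit. The helper name and its
    // parameters are hypothetical; the client calls mirror Querier.randomQuery.
    import org.openrdf.query.QueryEvaluationException;
    import org.openrdf.query.TupleQuery;
    import org.openrdf.query.TupleQueryResult;

    import com.franz.agraph.repository.AGQueryLanguage;
    import com.franz.agraph.repository.AGRepositoryConnection;

    public class PrologQuerySketch {
        /**
         * Issues the same shape of Prolog select query as the new Querier:
         * for events whose timestamp falls between startNT and endNT in the
         * given customer graph, return every (event, pred, obj) triple.
         * All arguments are N-Triples strings. Returns the number of result
         * rows pulled to the client, or -1 if the query had to be aborted.
         */
        public static int countEventTriples(AGRepositoryConnection conn,
                String timestampNT, String startNT, String endNT, String customerNT) {
            String queryString = String.format(
                    "(select (?event ?pred ?obj)" +
                    "(:use-planner nil)" +
                    "(q- ?event !%s (? !%s !%s) !%s)" +
                    "(q- ?event ?pred ?obj))", timestampNT, startNT, endNT, customerNT);

            TupleQuery tupleQuery = conn.prepareTupleQuery(AGQueryLanguage.PROLOG, queryString);
            int count = 0;
            try {
                TupleQueryResult result = tupleQuery.evaluate();
                while (result.hasNext()) {  // pull all rows to the client, counting them
                    result.next();
                    count++;
                }
                result.close();
            } catch (QueryEvaluationException e) {
                e.printStackTrace();
                count = -1;                 // signal an aborted query, as Querier does
            }
            return count;
        }
    }

Given a connection like the one returned by connect(true) in Events.java, and N-Triples strings built with NTriplesUtil.toNTriplesString as in the diff, one call to this helper would correspond to a single iteration of the Phase 4 benchmark loop.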