From 21c976332a9444984247f85d1d2ab7a5900ed25d Mon Sep 17 00:00:00 2001 From: Andy Seaborne Date: Sat, 6 Oct 2018 17:15:22 +0100 Subject: [PATCH 1/3] JENA-1612: A lightly parallel loader plan. --- jena-cmds/src/main/java/tdb2/tdbloader.java | 11 ++++++++-- .../jena/tdb2/loader/LoaderFactory.java | 21 +++++++++++++++++++ .../jena/tdb2/loader/main/DataToTuples.java | 4 ++-- .../tdb2/loader/main/DataToTuplesInline.java | 2 +- .../jena/tdb2/loader/main/InputStage.java | 2 +- .../jena/tdb2/loader/main/LoaderMain.java | 4 +++- .../jena/tdb2/loader/main/LoaderPlans.java | 21 +++++++++++++++++++ .../jena/tdb2/loader/TestLoaderMain.java | 1 + .../jena/tdb2/loader/TestLoaderStd.java | 3 +++ 9 files changed, 62 insertions(+), 7 deletions(-) diff --git a/jena-cmds/src/main/java/tdb2/tdbloader.java b/jena-cmds/src/main/java/tdb2/tdbloader.java index 52e541addc9..1ee896e61c2 100644 --- a/jena-cmds/src/main/java/tdb2/tdbloader.java +++ b/jena-cmds/src/main/java/tdb2/tdbloader.java @@ -36,6 +36,7 @@ import org.apache.jena.tdb2.loader.LoaderFactory; import org.apache.jena.tdb2.loader.base.LoaderOps; import org.apache.jena.tdb2.loader.base.MonitorOutput; +import org.apache.jena.tdb2.loader.main.LoaderPlans; import tdb2.cmdline.CmdTDB; import tdb2.cmdline.CmdTDBGraph; @@ -43,7 +44,7 @@ public class tdbloader extends CmdTDBGraph { private static final ArgDecl argStats = new ArgDecl(ArgDecl.HasValue, "stats"); private static final ArgDecl argLoader = new ArgDecl(ArgDecl.HasValue, "loader"); - private enum LoaderEnum { Basic, Parallel, Sequential, Phased } + private enum LoaderEnum { Basic, Parallel, Sequential, Light, Phased } private boolean showProgress = true; private boolean generateStats = false; @@ -57,7 +58,7 @@ public static void main(String... args) { protected tdbloader(String[] argv) { super(argv); // super.add(argStats, "Generate statistics"); - super.add(argLoader, "--loader=", "Loader to use: 'basic', 'phased' (default), 'sequential' or 'parallel'"); + super.add(argLoader, "--loader=", "Loader to use: 'basic', 'phased' (default), 'sequential', 'parallel' or 'light'"); } @Override @@ -74,6 +75,10 @@ else if ( loadername.matches("seq.*") ) loader = LoaderEnum.Sequential; else if ( loadername.matches("para.*") ) loader = LoaderEnum.Parallel; + else if ( loadername.matches("para.*") ) + loader = LoaderEnum.Parallel; + else if ( loadername.matches("light") ) + loader = LoaderEnum.Light; else throw new CmdException("Unrecognized value for --loader: "+loadername); } @@ -174,6 +179,8 @@ private DataLoader createLoader(LoaderEnum useLoader, DatasetGraph dsg, Node gn, return LoaderFactory.parallelLoader(dsg, gn, output); case Sequential : return LoaderFactory.sequentialLoader(dsg, gn, output); + case Light : + return LoaderFactory.createLoader(LoaderPlans.loaderPlanLight, dsg, output); case Basic : return LoaderFactory.basicLoader(dsg, gn, output); default : diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java index 83f13c9b339..006d1c89b56 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java @@ -26,8 +26,11 @@ import org.apache.jena.sparql.core.Quad; import org.apache.jena.tdb2.loader.base.MonitorOutput; import org.apache.jena.tdb2.loader.basic.LoaderBasic; +import org.apache.jena.tdb2.loader.main.LoaderMain; import org.apache.jena.tdb2.loader.main.LoaderParallel; import org.apache.jena.tdb2.loader.main.LoaderPhased; +import org.apache.jena.tdb2.loader.main.LoaderPlan; +import org.apache.jena.tdb2.loader.main.LoaderPlans; import org.apache.jena.tdb2.loader.sequential.LoaderSequential; /** Obtain a {@link DataLoader}. @@ -193,6 +196,24 @@ public static DataLoader parallelLoader(DatasetGraph dsg, Node graphName, Monito return new LoaderParallel(dsg, graphName, output); } + /** + * Return a loader to load a dataset, using the provided plan. + * See {@link LoaderPlans} for the standard plans. + */ + public static DataLoader createLoader(LoaderPlan plan, DatasetGraph dsg, MonitorOutput output) { + Objects.requireNonNull(dsg); + return new LoaderMain(plan, dsg, output); + } + + /** + * Return a loader to load a graph, using the provided plan. + * See {@link LoaderPlans} for the standard plans. + */ + public static DataLoader createLoader(LoaderPlan plan, DatasetGraph dsg, Node graphName, MonitorOutput output) { + Objects.requireNonNull(dsg); + return new LoaderMain(plan, dsg, graphName, output); + } + /** * Return a general purpose loader to load a dataset. * This default may change between versions. diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java index 02b163d34bb..2b0cbd1490c 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java @@ -42,8 +42,8 @@ import org.apache.jena.tdb2.store.NodeId; import org.apache.jena.tdb2.store.nodetable.NodeTable; -/** Batch processing of {@link DataBlock}s (triples or Quads) converting them to two output of - * to blocks of {@code Tuple}. +/** Batch processing of {@link DataBlock}s (triples or Quads) converting them to two outputs of + * blocks of {@code Tuple}. *

* This class runs one task thread. *

diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java index c6b5075f6ca..6549425d7f5 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java @@ -103,7 +103,7 @@ public void finishBulk() { } dispatchTriples(LoaderConst.END_TUPLES); if ( quads != null && ! quads.isEmpty() ) { - dispatchTriples(quads); + dispatchQuads(quads); quads = null; } dispatchQuads(LoaderConst.END_TUPLES); diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java index 13cb6ffee03..2e6c7725616 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java @@ -23,7 +23,7 @@ * can be done in several ways. *

    *
  • {@code MULTI} - one thread parsing (caller), one for nodetable/tuples, and one for each index - *
  • {@code PARSE_NODE} - one thread parsing (caller) and also nodetable/tuples, and one for each index + *
  • {@code PARSE_NODE} - one thread parsing (caller) and one for nodetable/tuples, and one for each index *
  • {@code PARSE_NODE_INDEX} - use the caller thread for all operations *
* {@code MULTI} is fastest when hardware allows. diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java index 458d4f05e61..8bd158d240c 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java @@ -133,7 +133,8 @@ public LoaderMain(LoaderPlan loaderPlan, DatasetGraph dsg, Node graphName, Monit } /** - * Create data ingestion and primary index building of a {@link LoaderPlan}. + * Create data ingestion and primary index building of a {@link LoaderPlan}. + * Separate threads for parsing, node table loading and primary index building. */ private static StreamRDFCounting executeData(LoaderPlan loaderPlan, DatasetGraphTDB dsgtdb, Map indexMap, List dataProcess, MonitorOutput output) { DatasetPrefixesTDB dps = (DatasetPrefixesTDB)dsgtdb.getPrefixes(); @@ -165,6 +166,7 @@ private static StreamRDFCounting executeData(LoaderPlan loaderPlan, DatasetGraph /** * Create data ingestion and primary index building of a {@link LoaderPlan}. + * One thread for parsing and node table building and one for each primary index building. * This version uses a thread for parse/NodeTable/Tuple and a thread for each of triple and quad index for phase one. */ private static StreamRDFCounting executeDataParseId(LoaderPlan loaderPlan, DatasetGraphTDB dsgtdb, Map indexMap, List dataProcess, MonitorOutput output) { diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java index a97693c9e7f..0649c9d881e 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java @@ -61,6 +61,27 @@ public class LoaderPlans { new String[][]{ } ); + /** + * Lightly parallel, intermediate plan: for triples, this is two threaded. It aims to + * speed up the data phase on a machine where an index is larger than the size of + * available RAM (not heap). In this case it is better to use RAM for better caching a + * single index than trying to work in parallel on two or more indexes. Like all load + * plans, data shape and machine characteristics affect speed so experimentation is + * recommended. + *

+ * Data phase: One thread for parser and building the node table, and one thread for + * each primary index. + *

+ * Index phase: Secondary indexes: one by one. + */ + public static LoaderPlan loaderPlanLight = new LoaderPlan( + InputStage.PARSE_NODE, + new String[]{ "SPO" }, + new String[]{ "GSPO" }, + new String[][]{ { "POS" }, { "OSP" } }, + new String[][]{ { "GPOS" }, { "GOSP" }, { "SPOG" }, { "POSG" } , { "OSPG" } } + ); + /** * A nearly sequential process, as a loader plan including single threaded first * phase. Each index is calculated separately but on a separate thread. diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java index 3596cf3f6a7..0c7093b5b9c 100644 --- a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java +++ b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java @@ -43,6 +43,7 @@ public static Iterable data() { add(x, "Simple plan", LoaderPlans.loaderPlanSimple); add(x, "Minimal plan", LoaderPlans.loaderPlanMinimal); add(x, "Phased Plan", LoaderPlans.loaderPlanPhased); + add(x, "Light plan", LoaderPlans.loaderPlanLight); add(x, "Parallel plan", LoaderPlans.loaderPlanParallel); return x ; } diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java index 73994231727..748cf5dc4d4 100644 --- a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java +++ b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java @@ -24,6 +24,7 @@ import org.apache.jena.graph.Node ; import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.tdb2.loader.main.LoaderPlans; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -41,11 +42,13 @@ public static Iterable data() { BiFunction phased = (dsg, gn)->LoaderFactory.phasedLoader(dsg, gn, output); BiFunction sequential = (dsg, gn)->LoaderFactory.sequentialLoader(dsg, gn, output); BiFunction parallel = (dsg, gn)->LoaderFactory.parallelLoader(dsg, gn, output); + BiFunction light = (dsg, gn)->LoaderFactory.createLoader(LoaderPlans.loaderPlanLight, dsg, gn, output); x.add(new Object[]{"Basic loader", basic}) ; x.add(new Object[]{"Phased loader", phased}) ; x.add(new Object[]{"Sequential loader", sequential}) ; x.add(new Object[]{"Parallel loader", parallel}) ; + x.add(new Object[]{"Light loader", light}) ; return x ; } From 0dbc1c2b346f0ed5af73658c9dd0bdf50b3e8bac Mon Sep 17 00:00:00 2001 From: Andy Seaborne Date: Sat, 6 Oct 2018 19:14:59 +0100 Subject: [PATCH 2/3] Tidy up. --- .../jena/tdb2/loader/base/LoaderOps.java | 2 +- .../loader/base/ProgressMonitorOutput.java | 34 ++++++++++++------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java index bb6b16b6abe..88b95b7fcdd 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java @@ -78,7 +78,7 @@ private static StreamRDF streamWithProgressMonitor(StreamRDF dest, String label, return new ProgressStreamRDF(dest, monitor); } - /** Calculate a label for a progress montior. */ + /** Calculate a label for a progress monitor. */ private static String label(String fileName) { String basename = FileOps.splitDirFile(fileName).get(1); return basename; diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorOutput.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorOutput.java index 9f498ae6c86..9720069c060 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorOutput.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorOutput.java @@ -34,10 +34,14 @@ public class ProgressMonitorOutput implements ProgressMonitor { private final long tickPoint; private final int superTick; private final Timer timer; + private Timer getTimer() { return timer; } + private final String label; + // Counters - this monitor. private long counterBatch = 0; private long counterTotal = 0; + //private final ProgressMonitorContext context; private long lastTime = -1; private long timeTotalMillis = -1; @@ -92,27 +96,29 @@ public void startMessage(String msg) { @Override public void finishMessage(String msg) { // Elapsed. - long timePoint = timer.getTimeInterval(); + long timePoint = getTimer().getTimeInterval(); + long counterTotalMsg = getRunningTotal(); // /1000L is milli to second conversion if ( timePoint != 0 ) { double time = timePoint / 1000.0; - long runAvgRate = (counterTotal * 1000L) / timePoint; + long runAvgRate = (counterTotalMsg * 1000L) / timePoint; - print("%s: %,d %s %.2fs (Avg: %,d)", msg, counterTotal, label, time, runAvgRate); + print("%s: %,d %s %.2fs (Avg: %,d)", msg, counterTotalMsg, label, time, runAvgRate); } else - print("%s: %,d %s (Avg: ----)", msg, counterTotal, label); + print("%s: %,d %s (Avg: ----)", msg, counterTotalMsg, label); } @Override public void start() { - timer.startTimer(); + // XXX + getTimer().startTimer(); lastTime = 0; } @Override public void finish() { - timeTotalMillis = timer.endTimer(); + timeTotalMillis = getTimer().endTimer(); } @Override @@ -120,22 +126,22 @@ public void tick() { counterBatch++; counterTotal++; - if ( tickPoint(counterTotal, tickPoint) ) { - long timePoint = timer.readTimer(); + if ( tickPoint(getRunningTotal(), tickPoint) ) { + long timePoint = getTimer().readTimer(); long thisTime = timePoint - lastTime; // *1000L is milli to second conversion if ( thisTime != 0 && timePoint != 0 ) { long batchAvgRate = (counterBatch * 1000L) / thisTime; - long runAvgRate = (counterTotal * 1000L) / timePoint; - print("Add: %,d %s (Batch: %,d / Avg: %,d)", counterTotal, label, batchAvgRate, runAvgRate); + long runAvgRate = (getRunningTotal() * 1000L) / timePoint; + print("Add: %,d %s (Batch: %,d / Avg: %,d)", getRunningTotal(), label, batchAvgRate, runAvgRate); } else { - print("Add: %,d %s (Batch: ---- / Avg: ----)", counterTotal, label); + print("Add: %,d %s (Batch: ---- / Avg: ----)", getRunningTotal(), label); } lastTime = timePoint; - if ( tickPoint(counterTotal, superTick * tickPoint) ) + if ( tickPoint(getRunningTotal(), superTick * tickPoint) ) elapsed(timePoint); counterBatch = 0; lastTime = timePoint; @@ -147,6 +153,10 @@ public long getTicks() { return counterTotal; } + private long getRunningTotal() { + return counterTotal; + } + @Override public long getTime() { return timeTotalMillis; From f5de3418f8b619648e6cc863050db48d704ade91 Mon Sep 17 00:00:00 2001 From: Andy Seaborne Date: Tue, 9 Oct 2018 16:53:37 +0100 Subject: [PATCH 3/3] JENA-1616: Better progress output. --- .../java/org/apache/jena/atlas/lib/Timer.java | 18 ++-- jena-cmds/src/main/java/tdb2/tdbloader.java | 20 +++++ .../jena/tdb2/loader/base/LoaderBase.java | 54 ++++++++++-- .../jena/tdb2/loader/base/LoaderOps.java | 9 +- .../tdb2/loader/base/ProgressMonitor.java | 19 +++- .../loader/base/ProgressMonitorBasic.java | 48 ++++++++-- .../loader/base/ProgressMonitorContext.java | 61 +++++++++++++ .../loader/base/ProgressMonitorOutput.java | 88 ++++++++++++++----- .../tdb2/loader/base/ProgressStreamRDF.java | 16 ---- .../jena/tdb2/loader/basic/LoaderBasic.java | 6 +- .../jena/tdb2/loader/main/LoaderMain.java | 12 +-- .../sequential/BuilderSecondaryIndexes.java | 16 ++-- .../loader/sequential/LoaderSequential.java | 8 +- 13 files changed, 290 insertions(+), 85 deletions(-) create mode 100644 jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorContext.java diff --git a/jena-base/src/main/java/org/apache/jena/atlas/lib/Timer.java b/jena-base/src/main/java/org/apache/jena/atlas/lib/Timer.java index 340ba379b8e..c3e1ee47ca1 100644 --- a/jena-base/src/main/java/org/apache/jena/atlas/lib/Timer.java +++ b/jena-base/src/main/java/org/apache/jena/atlas/lib/Timer.java @@ -32,13 +32,12 @@ public Timer() {} public void startTimer() { if ( inTimer ) throw new AtlasException("Already in timer") ; - timeStart = System.currentTimeMillis() ; timeFinish = -1 ; inTimer = true ; } - /** Return time in milliseconds */ + /** Stop timing and return the elapsed time in milliseconds */ public long endTimer() { if ( !inTimer ) throw new AtlasException("Not in timer") ; @@ -47,21 +46,30 @@ public long endTimer() { return getTimeInterval() ; } + /** Read the timer - either the instantanious value (if running) or elapsed time (if finished). */ + public long read() { + return inTimer + ? System.currentTimeMillis() - timeStart + : timeFinish - timeStart; + } + + /** Read a running timer */ public long readTimer() { if ( !inTimer ) throw new AtlasException("Not in timer") ; - return System.currentTimeMillis() - timeStart ; + return read() ; } + /** Read an elapsed timer */ public long getTimeInterval() { if ( inTimer ) throw new AtlasException("Still timing") ; if ( timeFinish == -1 ) throw new AtlasException("No valid interval") ; - - return timeFinish - timeStart ; + return read() ; } + /** Helper function to format milliseconds as "%.3f" seconds */ static public String timeStr(long timeInterval) { return String.format("%.3f", timeInterval / 1000.0) ; } diff --git a/jena-cmds/src/main/java/tdb2/tdbloader.java b/jena-cmds/src/main/java/tdb2/tdbloader.java index 1ee896e61c2..2473a8fc59a 100644 --- a/jena-cmds/src/main/java/tdb2/tdbloader.java +++ b/jena-cmds/src/main/java/tdb2/tdbloader.java @@ -18,12 +18,16 @@ package tdb2; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; import java.util.Objects; import jena.cmd.ArgDecl; import jena.cmd.CmdException; import org.apache.jena.atlas.lib.InternalErrorException; +import org.apache.jena.atlas.lib.ListUtils; import org.apache.jena.atlas.lib.Timer; import org.apache.jena.graph.Node; import org.apache.jena.graph.NodeFactory; @@ -109,6 +113,8 @@ protected void exec() { List urls = getPositional(); if ( urls.size() == 0 ) urls.add("-"); + else + checkFiles(urls); if ( graphName == null ) { loadQuads(urls); @@ -128,6 +134,20 @@ protected void exec() { loadTriples(graphName, urls); } + // Check files exists before starting. + private void checkFiles(List urls) { + List problemFiles = + ListUtils.toList( + urls.stream() + .map(Paths::get) + .filter(p-> !Files.exists(p) || !Files.isRegularFile(p /*follow links*/) || !Files.isReadable(p) ) + .map(Path::toString) + ); + if ( ! problemFiles.isEmpty() ) { + throw new CmdException("Can't read files : ["+problemFiles+"]"); + } + } + private void loadTriples(String graphName, List urls) { execBulkLoad(super.getDatasetGraph(), graphName, urls, showProgress); } diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderBase.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderBase.java index ce205e0631a..544310e9451 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderBase.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderBase.java @@ -26,13 +26,13 @@ import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.tdb2.loader.DataLoader; -/** Simple bulk loader framework. +/** + * Bulk loader framework. *

* It puts a write-transaction around the whole process if {@link #bulkUseTransaction} - * returns true and then calls abstract {@link #loadOne(String)} - * for each file. + * returns true. It calls {@link #loadOne} for each file. *

- * If a graph name is provided, it converts triples to quads in that named graph. + * If a graph name is provided, it converts triples to quads in that named graph. */ public abstract class LoaderBase implements DataLoader { @@ -62,7 +62,7 @@ public void finishBulk() { dsg.end(); } long totalElapsed = timer.endTimer(); - outputTime(totalElapsed); + outputSummary(totalElapsed); } @Override @@ -75,21 +75,57 @@ public void finishException(Exception ex) { @Override public void load(List filenames) { - // Default implementation. + if ( filenames.isEmpty() ) { + output.print("No files to load"); + return; + } + + ProgressMonitor monitor = createProgressMonitor(output); + + boolean multipleFiles = (filenames.size()>1); + + String file1 = null; + if ( ! multipleFiles ) { + file1 = filenames.get(0); + if ( file1.equals("-") ) + file1 = "stdin"; + } try { - filenames.forEach(fn->loadOne(fn)); + if ( multipleFiles ) + monitor.startMessage("Start: "+filenames.size()+" files"); + else + monitor.startMessage("Start: "+file1); + monitor.start(); + filenames.forEach(fn->{ + if ( multipleFiles ) + monitor.startSection(); + loadOne(fn, monitor); + if ( multipleFiles ) + monitor.finishSection(); + }); + monitor.finish(); + if ( multipleFiles ) + monitor.finishMessage("Finished: "+filenames.size()+" files"); + else + monitor.finishMessage("Finished: "+file1); } catch (Exception ex) { finishException(ex); throw ex; } } + protected abstract ProgressMonitor createProgressMonitor(MonitorOutput output); + /** Subclasses must provide a setting. */ protected abstract boolean bulkUseTransaction(); - protected abstract void loadOne(String filename); + protected void loadOne(String filename, ProgressMonitor monitor) { + String label = LoaderOps.label(filename); + monitor.setLabel(label); + LoaderOps.inputFile(stream(), filename, monitor); + } - protected void outputTime(long totalElapsed) { + protected void outputSummary(long totalElapsed) { if ( output != null ) { long count = countTriples()+countQuads(); String label = "Triples/Quads"; diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java index 88b95b7fcdd..89e8941e51b 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java @@ -79,7 +79,7 @@ private static StreamRDF streamWithProgressMonitor(StreamRDF dest, String label, } /** Calculate a label for a progress monitor. */ - private static String label(String fileName) { + public static String label(String fileName) { String basename = FileOps.splitDirFile(fileName).get(1); return basename; } @@ -100,17 +100,16 @@ public static void inputFile(StreamRDF dest, String source, MonitorOutput output * "no output". */ public static void inputFile(StreamRDF sink, String source, ProgressMonitor monitor) { - if ( monitor != null ) { sink = new ProgressStreamRDF(sink, monitor); - monitor.start(); + //monitor.start(); } sink.start(); RDFDataMgr.parse(sink, source); sink.finish(); if ( monitor != null ) { - monitor.finish(); - monitor.finishMessage("Data: " + source); + //monitor.finish(); + //monitor.finishMessage("Data"); } } diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitor.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitor.java index 7f114b5b2c1..9c3934b8d8a 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitor.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitor.java @@ -20,25 +20,31 @@ /** Interface {@code ProgressMonitor} - monitor progress.*/ public interface ProgressMonitor { - public void startMessage(); - /** Output the starting message. * The format is implementation dependent. */ public void startMessage(String message); - + /** * Output the finishing message. * The format is implementation dependent. */ public void finishMessage(String message); + public void setLabel(String label); + /** Start and start timing. This should be paired with a call to {@link #finish()}. */ public void start(); + + /** Start a section within the overall start-finish. */ + public void startSection(); + /** Finish a section within the overall start-finish. */ + public void finishSection(); + /** * Finish and stop timing. The total time is available with {@link #getTime} and the - * numbe rof items processes with {@link #getTicks()}. + * number of items processes with {@link #getTicks()}. */ public void finish(); @@ -51,4 +57,9 @@ public interface ProgressMonitor { /** Return the elapsed time taken - this is only valid after {@link #finish()} has been called. */ public long getTime(); + /** Return the number of ticks. Valid after {@link #startSection()} has been called. */ + public long getSectionTicks(); + + /** Return the elapsed section time taken. */ + public long getSectionTime(); } diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorBasic.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorBasic.java index 81ee9a4b4e5..19ccc2bddc2 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorBasic.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorBasic.java @@ -19,19 +19,23 @@ package org.apache.jena.tdb2.loader.base; import org.apache.jena.atlas.lib.Timer; +import org.apache.jena.tdb2.TDBException; /** Simple {@link ProgressMonitor} that records time and ticks but does not print anything */ public class ProgressMonitorBasic implements ProgressMonitor { - + // Overall private Timer timer = new Timer(); private long timeInMillis = -1; private long tickCounter = 0; + //Section + private boolean inSection = false; + private int sectionCounter = 0; + private Timer sectionTimer = null; + private long sectionTimeInMillis = -1; + private long sectionTickCounter = 0; public ProgressMonitorBasic() {} - @Override - public void startMessage() {} - @Override public void startMessage(String message) {} @@ -49,7 +53,30 @@ public void finish() { } @Override - public void tick() { tickCounter++; } + public void startSection() { + if ( inSection ) + throw new TDBException("startSection: Already in section"); + inSection = true; + sectionCounter++; + sectionTimer = new Timer(); + sectionTimeInMillis = 0; + sectionTickCounter = 0; + } + + @Override + public void finishSection() { + if ( ! inSection ) + throw new TDBException("finishSection: Not in section"); + inSection = false; + sectionTimeInMillis = sectionTimer.endTimer(); + } + + @Override + public void tick() { + tickCounter++; + if ( inSection ) + sectionTickCounter++; + } @Override public long getTicks() { @@ -61,4 +88,15 @@ public long getTime() { return timeInMillis; } + @Override + public long getSectionTicks() { + return sectionTickCounter; + } + @Override + public long getSectionTime() { + return sectionTimeInMillis; + } + + @Override + public void setLabel(String label) {} } diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorContext.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorContext.java new file mode 100644 index 00000000000..14d9811a72d --- /dev/null +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorContext.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.tdb2.loader.base; + +import org.apache.jena.atlas.lib.Timer; +import org.apache.jena.atlas.logging.Log; + +/** + * The counting state of a {@link ProgressMonitor}. This can be used to across different + * {@code ProgressMonitor}s (sequentially) to give running totals + */ + +public class ProgressMonitorContext { + /*package*/ long ticks; + /*package*/ Timer timer; + private int depth = 0; + + public ProgressMonitorContext(long ticks, Timer timer) { + super(); + this.ticks = ticks; + this.timer = timer; + } + + public void tick() { ticks++; } + + public void start() { + if ( depth == 0 ) + timer.startTimer(); + depth++; + } + public void finish() { + --depth; + if ( depth < 0 ) { + Log.error(this, "Misaligned start/finish"); + return; + } + if ( depth == 0 ) + timer.endTimer(); + } + + //public int getDepth() { return depth; } + + public long getElapsed() { return depth < 0 ? -1 : timer.readTimer(); } + +} diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorOutput.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorOutput.java index 9720069c060..552d2debef5 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorOutput.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitorOutput.java @@ -23,6 +23,7 @@ import java.util.Objects; import org.apache.jena.atlas.lib.Timer; +import org.apache.jena.tdb2.TDBException; import org.slf4j.Logger ; /** Progress monitor - output lines to show the progress of some long running operation. @@ -34,9 +35,17 @@ public class ProgressMonitorOutput implements ProgressMonitor { private final long tickPoint; private final int superTick; private final Timer timer; - private Timer getTimer() { return timer; } + private Timer getTimer() { return timer; } + + //Section + private boolean inSection = false; + private int sectionCounter = 0; + private Timer sectionTimer = null; + private long sectionTimeInMillis = -1; + private long sectionTickCounter = 0; - private final String label; + // Current label. + private String label; // Counters - this monitor. private long counterBatch = 0; @@ -72,41 +81,38 @@ public static ProgressMonitorOutput create(MonitorOutput output, String label, l */ public ProgressMonitorOutput(String label, long tickPoint, int superTick, MonitorOutput output) { this.output = output; - this.label = label; + setLabel(label); this.tickPoint = tickPoint; this.superTick = superTick; this.timer = new Timer(); } - /** Print a start message using the label */ - @Override - public void startMessage() { - startMessage(null) ; - } +// /** Print a start message using the label */ +// @Override +// public void startMessage() { +// startMessage(null) ; +// } /** Print a start message using a different string. */ @Override public void startMessage(String msg) { if ( msg != null ) output.print(msg) ; - else - output.print("Start: "+label) ; } + //public void startSource(String msg) { + //public void finishSource(String msg) { + @Override public void finishMessage(String msg) { // Elapsed. - long timePoint = getTimer().getTimeInterval(); - long counterTotalMsg = getRunningTotal(); - - // /1000L is milli to second conversion + long timePoint = getTimer().read(); if ( timePoint != 0 ) { double time = timePoint / 1000.0; - long runAvgRate = (counterTotalMsg * 1000L) / timePoint; - - print("%s: %,d %s %.2fs (Avg: %,d)", msg, counterTotalMsg, label, time, runAvgRate); + long runAvgRate = (getRunningTotal() * 1000L) / timePoint; + print("%s: %,d tuples in %.2fs (Avg: %,d)", msg, getTicks(), time, runAvgRate); } else - print("%s: %,d %s (Avg: ----)", msg, counterTotalMsg, label); + print("%s: %,d (Avg: ----)", msg, getTicks()); } @Override @@ -118,14 +124,19 @@ public void start() { @Override public void finish() { - timeTotalMillis = getTimer().endTimer(); + // XXX + getTimer().endTimer(); + timeTotalMillis = getTimer().getTimeInterval(); } @Override public void tick() { + // The ticking counterBatch++; counterTotal++; - + if ( inSection ) + sectionTickCounter++; + // Report overall progress if ( tickPoint(getRunningTotal(), tickPoint) ) { long timePoint = getTimer().readTimer(); long thisTime = timePoint - lastTime; @@ -133,6 +144,8 @@ public void tick() { // *1000L is milli to second conversion if ( thisTime != 0 && timePoint != 0 ) { long batchAvgRate = (counterBatch * 1000L) / thisTime; + // XXX Too large : first after file switch. ???timePoint is wrong. + //System.err.printf("** %d %d\n",getRunningTotal(), timePoint ); long runAvgRate = (getRunningTotal() * 1000L) / timePoint; print("Add: %,d %s (Batch: %,d / Avg: %,d)", getRunningTotal(), label, batchAvgRate, runAvgRate); } else { @@ -161,6 +174,36 @@ private long getRunningTotal() { public long getTime() { return timeTotalMillis; } + + @Override + public void startSection() { + if ( inSection ) + throw new TDBException("startSection: Already in section"); + inSection = true; + sectionCounter++; + sectionTimer = new Timer(); + sectionTimer.startTimer(); + sectionTimeInMillis = 0; + sectionTickCounter = 0; + } + + @Override + public void finishSection() { + if ( ! inSection ) + throw new TDBException("finishSection: Not in section"); + print(" End file: %s (triples/quads = %,d)", label, sectionTickCounter); + inSection = false; + sectionTimeInMillis = sectionTimer.endTimer(); + } + + @Override + public long getSectionTicks() { + return sectionTickCounter; + } + @Override + public long getSectionTime() { + return sectionTimeInMillis; + } protected void elapsed(long timerReading) { float elapsedSecs = timerReading / 1000F; @@ -177,4 +220,9 @@ static boolean tickPoint(long counter, long quantum) { return counter % quantum == 0; } + @Override + public void setLabel(String label) { + this.label = label; + } + } diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressStreamRDF.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressStreamRDF.java index e6c7b59a660..39b31af329e 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressStreamRDF.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressStreamRDF.java @@ -36,22 +36,6 @@ public ProgressStreamRDF(StreamRDF other, ProgressMonitor monitor) { this.monitor = monitor ; } - // Better that the app call start/finish on the monitor so that a number of - // inputs on the stream can call start/finish. i.e the monitor can be used - // for a batch of oeprations. - -// @Override -// public void start() { -// monitor.start(); -// super.start(); -// } -// -// @Override -// public void finish() { -// super.finish(); -// monitor.finish(); -// } - @Override public void triple(Triple triple) { super.triple(triple); diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/basic/LoaderBasic.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/basic/LoaderBasic.java index e30e0f5d885..41dbbbf8e79 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/basic/LoaderBasic.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/basic/LoaderBasic.java @@ -27,6 +27,8 @@ import org.apache.jena.tdb2.loader.base.LoaderBase; import org.apache.jena.tdb2.loader.base.LoaderOps; import org.apache.jena.tdb2.loader.base.MonitorOutput; +import org.apache.jena.tdb2.loader.base.ProgressMonitor; +import org.apache.jena.tdb2.loader.base.ProgressMonitorOutput; /** Simple bulk loader. Algorithm: Parser to dataset. */ public class LoaderBasic extends LoaderBase { @@ -53,8 +55,8 @@ public StreamRDF stream() { } @Override - protected void loadOne(String source) { - LoaderOps.inputFile(dest, source, output, DataTickPoint, DataSuperTick); + protected ProgressMonitor createProgressMonitor(MonitorOutput output) { + return ProgressMonitorOutput.create(output, "", DataTickPoint, DataSuperTick); } @Override diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java index 8bd158d240c..8b0fdf956b0 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java @@ -96,16 +96,16 @@ public class LoaderMain extends LoaderBase implements DataLoader { public static final int DataSuperTick = 10; public static final int IndexTickPoint = 1_000_000; public static final int IndexSuperTick = 10; - + private final LoaderPlan loaderPlan; - + private final DatasetGraphTDB dsgtdb; private final StreamRDF stream; private final Map indexMap; private final StreamRDFCounting dataInput; private final List dataProcess = new ArrayList<>(); - + public LoaderMain(LoaderPlan loaderPlan, DatasetGraph dsg, MonitorOutput output) { this(loaderPlan, dsg, null, output); } @@ -284,7 +284,7 @@ private static void indexPhase(List processes, TupleIndex srcId // Add to processes - we can wait later if we do not touched indexes being built. processes.add(indexer); PhasedOps.ReplayResult result = PhasedOps.replay(srcIdx, dest, output); - // End read tranaction on srcIdx + // End read transaction on srcIdx transaction.end(); String timeStr = "---"; @@ -327,7 +327,7 @@ public long countQuads() { } @Override - protected void loadOne(String filename) { - LoaderOps.inputFile(stream, filename, output, DataTickPoint, DataSuperTick); + protected ProgressMonitor createProgressMonitor(MonitorOutput output) { + return ProgressMonitorOutput.create(output, "", DataTickPoint, DataSuperTick); } } diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/sequential/BuilderSecondaryIndexes.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/sequential/BuilderSecondaryIndexes.java index b1bec81d0c0..8484a399802 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/sequential/BuilderSecondaryIndexes.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/sequential/BuilderSecondaryIndexes.java @@ -18,7 +18,6 @@ package org.apache.jena.tdb2.loader.sequential; -import org.apache.jena.atlas.lib.Timer; import org.apache.jena.tdb2.loader.base.LoaderOps; import org.apache.jena.tdb2.loader.base.MonitorOutput; import org.apache.jena.tdb2.loader.base.ProgressMonitor; @@ -26,27 +25,24 @@ import org.apache.jena.tdb2.store.tupletable.TupleIndex; /** - * This interface is the mechanism for building indexes given that at leasts one index + * This interface is the mechanism for building indexes given that at least one index * already exists (the "primary", which normally is SPO or GSPO). */ public class BuilderSecondaryIndexes { - public static void createSecondaryIndexes(MonitorOutput output, TupleIndex primaryIndex, TupleIndex[] secondaryIndexes) - { - Timer timer = new Timer() ; - timer.startTimer() ; + public static void createSecondaryIndexes(MonitorOutput output, TupleIndex primaryIndex, TupleIndex[] secondaryIndexes) { boolean printTiming = true; for ( TupleIndex index : secondaryIndexes ) { + String msg = primaryIndex.getName()+"->"+index.getName(); if ( index != null ) { - ProgressMonitor monitor = ProgressMonitorOutput.create(output, index.getName(), + ProgressMonitor monitor = ProgressMonitorOutput.create(output, msg, LoaderSequential.IndexTickPoint, LoaderSequential.IndexSuperTick); - monitor.startMessage(); + monitor.startMessage(msg); monitor.start(); - long time1 = timer.readTimer() ; LoaderOps.copyIndex(primaryIndex.all(), new TupleIndex[]{index}, monitor) ; - long time2 = timer.readTimer() ; + monitor.finish(); monitor.finishMessage(index.getName()+" indexing: "); } diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/sequential/LoaderSequential.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/sequential/LoaderSequential.java index 32c2244dd99..98d98dd1140 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/sequential/LoaderSequential.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/sequential/LoaderSequential.java @@ -29,6 +29,8 @@ import org.apache.jena.tdb2.loader.base.LoaderBase; import org.apache.jena.tdb2.loader.base.LoaderOps; import org.apache.jena.tdb2.loader.base.MonitorOutput; +import org.apache.jena.tdb2.loader.base.ProgressMonitor; +import org.apache.jena.tdb2.loader.base.ProgressMonitorOutput; import org.apache.jena.tdb2.store.DatasetGraphTDB; import org.apache.jena.tdb2.sys.TDBInternal; @@ -103,10 +105,10 @@ public StreamRDF stream() { } @Override - protected void loadOne(String filename) { - LoaderOps.inputFile(stream, filename, output, DataTickPoint, DataSuperTick); + protected ProgressMonitor createProgressMonitor(MonitorOutput output) { + return ProgressMonitorOutput.create(output, "", DataTickPoint, DataSuperTick); } - + @Override public boolean bulkUseTransaction() { return true;