diff --git a/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/CollectorStage.java b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/CollectorStage.java
new file mode 100644
index 000000000..fad83c617
--- /dev/null
+++ b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/CollectorStage.java
@@ -0,0 +1,64 @@
+package org.wikidata.wdtk.dumpfiles.parallel;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * A collector stage is a stage that collects the results of multiple producers
+ * and merges them into one result. The type of the results does not change
+ * during collection.
+ * <p>
+ * Every processed element is counted in the {@link CounterStageResult} that
+ * this stage returns as its result.
+ *
+ * @author fredo
+ *
+ * @param <Type>
+ *            type of the collected elements
+ */
+public class CollectorStage<Type> extends Stage<Type, Type> {
+    // NOTE this would be a good place to apply filtering and sorting
+
+    /**
+     * Creates a collector that counts its elements and has no producers yet.
+     */
+    public CollectorStage() {
+        this.result = new CounterStageResult();
+        this.producers = new LinkedList<>();
+    }
+
+    /**
+     * Only one consumer can be added. On the attempt to add another consumer
+     * the method will return false.
+     *
+     * @param consumer
+     *            queue that receives the collected elements
+     * @return true if the consumer was registered, false if this stage
+     *         already has a consumer
+     */
+    @Override
+    public synchronized boolean addConsumer(BlockingQueue<Type> consumer) {
+        if (!this.consumers.isEmpty()) {
+            return false;
+        }
+        this.consumers = Collections.singletonList(consumer);
+        return true;
+    }
+
+    /**
+     * Counts the element and passes it on unchanged.
+     *
+     * @param element
+     *            element taken from one of the producers
+     * @return the given element
+     */
+    @Override
+    public Type processElement(Type element) {
+        // filter or sorting here
+        // sorting might need to override call()
+        ((CounterStageResult) this.result).increment();
+        return element;
+    }
+
+}
diff --git a/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/CounterStageResult.java b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/CounterStageResult.java
new file mode 100644
index 000000000..ca5c1eaed
--- /dev/null
+++ b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/CounterStageResult.java
@@ -0,0 +1,27 @@
+package org.wikidata.wdtk.dumpfiles.parallel;
+
+/**
+ * A stage result that reports how often {@link #increment()} was called.
+ * <p>
+ * The counter is a plain long that is updated without synchronization.
+ */
+public class CounterStageResult extends StageResult {
+
+    private long counter = 0;
+
+    /**
+     * Increases the counter by one.
+     */
+    public void increment() {
+        this.counter++;
+    }
+
+    /**
+     * @return the text "Counted " followed by the current counter value
+     */
+    @Override
+    public String getReport() {
+        return "Counted " + this.counter;
+    }
+
+}
diff --git a/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/FailedStageResult.java b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/FailedStageResult.java
new file mode 100644
index 000000000..50b8e3b25
--- /dev/null
+++ b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/FailedStageResult.java
@@ -0,0 +1,40 @@
+package org.wikidata.wdtk.dumpfiles.parallel;
+
+/**
+ * The result of a stage whose execution failed, optionally with a reason.
+ */
+public class FailedStageResult extends StageResult {
+
+    /** Why the stage failed; null if no reason was given. */
+    private final String reason;
+
+    /**
+     * Creates a failure result without a reason.
+     */
+    public FailedStageResult() {
+        this(null);
+    }
+
+    /**
+     * Creates a failure result that names the reason of the failure.
+     *
+     * @param reason
+     *            description of the failure, may be null
+     */
+    public FailedStageResult(String reason) {
+        this.reason = reason;
+    }
+
+    /**
+     * @return "Stage execution failed", followed by ": " and the reason if
+     *         one was given
+     */
+    @Override
+    public String getReport() {
+        if (this.reason == null) {
+            return "Stage execution failed";
+        }
+        return "Stage execution failed: " + this.reason;
+    }
+
+}
diff --git a/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/FileStage.java b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/FileStage.java
new file mode 100644
index 000000000..60e17f1bb
--- /dev/null
+++ b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/FileStage.java
@@ -0,0
+1,60 @@
+package org.wikidata.wdtk.dumpfiles.parallel;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.LinkedList;
+
+import org.wikidata.wdtk.dumpfiles.WmfDumpFile;
+import org.wikidata.wdtk.dumpfiles.WmfDumpFileManager;
+import org.wikidata.wdtk.util.DirectoryManager;
+import org.wikidata.wdtk.util.DirectoryManagerImpl;
+import org.wikidata.wdtk.util.WebResourceFetcher;
+import org.wikidata.wdtk.util.WebResourceFetcherImpl;
+
+/**
+ * A stage that takes {@link WmfDumpFile} elements and hands out
+ * {@link InputStream} elements. The processing itself is not implemented yet.
+ */
+public class FileStage extends Stage<WmfDumpFile, InputStream> {
+
+    private DirectoryManager downloadDirectoryManager;
+    private WebResourceFetcher webResourceFetcher = new WebResourceFetcherImpl();
+    WmfDumpFileManager fileManager;
+
+    /**
+     * Creates the stage and a dump file manager for "wikidatawiki" that uses
+     * the directory named by the "user.dir" system property. If this fails
+     * with an IOException, the stage is marked as not running and gets a
+     * {@link FailedStageResult} as its result.
+     */
+    public FileStage() {
+        this.consumers = new LinkedList<>();
+        this.producers = new LinkedList<>();
+        this.waitTime = 1000;
+
+        try {
+            this.downloadDirectoryManager = new DirectoryManagerImpl(
+                    System.getProperty("user.dir"));
+            this.fileManager = new WmfDumpFileManager("wikidatawiki",
+                    this.downloadDirectoryManager, this.webResourceFetcher);
+        } catch (IOException e) {
+            e.printStackTrace();
+            this.running = false;
+            this.result = new FailedStageResult();
+        }
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @param element
+     *            the dump file to process
+     * @return always null
+     */
+    @Override
+    public InputStream processElement(WmfDumpFile element) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}
diff --git a/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/Stage.java b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/Stage.java
new file mode 100644
index 000000000..fa9f5ee45
--- /dev/null
+++ b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/Stage.java
@@ -0,0 +1,141 @@
+package org.wikidata.wdtk.dumpfiles.parallel;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+
+/**
+ * A stage batch-processes all elements provided by producers. The results of
+ * the processing are given to consumers. The life cycle of a stage starts when
+ * calling the call()-method. Then the stage will call processElement() upon
+ * all elements provided by all producers in order. Once all elements are
+ * depleted the stage waits for new elements. A stage can be signaled that
+ * there will be no more elements to process. It then will process all
+ * remaining elements and return a StageResult.
+ *
+ * @author fredo
+ *
+ * @param <InType>
+ *            type of the elements taken from the producers
+ * @param <OutType>
+ *            type of the elements handed to the consumers
+ */
+public abstract class Stage<InType, OutType> implements Callable<StageResult> {
+
+    // volatile: finish() is called from another thread than call()
+    protected volatile boolean running = true;
+    private volatile boolean finished = false;
+    protected int waitTime = 100; // in milliseconds, as used by Object.wait(long)
+
+    protected StageResult result;
+    // Mutable by default, so that addProducer() and addConsumer() also work for
+    // subclasses that do not install their own collections.
+    protected Collection<BlockingQueue<InType>> producers = new LinkedList<>();
+    protected Collection<BlockingQueue<OutType>> consumers = new LinkedList<>();
+
+    /**
+     * Registers a queue this stage takes its input from.
+     *
+     * @param producer
+     *            queue holding elements to process
+     * @return true
+     */
+    public synchronized boolean addProducer(BlockingQueue<InType> producer) {
+        this.producers.add(producer);
+        return true;
+    }
+
+    /**
+     * Registers a queue every processing result is put into.
+     *
+     * @param consumer
+     *            queue receiving the results
+     * @return true
+     */
+    public synchronized boolean addConsumer(BlockingQueue<OutType> consumer) {
+        this.consumers.add(consumer);
+        return true;
+    }
+
+    /**
+     * Signals that there will be no more elements to process. A stage that is
+     * waiting for new input is woken up.
+     */
+    public synchronized void finish() {
+        this.running = false;
+        this.notifyAll();
+    }
+
+    /**
+     * @return true once call() has ended, be it normally or with an exception
+     */
+    public synchronized boolean isFinished() {
+        return this.finished;
+    }
+
+    /**
+     * Processes a single element.
+     *
+     * @param element
+     *            element taken from one of the producers
+     * @return the result for the consumers; null results are not forwarded
+     */
+    public abstract OutType processElement(InType element);
+
+    /**
+     * The default implementation processes the input only element-wise. It
+     * runs until finish() is called, then processes the elements that are
+     * still queued and returns the result of this stage.
+     */
+    @Override
+    public StageResult call() throws Exception {
+        List<InType> currentStep = new LinkedList<>();
+
+        try {
+            while (this.running) {
+                this.processAvailable(currentStep);
+
+                // wait for new input
+                synchronized (this) {
+                    // re-check under the lock, finish() may just have been called
+                    if (this.running) {
+                        this.wait(this.waitTime);
+                    }
+                }
+            }
+            // no more input will come: handle what arrived before finish()
+            this.processAvailable(currentStep);
+        } finally {
+            this.finished = true;
+        }
+        return this.result;
+    }
+
+    /**
+     * Drains all producers into the buffer, processes the buffered elements in
+     * order and distributes each result to all consumers.
+     */
+    private void processAvailable(List<InType> buffer)
+            throws InterruptedException {
+        // get all the input for the step
+        for (BlockingQueue<InType> producer : this.producers) {
+            producer.drainTo(buffer);
+        }
+        // process the elements
+        while (!buffer.isEmpty()) {
+            OutType stepResult = this.processElement(buffer.remove(0));
+            if (stepResult == null) {
+                // a BlockingQueue does not accept null elements
+                continue;
+            }
+            // distribute result to all consumers
+            for (BlockingQueue<OutType> consumer : this.consumers) {
+                consumer.put(stepResult);
+            }
+        }
+    }
+
+}
diff --git a/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/StageManager.java
b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/StageManager.java
new file mode 100644
index 000000000..a15123882
--- /dev/null
+++ b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/StageManager.java
@@ -0,0 +1,149 @@
+package org.wikidata.wdtk.dumpfiles.parallel;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Keeps track of stages and the queues connecting them, runs the stages on an
+ * executor and gathers their results.
+ */
+public class StageManager {
+
+    // TODO the whole life cycle should be managed by this class
+
+    ExecutorService executor = Executors.newCachedThreadPool();
+
+    List<Stage<?, ?>> stages = new LinkedList<>();
+    List<Future<StageResult>> futures = new LinkedList<>();
+    List<StageResult> results = new LinkedList<>();
+    List<BlockingQueue<?>> connectors = new LinkedList<>();
+
+    /**
+     * Connects two stages with a new queue: the sender gets the queue as a
+     * consumer, the receiver gets it as a producer.
+     *
+     * @param sender
+     *            stage whose results are put into the queue
+     * @param receiver
+     *            stage that takes its input from the queue
+     * @return the queue connecting both stages
+     * @throws IllegalStateException
+     *             if one of the stages does not accept the queue; a sender
+     *             that already accepted the queue stays connected to it
+     */
+    public <CommonType> BlockingQueue<CommonType> connectStages(
+            Stage<?, CommonType> sender, Stage<CommonType, ?> receiver) {
+
+        BlockingQueue<CommonType> connectionQueue = new LinkedBlockingQueue<>();
+
+        // e.g. a CollectorStage refuses a second consumer
+        if (!sender.addConsumer(connectionQueue)) {
+            throw new IllegalStateException(
+                    "The sender does not accept another consumer");
+        }
+        if (!receiver.addProducer(connectionQueue)) {
+            throw new IllegalStateException(
+                    "The receiver does not accept another producer");
+        }
+
+        this.connectors.add(connectionQueue);
+
+        return connectionQueue;
+    }
+
+    /**
+     * Registers a stage to be started by run().
+     *
+     * @param toLaunch
+     *            the stage to register
+     */
+    public void submitStage(Stage<?, ?> toLaunch) {
+        this.stages.add(toLaunch);
+    }
+
+    /**
+     * Submits all registered stages to the executor.
+     */
+    public void run() {
+        for (Stage<?, ?> stage : this.stages) {
+            this.futures.add(this.executor.submit(stage));
+        }
+    }
+
+    /**
+     * Moves the results of all stages that are done to the list of results.
+     * Stages that are still running are left untouched.
+     */
+    public void collectResults() {
+        List<Future<StageResult>> toRemove = new LinkedList<>();
+
+        for (Future<StageResult> future : this.futures) {
+            if (!future.isDone()) {
+                continue;
+            }
+            toRemove.add(future);
+            try {
+                this.results.add(future.get());
+            } catch (InterruptedException e) {
+                // keep the interruption visible to the caller
+                Thread.currentThread().interrupt();
+            } catch (ExecutionException e) {
+                e.printStackTrace();
+            }
+        }
+
+        this.futures.removeAll(toRemove);
+    }
+
+    /**
+     * The StageResults are kept for evaluation. The Futures and Stages and
+     * connecting Queues will be cleared.
+     */
+    public void shutdown() {
+        this.executor.shutdown();
+
+        for (Stage<?, ?> stage : this.stages) {
+            stage.finish();
+        }
+
+        // Wait for the stages before clearing the queues, so that elements
+        // which are still queued are not taken away from a running stage.
+        boolean interrupted = false;
+        for (Future<StageResult> future : this.futures) {
+            try {
+                this.results.add(future.get());
+            } catch (InterruptedException e) {
+                interrupted = true;
+            } catch (ExecutionException e) {
+                e.printStackTrace();
+            }
+        }
+
+        for (BlockingQueue<?> queue : this.connectors) {
+            queue.clear();
+        }
+
+        this.stages.clear();
+        this.futures.clear();
+        this.connectors.clear();
+
+        if (interrupted) {
+            // restore the flag only now, so that waiting for the remaining
+            // stages was not cut short
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * @return the results gathered by collectResults() and shutdown()
+     */
+    public List<StageResult> getStageResults() {
+        return this.results;
+    }
+}
diff --git a/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/StageResult.java b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/StageResult.java
new file mode 100644
index 000000000..27c0dc464
--- /dev/null
+++ b/wdtk-dumpfiles/src/main/java/org/wikidata/wdtk/dumpfiles/parallel/StageResult.java
@@ -0,0 +1,35 @@
+package org.wikidata.wdtk.dumpfiles.parallel;
+
+/**
+ * The outcome of a stage. Every result gets its own id at construction time.
+ */
+public abstract class StageResult {
+
+    static long nextId = 0;
+
+    private final long id;
+
+    StageResult() {
+        this.id = takeNextId();
+    }
+
+    /**
+     * Hands out the next id. Synchronized so that results which are created
+     * by different threads do not get the same id.
+     */
+    private static synchronized long takeNextId() {
+        return nextId++;
+    }
+
+    /**
+     * @return the id assigned to this result at construction time
+     */
+    public long getId() {
+        return this.id;
+    }
+
+    /**
+     * @return a text describing this result
+     */
+    public abstract String getReport();
+}
diff --git a/wdtk-examples/src/main/java/org/wikidata/wdtk/examples/SquaringStage.java b/wdtk-examples/src/main/java/org/wikidata/wdtk/examples/SquaringStage.java
new file mode 100644
index 000000000..c572d0387
--- /dev/null
+++ b/wdtk-examples/src/main/java/org/wikidata/wdtk/examples/SquaringStage.java
@@ -0,0 +1,32 @@
+package org.wikidata.wdtk.examples;
+
+import java.util.LinkedList;
+
+import org.wikidata.wdtk.dumpfiles.parallel.CounterStageResult;
+import org.wikidata.wdtk.dumpfiles.parallel.Stage;
+
+/**
+ * An example stage that squares integers and counts the processed elements.
+ */
+public class SquaringStage extends Stage<Integer, Integer> {
+
+    SquaringStage() {
+        this.result = new CounterStageResult();
+        this.producers = new LinkedList<>();
+        this.consumers = new LinkedList<>();
+    }
+
+    /**
+     * Counts the element and squares it.
+     *
+     * @param element
+     *            the number to square
+     * @return the square of the element, computed with int arithmetic
+     */
+    @Override
+    public Integer processElement(Integer element) {
+        ((CounterStageResult) this.result).increment();
+        return element * element;
+    }
+
+}
diff --git
a/wdtk-examples/src/main/java/org/wikidata/wdtk/examples/StagedExample.java b/wdtk-examples/src/main/java/org/wikidata/wdtk/examples/StagedExample.java
new file mode 100644
index 000000000..e4fa09c53
--- /dev/null
+++ b/wdtk-examples/src/main/java/org/wikidata/wdtk/examples/StagedExample.java
@@ -0,0 +1,67 @@
+package org.wikidata.wdtk.examples;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.wikidata.wdtk.dumpfiles.parallel.CollectorStage;
+import org.wikidata.wdtk.dumpfiles.parallel.StageManager;
+import org.wikidata.wdtk.dumpfiles.parallel.StageResult;
+
+/**
+ * Example that feeds numbers into a squaring stage and a collector stage and
+ * prints the reports of both stages.
+ */
+public class StagedExample {
+
+    static StageManager manager = new StageManager();
+
+    public static void main(String[] args) {
+
+        BlockingQueue<Integer> input0 = new LinkedBlockingQueue<>();
+        BlockingQueue<Integer> input1 = new LinkedBlockingQueue<>();
+        BlockingQueue<Integer> output = new LinkedBlockingQueue<>();
+
+        CollectorStage<Integer> collector = new CollectorStage<>();
+        SquaringStage squarer = new SquaringStage();
+
+        squarer.addProducer(input0);
+
+        manager.connectStages(squarer, collector);
+
+        collector.addProducer(input1);
+        collector.addConsumer(output);
+
+        // fill the queues
+        for (int i = 0; i < 1000; i++) {
+            input0.add(i);
+            input1.add(i);
+        }
+
+        manager.submitStage(collector);
+        manager.submitStage(squarer);
+
+        manager.run();
+
+        // NOTE need to give the stages some time before calling shutdown
+        // this will be more comfortable if the stage lifecycle is handled by the
+        // stage manager
+        boolean interrupted = false;
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            // stop waiting, but still shut down and print the reports
+            interrupted = true;
+        }
+
+        manager.shutdown();
+        manager.collectResults();
+
+        for (StageResult r : manager.getStageResults()) {
+            System.out.println(r.getId() + ": " + r.getReport());
+        }
+
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}