Skip to content

Commit

Permalink
Added a first draft of the staging concept.
Browse files Browse the repository at this point in the history
This is not yet tailored to Wikidata and is still in a highly experimental state.

Abstract: A stage is a set of tasks that are to be executed in a
certain order, e.g. download files from newest to oldest.

Once a stage has output available to be processed further by another
stage, the output can be forwarded via synchronized queues to the next
stage(s). The stages themselves run in separate threads and are modelled as
producer-consumer problems.

In WDTK the stages might be (referring to the DumpProcessingExample):
1) find/download files
2) extract and filter revisions
3) parse revisions into JSONObjects
4) measure statistics
  • Loading branch information
Fredo Erxleben committed Apr 10, 2014
1 parent 00ac0ea commit 66e7942
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 0 deletions.
@@ -0,0 +1,45 @@
package org.wikidata.wdtk.dumpfiles.parallel;

import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;

/**
 * Stage that gathers the output of several producers into a single stream.
 * Elements are handed on unchanged, so input and output type are the same;
 * the stage merely counts how many elements passed through.
 *
 * @author fredo
 *
 * @param <Type>
 *            the element type on both the input and the output side
 */
public class CollectorStage<Type> extends Stage<Type, Type> {
    // NOTE this would be a good place to apply filtering and sorting

    /**
     * Creates a collector with an empty, mutable producer list and a
     * counting result. The consumer collection is left at the inherited
     * default until {@link #addConsumer(BlockingQueue)} is called.
     */
    public CollectorStage() {
        this.producers = new LinkedList<>();
        this.result = new CounterStageResult();
    }

    /**
     * Registers the one and only consumer of this stage.
     *
     * @param consumer
     *            the queue that receives the collected elements
     * @return true if the consumer was registered, false if a consumer had
     *         already been registered before
     */
    @Override
    public synchronized boolean addConsumer(BlockingQueue<Type> consumer) {
        if (!this.consumers.isEmpty()) {
            // a consumer is already in place, refuse any further one
            return false;
        }
        this.consumers = Collections.singletonList(consumer);
        return true;
    }

    /**
     * Counts the element and passes it through untouched.
     *
     * @param element
     *            the element to collect
     * @return the very same element
     */
    @Override
    public Type processElement(Type element) {
        // filter or sorting here
        // sorting might need to override call()
        CounterStageResult tally = (CounterStageResult) this.result;
        tally.increment();
        return element;
    }

}
@@ -0,0 +1,17 @@
package org.wikidata.wdtk.dumpfiles.parallel;

/**
 * A stage result that carries a simple counter which starts at zero and is
 * advanced by one on every call to {@link #increment()}.
 */
public class CounterStageResult extends StageResult {

    private long counter = 0;

    /**
     * Advances the counter by one.
     */
    public void increment() {
        this.counter += 1;
    }

    /**
     * @return the text "Counted " followed by the current counter value
     */
    @Override
    public String getReport() {
        String count = String.valueOf(this.counter);
        return "Counted " + count;
    }

}
@@ -0,0 +1,12 @@
package org.wikidata.wdtk.dumpfiles.parallel;

/**
 * A stage result signalling that the stage could not be executed. It carries
 * no further details and always reports the same fixed message.
 */
public class FailedStageResult extends StageResult {

    // TODO allow submitting a reason

    /** The fixed message handed out by {@link #getReport()}. */
    private static final String REPORT = "Stage execution failed";

    /**
     * @return the fixed failure message
     */
    @Override
    public String getReport() {
        return REPORT;
    }

}
@@ -0,0 +1,44 @@
package org.wikidata.wdtk.dumpfiles.parallel;

import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;

import org.wikidata.wdtk.dumpfiles.WmfDumpFile;
import org.wikidata.wdtk.dumpfiles.WmfDumpFileManager;
import org.wikidata.wdtk.util.DirectoryManager;
import org.wikidata.wdtk.util.DirectoryManagerImpl;
import org.wikidata.wdtk.util.WebResourceFetcher;
import org.wikidata.wdtk.util.WebResourceFetcherImpl;

/**
 * Stage that takes {@link WmfDumpFile}s as input and is meant to emit an
 * {@link InputStream} for each of them. The element processing itself is
 * still a stub.
 */
public class FileStage extends Stage<WmfDumpFile, InputStream> {

    private DirectoryManager downloadDirectoryManager;
    private WebResourceFetcher webResourceFetcher = new WebResourceFetcherImpl();
    WmfDumpFileManager fileManager;

    /**
     * Sets up empty producer and consumer lists, a wait time of 1000 and a
     * dump file manager for the project "wikidatawiki" that is rooted in the
     * directory named by the "user.dir" system property.
     * <p>
     * If the setup raises an IOException, the stack trace is printed, the
     * stage is flagged as not running and its result becomes a
     * {@link FailedStageResult}.
     */
    public FileStage() {
        this.waitTime = 1000;
        this.producers = new LinkedList<>();
        this.consumers = new LinkedList<>();

        try {
            String workingDirectory = System.getProperty("user.dir");
            this.downloadDirectoryManager = new DirectoryManagerImpl(
                    workingDirectory);
            this.fileManager = new WmfDumpFileManager("wikidatawiki",
                    this.downloadDirectoryManager, this.webResourceFetcher);
        } catch (IOException e) {
            e.printStackTrace();
            this.result = new FailedStageResult();
            this.running = false;
        }
    }

    /**
     * Not implemented yet.
     *
     * @param element
     *            the dump file to process
     * @return always null for the time being
     */
    @Override
    public InputStream processElement(WmfDumpFile element) {
        // TODO Auto-generated method stub
        return null;
    }

}
@@ -0,0 +1,82 @@
package org.wikidata.wdtk.dumpfiles.parallel;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

/**
 * A stage batch-processes all elements provided by its producers. The results
 * of the processing are given to its consumers. The life cycle of a stage
 * starts when calling the call()-method. The stage then repeatedly drains all
 * producer queues and calls processElement() upon the drained elements in
 * order. Once all elements are depleted the stage waits for new elements.
 * <p>
 * A stage can be signaled via finish() that it should stop. It then completes
 * the batch it has already drained, stops polling and returns its StageResult.
 * Elements that are still sitting in a producer queue at that point are not
 * picked up any more.
 *
 * @author fredo
 *
 * @param <InType>
 *            type of the elements taken from the producers
 * @param <OutType>
 *            type of the elements handed to the consumers
 */
public abstract class Stage<InType, OutType> implements Callable<StageResult> {

    // Both flags are shared between the worker thread executing call() and
    // the threads calling finish()/isFinished(); volatile guarantees that a
    // write by one thread is seen by the others.
    protected volatile boolean running = true;
    private volatile boolean finished = false;

    // Passed to Object.wait(long), i.e. the unit is milliseconds.
    protected int waitTime = 100;

    protected StageResult result;

    // The defaults are immutable empty lists. Subclasses have to install
    // mutable collections (or override the add methods) before producers or
    // consumers can be registered.
    protected Collection<BlockingQueue<InType>> producers = Collections.emptyList();
    protected Collection<BlockingQueue<OutType>> consumers = Collections.emptyList();

    /**
     * Registers a queue this stage takes its input from.
     *
     * @param producer
     *            the input queue
     * @return always true in this default implementation
     */
    public synchronized boolean addProducer(BlockingQueue<InType> producer) {
        this.producers.add(producer);
        return true;
    }

    /**
     * Registers a queue this stage puts its output into.
     *
     * @param consumer
     *            the output queue
     * @return always true in this default implementation
     */
    public synchronized boolean addConsumer(BlockingQueue<OutType> consumer) {
        this.consumers.add(consumer);
        return true;
    }

    /**
     * Signals the stage to stop. A worker that is currently waiting for new
     * input is woken up so that it terminates without sitting out the rest
     * of its wait time.
     */
    public synchronized void finish() {
        this.running = false;
        this.notifyAll();
    }

    /**
     * @return true once call() has terminated, no matter whether it returned
     *         normally or threw an exception
     */
    public synchronized boolean isFinished() {
        return this.finished;
    }

    /**
     * Turns one input element into one output element.
     *
     * @param element
     *            the element to process
     * @return the processing result; a null result is not forwarded to the
     *         consumers
     */
    public abstract OutType processElement(InType element);

    /**
     * The default implementation processes the input only element-wise.
     *
     * @return the result of this stage
     * @throws Exception
     *             in particular an InterruptedException if the worker thread
     *             is interrupted while waiting or while handing over a result
     */
    @Override
    public StageResult call() throws Exception {
        List<InType> currentStep = new LinkedList<>();

        try {
            while (this.running) {
                // get all the input for the steps
                for (BlockingQueue<InType> producer : this.producers) {
                    producer.drainTo(currentStep);
                }
                // process the elements
                while (!currentStep.isEmpty()) {
                    OutType stepResult = this.processElement(currentStep.remove(0));
                    if (stepResult == null) {
                        // BlockingQueues reject null elements, so there
                        // is nothing that could be handed on
                        continue;
                    }
                    // distribute result to all consumers
                    for (BlockingQueue<OutType> consumer : this.consumers) {
                        consumer.put(stepResult);
                    }
                }

                // wait for new input
                synchronized (this) {
                    // re-check under the monitor: finish() may have been
                    // called since the loop condition was evaluated, in
                    // which case its notifyAll() has already passed
                    if (this.running) {
                        this.wait(this.waitTime);
                    }
                }
            }
        } finally {
            // also reached when an exception ends the stage
            this.finished = true;
        }
        return this.result;
    }

}
@@ -0,0 +1,100 @@
package org.wikidata.wdtk.dumpfiles.parallel;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Keeps track of a set of stages, the queues connecting them, the futures of
 * the launched stages and the results they delivered.
 */
public class StageManager {

    // TODO the whole life cycle should be managed by this class

    ExecutorService executor = Executors.newCachedThreadPool();

    List<Stage<?,?>> stages = new LinkedList<>();
    List<Future<StageResult>> futures = new LinkedList<>();
    List<StageResult> results = new LinkedList<>();
    List<BlockingQueue<?>> connectors = new LinkedList<>();

    /**
     * Creates a new queue and registers it as consumer of the sender and as
     * producer of the receiver. The queue is remembered so that it can be
     * cleared on shutdown.
     * <p>
     * NOTE: the boolean results of addConsumer() and addProducer() are not
     * evaluated here. If a stage refuses the registration, the queue is
     * returned and remembered nevertheless.
     *
     * @param sender
     *            the stage whose output is to be forwarded
     * @param receiver
     *            the stage that is to work on the sender's output
     * @return the queue connecting both stages
     */
    public <CommonType> BlockingQueue<CommonType> connectStages(Stage<?, CommonType> sender, Stage<CommonType, ?> receiver){

        BlockingQueue<CommonType> connectionQueue = new LinkedBlockingQueue<CommonType>();

        sender.addConsumer(connectionQueue);
        receiver.addProducer(connectionQueue);

        this.connectors.add(connectionQueue);

        return connectionQueue;
    }

    /**
     * Remembers a stage for a later launch via run().
     *
     * @param toLaunch
     *            the stage to remember
     */
    public void submitStage(Stage<?,?> toLaunch){

        stages.add(toLaunch);
    }

    /**
     * Hands every remembered stage to the executor and keeps the resulting
     * futures.
     */
    public void run(){
        for(Stage<?,?> stage : this.stages){
            Future<StageResult> result = executor.submit(stage);
            this.futures.add(result);
        }
    }

    /**
     * Moves the results of all futures that are done into the result list.
     * Futures that ended with an exception are dropped after printing the
     * stack trace. Futures that are not done yet are left untouched.
     */
    public void collectResults(){
        List<Future<StageResult>> toRemove = new LinkedList<>();

        for(Future<StageResult> future : this.futures){
            if(!future.isDone()){
                continue;
            }
            try {
                this.results.add(future.get());
                toRemove.add(future);
            } catch (ExecutionException e) {
                // the stage failed, there will never be a result
                e.printStackTrace();
                toRemove.add(future);
            } catch (InterruptedException e) {
                // keep the interrupt visible for the caller and leave the
                // remaining futures for a later attempt
                Thread.currentThread().interrupt();
                e.printStackTrace();
                break;
            }
        }

        this.futures.removeAll(toRemove);
    }

    /**
     * The StageResults are kept for evaluation.
     * The Futures and Stages and connecting Queues will be cleared.
     * <p>
     * Blocks until every launched stage has delivered its result, unless the
     * calling thread is interrupted. In that case the interrupt flag is
     * restored and the remaining results are not waited for.
     */
    public void shutdown(){
        this.executor.shutdown();

        for(Stage<?,?> stage : this.stages){
            stage.finish();
        }

        for(BlockingQueue<?> queue : this.connectors){
            queue.clear();
        }

        for(Future<StageResult> future : this.futures){
            try {
                this.results.add(future.get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // with the flag restored every further get() would fail
                // right away, so stop waiting
                Thread.currentThread().interrupt();
                e.printStackTrace();
                break;
            }
        }
        this.stages.clear();
        this.futures.clear();
        this.connectors.clear();
    }

    /**
     * @return the list of all results gathered so far; this is the internal
     *         list, not a copy
     */
    public List<StageResult> getStageResults(){
        return this.results;
    }
}
@@ -0,0 +1,21 @@
package org.wikidata.wdtk.dumpfiles.parallel;



/**
 * Base class of everything a stage can hand back after its execution. Every
 * instance receives a numeric id taken from a class-wide counter at the time
 * of its construction.
 */
public abstract class StageResult {

    /** The id the next constructed instance will receive. */
    static long nextId = 0;

    private final long id;

    /**
     * Takes the current value of the class-wide counter as id and advances
     * the counter by one.
     */
    StageResult() {
        this.id = nextId++;
    }

    /**
     * @return the id assigned to this result on construction
     */
    public long getId() {
        return this.id;
    }

    /**
     * @return a human readable description of this result
     */
    public abstract String getReport();
}
@@ -0,0 +1,23 @@
package org.wikidata.wdtk.examples;

import java.util.LinkedList;

import org.wikidata.wdtk.dumpfiles.parallel.CounterStageResult;
import org.wikidata.wdtk.dumpfiles.parallel.Stage;

/**
 * Example stage that squares every Integer it receives and counts the
 * processed elements in a {@link CounterStageResult}.
 */
public class SquaringStage extends Stage<Integer, Integer> {

    /**
     * Creates the stage with empty, mutable producer and consumer lists and
     * a fresh counting result.
     */
    SquaringStage() {
        this.consumers = new LinkedList<>();
        this.producers = new LinkedList<>();
        this.result = new CounterStageResult();
    }

    /**
     * Counts the element and squares it.
     *
     * @param element
     *            the number to square
     * @return the element multiplied by itself
     */
    @Override
    public Integer processElement(Integer element) {
        CounterStageResult tally = (CounterStageResult) this.result;
        tally.increment();

        Integer squared = element * element;
        return squared;
    }

}

0 comments on commit 66e7942

Please sign in to comment.