Skip to content

Commit

Permalink
Various fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cgueret committed Jan 6, 2011
1 parent f5e1eb1 commit ac87fd2
Show file tree
Hide file tree
Showing 17 changed files with 296 additions and 132 deletions.
1 change: 1 addition & 0 deletions data/dogfood_dbpedia.csv
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
DBpedia (dbpedia);http://dbpedia.org/sparql
Semantic Web Dog Food Corpus (semantic-web-dog-food);http://data.semanticweb.org/sparql
OpenLink Software LOD Cache (openlink-lod-cache);http://lod.openlinksw.com/sparql
2 changes: 1 addition & 1 deletion nl.erdf.core/src/main/java/nl/erdf/model/Request.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public Variable add(Variable variable) {
*/
@Override
public String toString() {
String buffer = "Variables : " + variables.size() + "\n";
String buffer = "Variables : " + variables.size() + "\n";
buffer += "Constraints :\n";
for (Constraint cstr : constraints)
buffer += cstr.toString() + "\n";
Expand Down
2 changes: 1 addition & 1 deletion nl.erdf.core/src/main/java/nl/erdf/optimizer/Generate.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Generate(DataLayer datalayer, Request request) {
constraint.getPart(2));
providers.add(provider);
}
logger.info("Number of providers: " + providers.size());
//logger.info("Number of providers: " + providers.size());
}

/**
Expand Down
18 changes: 7 additions & 11 deletions nl.erdf.core/src/main/java/nl/erdf/optimizer/Optimizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,10 @@ public class Optimizer extends Observable implements Runnable {
*
* @param datalayer
* @param request
* @param executor
* @param executor
*
*/
public Optimizer(final DataLayer datalayer, final Request request,
final ExecutorService executor) {
public Optimizer(final DataLayer datalayer, final Request request, final ExecutorService executor) {
// Save a pointer to the request and the datalayer
this.request = (SPARQLRequest) request;
this.datalayer = datalayer;
Expand Down Expand Up @@ -135,8 +134,7 @@ public void run() {
// logger.info("Generate");
Set<Solution> newPopulation = new HashSet<Solution>();
newPopulation.addAll(population);// Add the parents
generateOp.createPopulation(population, newPopulation,
OFFSPRING_SIZE);
generateOp.createPopulation(population, newPopulation, OFFSPRING_SIZE);

//
// Evaluate all of them
Expand All @@ -159,9 +157,9 @@ public void run() {
while (population.size() > POPULATION_SIZE)
population.remove(population.first());

for (Solution s :population)
logger.info(s.toString());
// for (Solution s :population)
// logger.info(s.toString());

//
// Track for optimality
//
Expand All @@ -178,9 +176,7 @@ public void run() {
}
boolean opt = ((age >= MAXIMUM_GENERATION) || (best.getFitness() == 1));
best.setOptimal(opt);
logger.info("Generation " + generation
+ " fitness best individual (optimal/certain) : "
+ best.getFitness() + " (" + best.isOptimal() + ") " + age);
logger.info("Generation " + generation + " fitness best individual: " + best.getFitness());
// logger.info(best.toString());

//
Expand Down
42 changes: 42 additions & 0 deletions nl.erdf.datalayer-sparql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,34 @@
<artifactId>datalayer-sparql</artifactId>
<version>0.1-SNAPSHOT</version>
<name>eRDF SPARQL DataLayer</name>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.1</version>
<configuration>
<archive>
<manifest>
<mainClass>nl.erdf.main.SPARQLEngine</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.dstovall</groupId>
<artifactId>onejar-maven-plugin</artifactId>
<version>1.4.4</version>
<executions>
<execution>
<goals>
<goal>one-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.hp.hpl.jena</groupId>
Expand Down Expand Up @@ -39,5 +67,19 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
</dependencies>
<pluginRepositories>
<pluginRepository>
<id>onejar-maven-plugin.googlecode.com</id>
<url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url>
</pluginRepository>
</pluginRepositories>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public int waitForCompletion() {
public void run() {
try {
// If canceled, return right away
if (isCanceled)
if (isCanceled || !endpoint.isEnabled())
return;

// Get the query pattern
Expand Down Expand Up @@ -247,6 +247,8 @@ public void run() {
long start = System.nanoTime();

// Execute the request
if (!endpoint.isEnabled())
return;
HttpResponse response = endpoint.getHttpClient().execute(httpget);
entity = response.getEntity();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
import java.util.Collection;
import java.util.LinkedList;

import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.params.ConnManagerParams;
import org.apache.http.conn.params.ConnPerRouteBean;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -23,12 +33,37 @@
// Use
// http://download.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
public class Directory {
/** Logger instance */
private static final Logger logger = LoggerFactory.getLogger(Directory.class);
// Logger instance
protected static final Logger logger = LoggerFactory.getLogger(Directory.class);

/** A list of SPARQL end points */
// A list of SPARQL end points
private LinkedList<EndPoint> listOfEndPoints = new LinkedList<EndPoint>();

// The client connection manager which avoids DoS
private final ClientConnectionManager connManager;

// Connection parameters
private final HttpParams httpParams;

/**
*
*/
public Directory() {
// Create a scheme registry
SchemeRegistry schemeRegistry = new SchemeRegistry();
schemeRegistry.register(new Scheme("http", PlainSocketFactory.getSocketFactory(), 80));

// Set some parameters for the connections
httpParams = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(httpParams, 200);
ConnManagerParams.setTimeout(httpParams, 1000);
ConnManagerParams.setMaxTotalConnections(httpParams, 400);
ConnManagerParams.setMaxConnectionsPerRoute(httpParams, new ConnPerRouteBean(2));

// Create a connection manager
connManager = new ThreadSafeClientConnManager(httpParams, schemeRegistry);
}

/**
* @return a copy of endPoints in that directory
*/
Expand All @@ -49,7 +84,7 @@ public EndPoint add(String name, String URI) {
EndPoint endPoint = new EndPoint(name, URI);
synchronized (listOfEndPoints) {
listOfEndPoints.add(endPoint);
endPoint.start();
endPoint.start(connManager, httpParams);
}
return endPoint;
} catch (URISyntaxException e) {
Expand All @@ -66,9 +101,9 @@ public EndPoint add(String name, String URI) {
*/
public synchronized void loadFrom(InputStream input) {
// Shutdown the end points if they are running
for (EndPoint endPoint: listOfEndPoints)
for (EndPoint endPoint : listOfEndPoints)
endPoint.shutdown();

// Clear the current list
listOfEndPoints.clear();

Expand Down Expand Up @@ -107,4 +142,15 @@ public void writeTo(OutputStream output) {
}
}
}

/**
*
*/
public void close() {
logger.info("Shutdown connections");
for (EndPoint endPoint: listOfEndPoints)
endPoint.shutdown();

connManager.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,7 @@

import org.apache.http.client.HttpClient;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.params.ConnManagerParams;
import org.apache.http.conn.params.ConnPerRouteBean;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,21 +29,14 @@ public class EndPoint {
static final Logger logger = LoggerFactory.getLogger(EndPoint.class);

// The Ping query used to check if an end point is alive
static final Query query = QueryFactory
.create("SELECT * WHERE {?s ?p ?o} LIMIT 1");
static final Query query = QueryFactory.create("SELECT * WHERE {?s ?p ?o} LIMIT 1");

// The name of this end point
private final String name;

// The URI pointing to it
private final URI URI;

// Parameters for the connections to SPARQL end points
private final HttpParams httpParams;

// The client connection manager which avoids DoS
private ClientConnectionManager connManager;

// Executor to run the update tasks against this end point
private ExecutorService executor;
private LinkedBlockingQueue<Runnable> jobQueue;
Expand Down Expand Up @@ -79,17 +64,10 @@ public class EndPoint {
* @param address
* @throws URISyntaxException
*/
public EndPoint(final String name, final String address)
throws URISyntaxException {
public EndPoint(final String name, final String address) throws URISyntaxException {
this.name = name;
this.URI = new URI(address);

httpParams = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(httpParams, 500);
// ConnManagerParams.setTimeout(httpParams, 5000);
ConnManagerParams.setMaxTotalConnections(httpParams, 200);
ConnManagerParams.setMaxConnectionsPerRoute(httpParams,
new ConnPerRouteBean(2));
}

/**
Expand Down Expand Up @@ -123,19 +101,29 @@ public void clear() {
public void shutdown() {
if (!isEnabled())
return;

// Stop the http client
//httpClient.getConnectionManager().shutdown();
setEnabled(false);

// Cancel remaining tasks
for (Runnable runnable : jobQueue)
if (runnable instanceof CacheUpdateTask)
((CacheUpdateTask) runnable).cancel();
jobQueue.clear();

// Wait for the running ones to be stopped
try {
while (((ThreadPoolExecutor) executor).getActiveCount() != 0)
Thread.sleep(100);
} catch (InterruptedException ie) {
}


// Stop the data executor
jobQueue.clear();
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
logger.error(name + "'s data pool did not terminate");
executor.shutdownNow();
if (!executor.awaitTermination(1, TimeUnit.SECONDS))
logger.error(name + "'s data pool did not terminate again");
logger.error(name + "'s data pool did not terminate");
}
} catch (InterruptedException ie) {
logger.error(name + " was interrupted");
Expand All @@ -148,24 +136,15 @@ public void shutdown() {
/**
*
*/
public void start() {
// Create a scheme registry
SchemeRegistry schemeRegistry = new SchemeRegistry();
schemeRegistry.register(new Scheme("http", PlainSocketFactory
.getSocketFactory(), 80));

// Create a connection manager
connManager = new ThreadSafeClientConnManager(httpParams,
schemeRegistry);

public void start(ClientConnectionManager connManager, HttpParams httpParams) {
// Create an HTTP client, disable cookies and don't retry requests
httpClient = new DefaultHttpClient(connManager, httpParams);
((DefaultHttpClient) httpClient).setCookieStore(null);
((DefaultHttpClient) httpClient).setCookieSpecs(null);
((DefaultHttpClient) httpClient).setHttpRequestRetryHandler(null);

// Create an other executor for the data service
//executor = Executors.newFixedThreadPool(5);
// executor = Executors.newFixedThreadPool(5);
jobQueue = new LinkedBlockingQueue<Runnable>();
executor = new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS, jobQueue);
((ThreadPoolExecutor) executor).prestartAllCoreThreads();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,15 @@ public void shutdown() {
*/
@Override
public void waitForLatencyBuffer() {
// This function blocks until there is on average less than 10
// This function blocks until there is on average less than 4
// jobs per end point queuing to be executed
double load = Double.MAX_VALUE;
while (load > 10) {
while (load > 4) {
load = 0;
for (EndPoint endpoint : directory.endPoints())
load += endpoint.getQueueSize();
load = load / directory.endPoints().size();

// We don't have
try {
Thread.sleep((long) (50 + 2 * load));
Expand Down
Loading

0 comments on commit ac87fd2

Please sign in to comment.