Skip to content

Commit

Permalink
Use CompletionService to fail early when a thread crashes (#980)
Browse files Browse the repository at this point in the history
* Changed Runnable to Callable
* Use isInterrupted
  • Loading branch information
boldtrn authored and karussell committed Mar 22, 2017
1 parent 951b443 commit 8f400c3
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
Expand Up @@ -32,8 +32,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.util.*; import java.util.*;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.TimeUnit;


import static com.graphhopper.util.Parameters.CH.DISABLE; import static com.graphhopper.util.Parameters.CH.DISABLE;


Expand Down Expand Up @@ -279,38 +278,41 @@ public void setPreparationThreads(int preparationThreads) {
} }


public void prepare(final StorableProperties properties) { public void prepare(final StorableProperties properties) {
ExecutorCompletionService completionService = new ExecutorCompletionService<>(threadPool);
int counter = 0; int counter = 0;
for (final PrepareContractionHierarchies prepare : getPreparations()) { for (final PrepareContractionHierarchies prepare : getPreparations()) {
LOGGER.info((++counter) + "/" + getPreparations().size() + " calling CH prepare.doWork for " + prepare.getWeighting() + " ... (" + Helper.getMemInfo() + ")"); LOGGER.info((++counter) + "/" + getPreparations().size() + " calling CH prepare.doWork for " + prepare.getWeighting() + " ... (" + Helper.getMemInfo() + ")");
final String name = AbstractWeighting.weightingToFileName(prepare.getWeighting()); final String name = AbstractWeighting.weightingToFileName(prepare.getWeighting());
threadPool.execute(new Runnable() { completionService.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
String errorKey = CH.PREPARE + "error." + name; String errorKey = CH.PREPARE + "error." + name;
try { try {
// toString is not taken into account so we need to cheat, see http://stackoverflow.com/q/6113746/194609 for other options // toString is not taken into account so we need to cheat, see http://stackoverflow.com/q/6113746/194609 for other options
Thread.currentThread().setName(name); Thread.currentThread().setName(name);

properties.put(errorKey, "CH preparation incomplete"); properties.put(errorKey, "CH preparation incomplete");
prepare.doWork(); prepare.doWork();
properties.remove(errorKey); properties.remove(errorKey);
properties.put(CH.PREPARE + "date." + name, Helper.createFormatter().format(new Date())); properties.put(CH.PREPARE + "date." + name, Helper.createFormatter().format(new Date()));
} catch (Exception ex) { } catch (Exception ex) {
LOGGER.error("Problem while CH preparation " + name, ex); LOGGER.error("Problem while CH preparation " + name, ex);
properties.put(errorKey, ex.getMessage()); properties.put(errorKey, ex.getMessage());
throw ex;
} }
} }
}); }, name);

} }


threadPool.shutdown(); threadPool.shutdown();
try {
if (!threadPool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS))
threadPool.shutdownNow();


} catch (InterruptedException ie) { try {
for (int i = 0; i < getPreparations().size(); i++) {
completionService.take().get();
}
} catch (Exception e) {
threadPool.shutdownNow(); threadPool.shutdownNow();
throw new RuntimeException(ie); throw new RuntimeException(e);
} }
} }


Expand Down
Expand Up @@ -304,6 +304,11 @@ void contractNodes() {


CHEdgeIterator iter = vehicleAllExplorer.setBaseNode(polledNode); CHEdgeIterator iter = vehicleAllExplorer.setBaseNode(polledNode);
while (iter.next()) { while (iter.next()) {

if(Thread.currentThread().isInterrupted()){

This comment has been minimized.

Copy link
@easbar

easbar Sep 10, 2019

Member

@boldtrn was there a specifc reason you put this check inside this while loop ? would it not be sufficient to put it one level up into the outer while loop ?

This comment has been minimized.

Copy link
@karussell

karussell Sep 10, 2019

Member

It might be that one iteration of it takes relative long for world wide and the last nodes?

Is this a performance problem? If yes, we should move it out of this loop.

This comment has been minimized.

Copy link
@easbar

easbar Sep 10, 2019

Member

The intention here is just to make sure the thread is interrupted not too long after setting the interrupt flag ? I wouldn't expect this time becomes too long if we move the check one level up. And yes for performance it should be better outside (but also not sure if this is relevant).

This comment has been minimized.

Copy link
@boldtrn

boldtrn Sep 10, 2019

Author Member

The intention here is just to make sure the thread is interrupted not too long after setting the interrupt flag?

I think this was the reason yes.

I just found the discussion about this :).

This comment has been minimized.

Copy link
@boldtrn

boldtrn Sep 10, 2019

Author Member

And here

This comment has been minimized.

Copy link
@easbar

easbar Sep 10, 2019

Member

Ok I'll move it one level up: It will be still be in the main loop (so the check will be performed once per node and the time until the interruption will be at most the time of a single node contraction (well below a second which should be ok ?)

throw new RuntimeException("Thread was interrupted");
}

int nn = iter.getAdjNode(); int nn = iter.getAdjNode();
if (prepareGraph.getLevel(nn) != maxLevel) if (prepareGraph.getLevel(nn) != maxLevel)
continue; continue;
Expand Down

0 comments on commit 8f400c3

Please sign in to comment.