Skip to content

Commit

Permalink
More precise logging of hanged collect
Browse files Browse the repository at this point in the history
  • Loading branch information
fbacchella committed Nov 16, 2015
1 parent a8e5d81 commit 095897d
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 60 deletions.
17 changes: 14 additions & 3 deletions src/jrds/starter/HostStarter.java
Expand Up @@ -13,9 +13,12 @@ public class HostStarter extends StarterNode {
private HostInfo host;
private final Set<Probe<?,?>> allProbes = new TreeSet<Probe<?,?>>();

private String runningname;

public HostStarter(HostInfo host) {
super();
this.host = host;
this.runningname = host.getName() + ":notrunning";
registerStarter(new Resolver(host.getDnsName()));
}

Expand Down Expand Up @@ -44,10 +47,9 @@ public void collectAll() {
}
log(Level.TRACE, "Starting collect for %s", probe);
log(Level.DEBUG, "Collect all stats for host " + host.getName());
String threadName = oldThreadName + "/" + probe.getName();
Thread.currentThread().setName(threadName);
setRunningname(oldThreadName + "/" + probe.getName());
probe.collect();
Thread.currentThread().setName(threadName + ":finished");
setRunningname(oldThreadName + ":finished");
}
stopCollect();
long end = System.currentTimeMillis();
Expand Down Expand Up @@ -106,4 +108,13 @@ public boolean equals(Object obj) {
return host.equals(other.getHost()) && parentEquals;
}

/**
 * Returns the label last stored by {@link #setRunningname(String)}, or the
 * value assigned at construction if it was never called.
 *
 * @return the current running name of this host starter
 */
public String getRunningname() {
    return this.runningname;
}

/**
 * Renames the calling thread to the given label and stores the same label
 * on this host starter, so it can be read back with
 * {@link #getRunningname()}.
 *
 * @param runningname the new label, applied to the current thread first and
 *                    then stored in this object
 */
public void setRunningname(String runningname) {
    final Thread caller = Thread.currentThread();
    // Rename the thread before storing, so a failure in setName leaves the
    // stored label untouched.
    caller.setName(runningname);
    this.runningname = runningname;
}

}
178 changes: 121 additions & 57 deletions src/jrds/starter/Timer.java
Expand Up @@ -2,14 +2,23 @@

import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -20,7 +29,31 @@

public class Timer extends StarterNode {

public final static String DEFAULTNAME = "_default";

/**
 * Task that runs the collect of a single host.
 * <p>
 * While the task runs, the host's running name (and the name of the
 * executing thread, through {@code HostStarter.setRunningname}) is set to
 * {@code <timer name>/JrdsCollect-<host name>}. {@link #toString()} returns
 * that running name, so code that prints the task shows what it is doing.
 */
private class CollectCallable implements Callable<Object> {

    private final HostStarter host;

    /**
     * @param host the host starter whose collect will be run by this task
     */
    CollectCallable(HostStarter host) {
        this.host = host;
    }

    /**
     * @return the current running name of the wrapped host starter
     */
    @Override
    public String toString() {
        return host.getRunningname();
    }

    /**
     * Runs {@code collectAll()} on the host, labelling the host and the
     * current thread for the duration of the collect.
     *
     * @return always {@code null}
     * @throws Exception whatever the host collect throws
     */
    @Override
    public Object call() throws Exception {
        log(Level.DEBUG, "Collect all stats for host %s", host.getName());
        String collectName = Timer.this.name + "/" + "JrdsCollect-" + host.getName();
        host.setRunningname(collectName);
        try {
            host.collectAll();
        } finally {
            // Always reset the label, even when the collect throws, so a
            // failed collect is not left looking like a running one.
            host.setRunningname(collectName + ":notrunning");
        }
        return null;
    }

}

public static final class Stats implements Cloneable {
Stats() {
Expand All @@ -42,12 +75,15 @@ public Object clone() throws CloneNotSupportedException {
}
}

public final static String DEFAULTNAME = "_default";

private final Map<String, HostStarter> hostList = new HashMap<String, HostStarter>();
private Semaphore collectMutex = new Semaphore(1);
private final Stats stats = new Stats();
private final int numCollectors;
private final String name;
private ExecutorService tpool = null;
private final Queue<Future<Object>> running = new ConcurrentLinkedQueue<>();
private ThreadPoolExecutor tpool;

public Timer(String name, PropertiesManager.TimerInfo ti) {
super();
Expand Down Expand Up @@ -78,6 +114,8 @@ public Iterable<HostStarter> getAllHosts() {
public void startTimer(java.util.Timer collectTimer) {
TimerTask collector = new TimerTask () {
public void run() {
// The collect is done in a separate thread,
// so a collect failure will not prevent other collects from running
Thread subcollector = new Thread("Collector/" + Timer.this.name) {
@Override
public void run() {
Expand All @@ -88,6 +126,7 @@ public void run() {
}
}
};
subcollector.setDaemon(true);
subcollector.start();
}
};
Expand All @@ -106,74 +145,79 @@ public void collectAll() {
log(Level.FATAL, "A collect start was interrupted");
return;
}
try {
final AtomicInteger counter = new AtomicInteger(0);
tpool = Executors.newFixedThreadPool(numCollectors,
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r, Timer.this.name + "/CollectorThread" + counter.getAndIncrement());
t.setDaemon(true);
log(Level.DEBUG, "New thread name: %s", t.getName());
return t;
}
final AtomicInteger counter = new AtomicInteger(0);
// Generate threads with a default name
ThreadFactory tf = new ThreadFactory() {
/**
 * Creates a daemon thread named
 * {@code <timer name>/CollectorThread<n>}, where {@code n} comes from
 * the shared counter.
 *
 * @param r the task the new thread will run
 * @return the configured, not yet started, thread
 */
@Override
public Thread newThread(Runnable r) {
    Thread t = new Thread(r);
    t.setName(Timer.this.name + "/CollectorThread" + counter.getAndIncrement());
    t.setDaemon(true);
    // Log the name of the thread just created: an unqualified getName()
    // here does not refer to t.
    log(Level.DEBUG, "New thread name: %s", t.getName());
    return t;
}
);
startCollect();
for(final HostStarter host: hostList.values()) {
if( ! isCollectRunning())
break;
Runnable runCollect = new Runnable() {
public void run() {
log(Level.DEBUG, "Collect all stats for host %s", host.getName());
String threadName = Timer.this.name + "/" + "JrdsCollect-" + host.getName();
Thread.currentThread().setName(threadName);
host.collectAll();
Thread.currentThread().setName(threadName + ":finished");
}
@Override
public String toString() {
return Thread.currentThread().toString();
}
};
try {
tpool.execute(runCollect);
}
catch(RejectedExecutionException ex) {
log(Level.DEBUG, "collector thread dropped for host %s", host.getName());
};
synchronized (running) {
// Generate a ThreadPoolExecutor whose tasks' toString() returns
// the wrapped Callable's toString()
tpool = new ThreadPoolExecutor(numCollectors, numCollectors,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
tf) {

/**
 * Wraps the callable in a {@link FutureTask} whose {@code toString()}
 * delegates to the callable, so printing the task shows the callable's
 * own description.
 *
 * @param callable the task being submitted
 * @return a future task running {@code callable}
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
    final RunnableFuture<T> describedTask = new FutureTask<T>(callable) {
        @Override
        public String toString() {
            return callable.toString();
        }
    };
    return describedTask;
}
}
tpool.shutdown();

};
}
Set<Callable<Object>> toSchedule = new HashSet<Callable<Object>>();
for(final HostStarter host: hostList.values()) {
Callable<Object> runCollect = new CollectCallable(host);
toSchedule.add(runCollect);
}
running.clear();
startCollect();
try {
try {
tpool.awaitTermination(getStep() - getTimeout() * 2 , TimeUnit.SECONDS);
if(isCollectRunning()) {
List<Future<Object>> scheduled = tpool.invokeAll(toSchedule, getStep() - getTimeout() * 2 , TimeUnit.SECONDS);
running.addAll(scheduled);
tpool.shutdown();
tpool.awaitTermination(getStep() - getTimeout() * 2 , TimeUnit.SECONDS);
}
} catch(RejectedExecutionException ex) {
log(Level.DEBUG, "collector thread refused");
} catch (InterruptedException e) {
log(Level.WARN, "Collect interrupted");
log(Level.INFO, "Collect interrupted");
}
stopCollect();
if( ! tpool.isTerminated()) {
//Second chance, we wait for the time out
boolean emergencystop = false;
try {
emergencystop = tpool.awaitTermination(getTimeout(), TimeUnit.SECONDS);
emergencystop = ! tpool.awaitTermination(getTimeout(), TimeUnit.SECONDS);
} catch (InterruptedException e) {
log(Level.WARN, "Collect interrupted in last chance");
log(Level.INFO, "Collect interrupted in last chance");
}
if(! emergencystop) {
log(Level.WARN, "Some task still alive, needs to be killed");
if(emergencystop) {
log(Level.INFO, "Some task still alive, needs to be killed");
//Last chance to commit results
List<Runnable> timedOut = tpool.shutdownNow();
if(! timedOut.isEmpty()) {
log(Level.WARN, "Still " + timedOut.size() + " waiting probes: ");
for(Runnable r: timedOut) {
log(Level.WARN, r.toString());
}
}
tpool.shutdownNow();
dumpCollectHanged();
}
}
} catch (RuntimeException e) {
log(Level.ERROR, "problem while collecting data: ", e);
log(Level.ERROR, e, "problem while collecting data: %s", e);
}
finally {
synchronized (this) {
synchronized (running) {
tpool.shutdown();
tpool = null;
}
Expand Down Expand Up @@ -219,11 +263,31 @@ public Stats getStats() {
return stats;
}

public synchronized void interrupt() {
public void interrupt() {
log(Level.DEBUG, "timer interrupted");
if(tpool != null) {
tpool.shutdownNow();
synchronized (running) {
if (tpool != null) {
tpool.shutdownNow();
}
}
dumpCollectHanged();
}

/**
 * Drains {@code running}, cancelling and reporting every task that has not
 * finished yet.
 * <p>
 * Tasks that are already done (which includes cancelled ones) are dropped
 * silently. Each remaining task is cancelled with interruption and logged
 * at WARN level as {@code "<task> blocked"}, followed by a 10 ms pause.
 * If the calling thread is interrupted during a pause, the remaining tasks
 * are still processed (without further pauses) and the interrupt status is
 * restored before returning.
 */
private void dumpCollectHanged() {
    boolean interrupted = false;
    Future<Object> waiting;
    // poll() removes the head atomically, so two threads draining the queue
    // at the same time never report the same task twice.
    while ((waiting = running.poll()) != null) {
        // Future.isDone() is also true for cancelled tasks.
        if (waiting.isDone()) {
            continue;
        }
        waiting.cancel(true);
        log(Level.WARN, "%s blocked", waiting.toString());
        if (!interrupted) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Keep draining, but remember the interruption for the caller.
                interrupted = true;
            }
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}

}

0 comments on commit 095897d

Please sign in to comment.