Skip to content

Commit

Permalink
CAMEL-8223: Inflight repository to allow browsing of current inflight…
Browse files Browse the repository at this point in the history
… exchanges
  • Loading branch information
davsclaus committed Jan 9, 2015
1 parent 996e32c commit 416654d
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ public interface ManagedInflightRepositoryMBean extends ManagedServiceMBean {
@ManagedOperation(description = "Lists all the exchanges which are currently inflight")
TabularData browse();

@ManagedOperation(description = "Lists all the exchanges which are currently inflight, limited and sorted")
TabularData browse(int limit, boolean sortByLongestDuration);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -88,9 +89,30 @@ public int size(String routeId) {

@Override
public Collection<InflightExchange> browse() {
return browse(-1, false);
}

@Override
public Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration) {
List<InflightExchange> answer = new ArrayList<InflightExchange>();
for (Exchange exchange : inflight.values()) {

List<Exchange> values = new ArrayList<Exchange>(inflight.values());
if (sortByLongestDuration) {
Collections.sort(values, new Comparator<Exchange>() {
@Override
public int compare(Exchange e1, Exchange e2) {
long d1 = getExchangeDuration(e1);
long d2 = getExchangeDuration(e2);
return Long.compare(d1, d2);
}
});
}

for (Exchange exchange : values) {
answer.add(new InflightExchangeEntry(exchange));
if (limit > 0 && answer.size() >= limit) {
break;
}
}
return Collections.unmodifiableCollection(answer);
}
Expand All @@ -110,6 +132,15 @@ protected void doStop() throws Exception {
routeCount.clear();
}

private static long getExchangeDuration(Exchange exchange) {
long duration = 0;
Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
if (created != null) {
duration = System.currentTimeMillis() - created.getTime();
}
return duration;
}

private static final class InflightExchangeEntry implements InflightExchange {

private final Exchange exchange;
Expand All @@ -125,12 +156,7 @@ public Exchange getExchange() {

@Override
public long getDuration() {
long duration = 0;
Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
if (created != null) {
duration = System.currentTimeMillis() - created.getTime();
}
return duration;
return DefaultInflightRepository.getExchangeDuration(exchange);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class TimeoutInflightRepository extends ServiceSupport implements Infligh

@Override
protected void doStart() throws Exception {

if (exchangeFormatter == null) {
// setup exchange formatter to be used for message history dump
DefaultExchangeFormatter formatter = new DefaultExchangeFormatter();
Expand All @@ -69,7 +68,6 @@ protected void doStart() throws Exception {
exchangeWatchDog = new Thread(woker);
}
exchangeWatchDog.start();

}

@Override
Expand All @@ -78,7 +76,6 @@ protected void doStop() throws Exception {
woker.stop();
exchangeWatchDog = null;
}

}

@Override
Expand All @@ -97,13 +94,11 @@ public void remove(Exchange exchange) {
@Override
public void add(Exchange exchange, String routeId) {
// do nothing here

}

@Override
public void remove(Exchange exchange, String routeId) {
// do nothing here

}

@Override
Expand All @@ -120,7 +115,6 @@ public int size(Endpoint endpoint) {
@Override
public void removeRoute(String routeId) {
// We don't support this interface yet

}

@Override
Expand All @@ -134,14 +128,18 @@ public Collection<InflightExchange> browse() {
return null;
}

@Override
public Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration) {
return null;
}

public long getWaitTime() {
return waitTime;
}

public void setWaitTime(long waitTime) {
this.waitTime = waitTime;
}


public long getTimeout() {
return timeout;
Expand All @@ -159,7 +157,6 @@ public void setExchangeFormatter(ExchangeFormatter exchangeFormatter) {
this.exchangeFormatter = exchangeFormatter;
}


protected void processTimeoutExchange(Exchange exchange, long processingTime) {
// print out exchange history or send an alarm
// dump a route stack trace of the exchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,15 @@ public int size(String routeId) {

@Override
public TabularData browse() {
return browse(-1, false);
}

@Override
public TabularData browse(int limit, boolean sortByLongestDuration) {
try {
TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listInflightExchangesTabularType());
Collection<InflightRepository.InflightExchange> exchanges = inflightRepository.browse();
Collection<InflightRepository.InflightExchange> exchanges = inflightRepository.browse(limit, sortByLongestDuration);

for (InflightRepository.InflightExchange entry : exchanges) {
CompositeType ct = CamelOpenMBeanTypes.listInflightExchangesCompositeType();
String exchangeId = entry.getExchange().getExchangeId();
Expand All @@ -80,5 +86,4 @@ public TabularData browse() {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,13 @@ interface InflightExchange {
*/
Collection<InflightExchange> browse();

/**
* A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight.
*
* @param limit maximum number of entries to return
* @param sortByLongestDuration to sort by the longest duration. Set to <tt>true</tt> to include the exchanges that has been inflight the longest time,
* set to <tt>false</tt> to include the exchanges in unspecified order.
*/
Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration);

}

0 comments on commit 416654d

Please sign in to comment.