Skip to content

Commit

Permalink
CAMEL-11354: Optimize oldest exchange inflight per route
Browse files Browse the repository at this point in the history
  • Loading branch information
davsclaus committed May 28, 2017
1 parent 4a3debe commit ce4e8f7
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 90 deletions.
6 changes: 6 additions & 0 deletions camel-core/src/main/java/org/apache/camel/Exchange.java
Expand Up @@ -16,6 +16,7 @@
*/ */
package org.apache.camel; package org.apache.camel;


import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;


Expand Down Expand Up @@ -584,4 +585,9 @@ public interface Exchange {
*/ */
List<Synchronization> handoverCompletions(); List<Synchronization> handoverCompletions();


/**
* Gets the timestamp when this exchange was created.
*/
Date getCreated();

} }
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.impl; package org.apache.camel.impl;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
Expand Down Expand Up @@ -86,6 +87,15 @@ public String toString() {
return String.format("Exchange[%s]", exchangeId == null ? "" : exchangeId); return String.format("Exchange[%s]", exchangeId == null ? "" : exchangeId);
} }


@Override
public Date getCreated() {
if (hasProperties()) {
return getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
} else {
return null;
}
}

public Exchange copy() { public Exchange copy() {
// to be backwards compatible as today // to be backwards compatible as today
return copy(false); return copy(false);
Expand Down
Expand Up @@ -167,7 +167,7 @@ protected void doStop() throws Exception {


private static long getExchangeDuration(Exchange exchange) { private static long getExchangeDuration(Exchange exchange) {
long duration = 0; long duration = 0;
Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class); Date created = exchange.getCreated();
if (created != null) { if (created != null) {
duration = System.currentTimeMillis() - created.getTime(); duration = System.currentTimeMillis() - created.getTime();
} }
Expand Down
Expand Up @@ -21,12 +21,12 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.AttributeValueExp; import javax.management.AttributeValueExp;
Expand Down Expand Up @@ -68,8 +68,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
protected final String description; protected final String description;
protected final ModelCamelContext context; protected final ModelCamelContext context;
private final LoadTriplet load = new LoadTriplet(); private final LoadTriplet load = new LoadTriplet();
private final ConcurrentSkipListMap<InFlightKey, Long> exchangesInFlightStartTimestamps = new ConcurrentSkipListMap<InFlightKey, Long>(); private final ConcurrentHashMap<String, Exchange> exchangesInFlight = new ConcurrentHashMap<String, Exchange>();
private final ConcurrentHashMap<String, InFlightKey> exchangesInFlightKeys = new ConcurrentHashMap<String, InFlightKey>();
private final String jmxDomain; private final String jmxDomain;


public ManagedRoute(ModelCamelContext context, Route route) { public ManagedRoute(ModelCamelContext context, Route route) {
Expand All @@ -85,8 +84,7 @@ public void init(ManagementStrategy strategy) {
boolean enabled = context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off; boolean enabled = context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off;
setStatisticsEnabled(enabled); setStatisticsEnabled(enabled);


exchangesInFlightKeys.clear(); exchangesInFlight.clear();
exchangesInFlightStartTimestamps.clear();
} }


public Route getRoute() { public Route getRoute() {
Expand Down Expand Up @@ -410,13 +408,14 @@ public String dumpRouteStatsAsXml(boolean fullStats, boolean includeProcessors)
String stat = dumpStatsAsXml(fullStats); String stat = dumpStatsAsXml(fullStats);
answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\""); answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\"");
answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\""); answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\"");
InFlightKey oldestInflightEntry = getOldestInflightEntry(); Exchange oldest = getOldestInflightEntry();
if (oldestInflightEntry == null) { if (oldest == null) {
answer.append(" oldestInflightExchangeId=\"\""); answer.append(" oldestInflightExchangeId=\"\"");
answer.append(" oldestInflightDuration=\"\""); answer.append(" oldestInflightDuration=\"\"");
} else { } else {
answer.append(" oldestInflightExchangeId=\"").append(oldestInflightEntry.exchangeId).append("\""); long duration = System.currentTimeMillis() - oldest.getCreated().getTime();
answer.append(" oldestInflightDuration=\"").append(System.currentTimeMillis() - oldestInflightEntry.timeStamp).append("\""); answer.append(" oldestInflightExchangeId=\"").append(oldest.getExchangeId()).append("\"");
answer.append(" oldestInflightDuration=\"").append(duration).append("\"");
} }
answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n"); answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n");


Expand Down Expand Up @@ -466,111 +465,49 @@ public int hashCode() {
return route.hashCode(); return route.hashCode();
} }


private InFlightKey getOldestInflightEntry() { private Exchange getOldestInflightEntry() {
Map.Entry<InFlightKey, Long> entry = exchangesInFlightStartTimestamps.firstEntry(); return exchangesInFlight.values().stream().max(Comparator.comparing(Exchange::getCreated)).orElse(null);
if (entry != null) {
return entry.getKey();
}
return null;
} }


public Long getOldestInflightDuration() { public Long getOldestInflightDuration() {
InFlightKey oldest = getOldestInflightEntry(); Exchange exchange = getOldestInflightEntry();
if (oldest == null) { if (exchange == null) {
return null;
}
Date created = exchange.getCreated();
if (created != null) {
return System.currentTimeMillis() - created.getTime();
} else {
return null; return null;
} }
return System.currentTimeMillis() - oldest.timeStamp;
} }


public String getOldestInflightExchangeId() { public String getOldestInflightExchangeId() {
InFlightKey oldest = getOldestInflightEntry(); Exchange exchange = getOldestInflightEntry();
if (oldest == null) { if (exchange == null) {
return null; return null;
} }
return oldest.exchangeId; return exchange.getExchangeId();
} }


@Override @Override
public void processExchange(Exchange exchange) { public void processExchange(Exchange exchange) {
exchangesInFlightKeys.computeIfAbsent(exchange.getExchangeId(), id -> { exchangesInFlight.put(exchange.getExchangeId(), exchange);
InFlightKey key = new InFlightKey(System.currentTimeMillis(), exchange.getExchangeId());
exchangesInFlightStartTimestamps.put(key, key.timeStamp);
return key;
});
super.processExchange(exchange); super.processExchange(exchange);
} }


@Override @Override
public void completedExchange(Exchange exchange, long time) { public void completedExchange(Exchange exchange, long time) {
exchangesInFlightKeys.computeIfPresent(exchange.getExchangeId(), (id, key) -> { exchangesInFlight.remove(exchange.getExchangeId());
exchangesInFlightStartTimestamps.remove(key);
return null;
});
super.completedExchange(exchange, time); super.completedExchange(exchange, time);
} }


@Override @Override
public void failedExchange(Exchange exchange) { public void failedExchange(Exchange exchange) {
exchangesInFlightKeys.computeIfPresent(exchange.getExchangeId(), (id, key) -> { exchangesInFlight.remove(exchange.getExchangeId());
exchangesInFlightStartTimestamps.remove(key);
return null;
});
super.failedExchange(exchange); super.failedExchange(exchange);
} }


private static class InFlightKey implements Comparable<InFlightKey> {

private final Long timeStamp;
private final String exchangeId;

InFlightKey(Long timeStamp, String exchangeId) {
this.timeStamp = timeStamp;
this.exchangeId = exchangeId;
}

@Override
public int compareTo(InFlightKey o) {
int compare = Long.compare(timeStamp, o.timeStamp);
if (compare == 0) {
return exchangeId.compareTo(o.exchangeId);
}
return compare;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

InFlightKey that = (InFlightKey) o;

if (!exchangeId.equals(that.exchangeId)) {
return false;
}
if (!timeStamp.equals(that.timeStamp)) {
return false;
}

return true;
}

@Override
public int hashCode() {
int result = timeStamp.hashCode();
result = 31 * result + exchangeId.hashCode();
return result;
}

@Override
public String toString() {
return exchangeId;
}
}

/** /**
* Used for sorting the processor mbeans accordingly to their index. * Used for sorting the processor mbeans accordingly to their index.
*/ */
Expand Down
Expand Up @@ -560,7 +560,7 @@ public static String doDumpMessageHistoryStacktrace(Exchange exchange, ExchangeF
label = URISupport.sanitizeUri(exchange.getFromEndpoint().getEndpointUri()); label = URISupport.sanitizeUri(exchange.getFromEndpoint().getEndpointUri());
} }
long elapsed = 0; long elapsed = 0;
Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class); Date created = exchange.getCreated();
if (created != null) { if (created != null) {
elapsed = new StopWatch(created).taken(); elapsed = new StopWatch(created).taken();
} }
Expand Down
Expand Up @@ -85,7 +85,7 @@ public void testExchangeCompleted() throws Exception {
assertEquals("direct://start", event.getExchange().getFromEndpoint().getEndpointUri()); assertEquals("direct://start", event.getExchange().getFromEndpoint().getEndpointUri());


// grab the created timestamp // grab the created timestamp
Date created = event.getExchange().getProperty(Exchange.CREATED_TIMESTAMP, Date.class); Date created = event.getExchange().getCreated();
assertNotNull(created); assertNotNull(created);


// calculate elapsed time // calculate elapsed time
Expand Down

0 comments on commit ce4e8f7

Please sign in to comment.