Skip to content

Commit

Permalink
Improve SupervisingRouteController
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Aug 9, 2017
1 parent a030a22 commit 637e2b6
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 86 deletions.
Expand Up @@ -27,7 +27,6 @@
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -52,7 +51,6 @@
import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.spi.RoutePolicyFactory;
import org.apache.camel.support.EventNotifierSupport; import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.backoff.BackOff; import org.apache.camel.util.backoff.BackOff;
import org.apache.camel.util.backoff.BackOffContext; import org.apache.camel.util.backoff.BackOffContext;
Expand Down Expand Up @@ -192,6 +190,10 @@ public Collection<Filter> getFilters() {
return Collections.unmodifiableList(filters); return Collections.unmodifiableList(filters);
} }


public Optional<BackOffContext> getBackOffContext(String id) {
return routeManager.getBackOffContext(id);
}

// ********************************* // *********************************
// Lifecycle // Lifecycle
// ********************************* // *********************************
Expand Down Expand Up @@ -337,41 +339,34 @@ public Collection<Route> getControlledRoutes() {
private void doStopRoute(RouteHolder route, boolean checker, ThrowingConsumer<RouteHolder, Exception> consumer) throws Exception { private void doStopRoute(RouteHolder route, boolean checker, ThrowingConsumer<RouteHolder, Exception> consumer) throws Exception {
synchronized (lock) { synchronized (lock) {
if (checker) { if (checker) {
// remove them from checked routes so they don't get started by the // remove it from checked routes so the route don't get started
// routes check task as a manual operation on the routes indicates that // by the routes manager task as a manual operation on the routes
// the route is then managed manually // indicates that the route is then managed manually
routeManager.release(route); routeManager.release(route);
} }


ServiceStatus status = route.getStatus(); LOGGER.info("Route {} has been requested to stop: stop supervising it", route.getId());
if (!status.isStoppable()) {
LOGGER.debug("Route {} status is {}, skipping", route.getId(), status);
return;
}

consumer.accept(route);


// Mark the route as un-managed // Mark the route as un-managed
route.getContext().setRouteController(null); route.getContext().setRouteController(null);

consumer.accept(route);
} }
} }


private void doStartRoute(RouteHolder route, boolean checker, ThrowingConsumer<RouteHolder, Exception> consumer) throws Exception { private void doStartRoute(RouteHolder route, boolean checker, ThrowingConsumer<RouteHolder, Exception> consumer) throws Exception {
synchronized (lock) { synchronized (lock) {
ServiceStatus status = route.getStatus(); // If a manual start is triggered, then the controller should take
if (!status.isStartable()) { // care that the route is started
LOGGER.debug("Route {} status is {}, skipping", route.getId(), status); route.getContext().setRouteController(this);
return;
}


try { try {
if (checker) { if (checker) {
// remove it from checked routes as a manual start may trigger
// a new back off task if start fails
routeManager.release(route); routeManager.release(route);
} }


// Mark the route as managed
route.getContext().setRouteController(this);

consumer.accept(route); consumer.accept(route);
} catch (Exception e) { } catch (Exception e) {


Expand Down Expand Up @@ -444,7 +439,7 @@ private synchronized void stopRoutes() {


private class RouteManager { private class RouteManager {
private final Logger logger; private final Logger logger;
private final ConcurrentMap<RouteHolder, CompletableFuture<BackOffContext>> routes; private final ConcurrentMap<RouteHolder, BackOffTimer.Task> routes;


RouteManager() { RouteManager() {
this.logger = LoggerFactory.getLogger(RouteManager.class); this.logger = LoggerFactory.getLogger(RouteManager.class);
Expand All @@ -461,9 +456,7 @@ void start(RouteHolder route) {


logger.info("Start supervising route: {} with back-off: {}", r.getId(), backOff); logger.info("Start supervising route: {} with back-off: {}", r.getId(), backOff);


// Return this future as cancel does not have effect on the BackOffTimer.Task task = timer.schedule(backOff, context -> {
// computation (future chain)
CompletableFuture<BackOffContext> future = timer.schedule(backOff, context -> {
try { try {
logger.info("Try to restart route: {}", r.getId()); logger.info("Try to restart route: {}", r.getId());


Expand All @@ -474,22 +467,19 @@ void start(RouteHolder route) {
} }
}); });


future.whenComplete((context, throwable) -> { task.whenComplete((context, throwable) -> {
if (context == null || context.isExhausted()) { if (context == null || context.getStatus() != BackOffContext.Status.Active) {
// This indicates that the future has been cancelled // This indicates that the task has been cancelled
// or that back-off retry is exhausted thus if the // or that back-off retry is exhausted thus if the
// route is not started it is moved out of the supervisor. // route is not started it is moved out of the

// supervisor control.
if (context != null && context.isExhausted()) {
LOGGER.info("Back-off for route {} is exhausted, no more attempts will be made", route.getId());
}


synchronized (lock) { synchronized (lock) {
final ServiceStatus status = route.getStatus(); final ServiceStatus status = route.getStatus();
final boolean stopped = status.isStopped() || status.isStopping();


if (status.isStopped() || status.isStopping()) { if (context != null && context.getStatus() == BackOffContext.Status.Exhausted && stopped) {
LOGGER.info("Route {} has status {}, stop supervising it", route.getId(), status); LOGGER.info("Back-off for route {} is exhausted, no more attempts will be made and stop supervising it", route.getId());

r.getContext().setRouteController(null); r.getContext().setRouteController(null);
} }
} }
Expand All @@ -498,24 +488,33 @@ void start(RouteHolder route) {
routes.remove(r); routes.remove(r);
}); });


return future; return task;
} }
); );
} }


boolean release(RouteHolder route) { boolean release(RouteHolder route) {
CompletableFuture<BackOffContext> future = routes.remove(route); BackOffTimer.Task task = routes.remove(route);
if (future != null) { if (task != null) {
future.cancel(true); LOGGER.info("Cancel restart task for route {}", route.getId());
task.cancel();
} }


return future != null; return task != null;
} }


void clear() { void clear() {
routes.forEach((k, v) -> v.cancel(true)); routes.values().forEach(BackOffTimer.Task::cancel);
routes.clear(); routes.clear();
} }

public Optional<BackOffContext> getBackOffContext(String id) {
return routes.entrySet().stream()
.filter(e -> ObjectHelper.equal(e.getKey().getId(), id))
.findFirst()
.map(Map.Entry::getValue)
.map(BackOffTimer.Task::getContext);
}
} }


// ********************************* // *********************************
Expand Down
Expand Up @@ -20,18 +20,30 @@
* The context associated to a back-off operation. * The context associated to a back-off operation.
*/ */
public final class BackOffContext { public final class BackOffContext {
public enum Status {
Active,
Inactive,
Exhausted
}

private final BackOff backOff; private final BackOff backOff;


private Status status;
private long currentAttempts; private long currentAttempts;
private long currentDelay; private long currentDelay;
private long currentElapsedTime; private long currentElapsedTime;
private long lastAttemptTime;
private long nextAttemptTime;


public BackOffContext(BackOff backOff) { public BackOffContext(BackOff backOff) {
this.backOff = backOff; this.backOff = backOff;
this.status = Status.Active;


this.currentAttempts = 0; this.currentAttempts = 0;
this.currentDelay = backOff.getDelay().toMillis(); this.currentDelay = backOff.getDelay().toMillis();
this.currentElapsedTime = 0; this.currentElapsedTime = 0;
this.lastAttemptTime = BackOff.NEVER;
this.nextAttemptTime = BackOff.NEVER;
} }


// ************************************* // *************************************
Expand All @@ -45,6 +57,13 @@ public BackOff getBackOff() {
return backOff; return backOff;
} }


/**
* Gets the context status.
*/
public Status getStatus() {
return status;
}

/** /**
* The number of attempts so far. * The number of attempts so far.
*/ */
Expand All @@ -67,10 +86,31 @@ public long getCurrentElapsedTime() {
} }


/** /**
* Inform if the context is exhausted thus not more attempts should be made. * The time the last attempt has been performed.
*/
public long getLastAttemptTime() {
return lastAttemptTime;
}

/**
* Used by BackOffTimer
*/
void setLastAttemptTime(long lastAttemptTime) {
this.lastAttemptTime = lastAttemptTime;
}

/**
* An indication about the time the next attempt will be made.
*/
public long getNextAttemptTime() {
return nextAttemptTime;
}

/**
* Used by BackOffTimer
*/ */
public boolean isExhausted() { void setNextAttemptTime(long nextAttemptTime) {
return currentDelay == BackOff.NEVER; this.nextAttemptTime = nextAttemptTime;
} }


// ************************************* // *************************************
Expand All @@ -86,14 +126,16 @@ long next() {
// A call to next when currentDelay is set to NEVER has no effects // A call to next when currentDelay is set to NEVER has no effects
// as this means that either the timer is exhausted or it has explicit // as this means that either the timer is exhausted or it has explicit
// stopped // stopped
if (currentDelay != BackOff.NEVER) { if (status == Status.Active) {


currentAttempts++; currentAttempts++;


if (currentAttempts > backOff.getMaxAttempts()) { if (currentAttempts > backOff.getMaxAttempts()) {
currentDelay = BackOff.NEVER; currentDelay = BackOff.NEVER;
status = Status.Exhausted;
} else if (currentElapsedTime > backOff.getMaxElapsedTime().toMillis()) { } else if (currentElapsedTime > backOff.getMaxElapsedTime().toMillis()) {
currentDelay = BackOff.NEVER; currentDelay = BackOff.NEVER;
status = Status.Exhausted;
} else { } else {
if (currentDelay <= backOff.getMaxDelay().toMillis()) { if (currentDelay <= backOff.getMaxDelay().toMillis()) {
currentDelay = (long) (currentDelay * backOff.getMultiplier()); currentDelay = (long) (currentDelay * backOff.getMultiplier());
Expand All @@ -113,6 +155,9 @@ public BackOffContext reset() {
this.currentAttempts = 0; this.currentAttempts = 0;
this.currentDelay = 0; this.currentDelay = 0;
this.currentElapsedTime = 0; this.currentElapsedTime = 0;
this.lastAttemptTime = BackOff.NEVER;
this.nextAttemptTime = BackOff.NEVER;
this.status = Status.Active;


return this; return this;
} }
Expand All @@ -125,6 +170,9 @@ public BackOffContext stop() {
this.currentAttempts = 0; this.currentAttempts = 0;
this.currentDelay = BackOff.NEVER; this.currentDelay = BackOff.NEVER;
this.currentElapsedTime = 0; this.currentElapsedTime = 0;
this.lastAttemptTime = BackOff.NEVER;
this.nextAttemptTime = BackOff.NEVER;
this.status = Status.Inactive;


return this; return this;
} }
Expand Down

0 comments on commit 637e2b6

Please sign in to comment.