Skip to content

Commit

Permalink
spring-projectsGH-8981: Add UnicastingDispatcher.failoverStrategy o…
Browse files Browse the repository at this point in the history
…ption

Fixes: spring-projects#8981

Sometime the simple `boolean failover` on the `MessageChannel` (default `true`)
is not enough to be sure that we can dispatch to the next handler or not.
Such a decision can be made using `ErrorMessageExceptionTypeRouter`, but that would
require an overhaul for the whole integration flow

* Introduce a simple `Predicate<Exception> failoverStrategy` into `UnicastingDispatcher`
and all its `MessageChannel` implementation consumers to allow to make a decision about next failover
according to a thrown exception from the current `MessageHandler`
* Expose `failoverStrategy` on the `DirectChannel`, `ExecutorChannel` & `PartitionedChannel`,
and add it into respective Java DSL specs
* Fix involved tests to rely on the `failoverStrategy` property from now on
* Document the new feature
  • Loading branch information
artembilan committed Mar 8, 2024
1 parent 2731e94 commit 02ac57b
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.integration.channel;

import java.util.function.Predicate;

import org.springframework.integration.dispatcher.LoadBalancingStrategy;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
Expand Down Expand Up @@ -60,12 +62,26 @@ public DirectChannel(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
/**
* Specify whether the channel's dispatcher should have failover enabled.
* By default, it will. Set this value to 'false' to disable it.
* Overrides {@link #setFailoverStrategy(Predicate)} option.
* In other words: or this, or that option has to be set.
* @param failover The failover boolean.
*/
public void setFailover(boolean failover) {
this.dispatcher.setFailover(failover);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #setFailover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
this.dispatcher.setFailoverStrategy(failoverStrategy);
}

/**
* Specify the maximum number of subscribers supported by the
* channel's dispatcher.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.integration.channel;

import java.util.concurrent.Executor;
import java.util.function.Predicate;

import org.springframework.integration.dispatcher.LoadBalancingStrategy;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
Expand Down Expand Up @@ -49,7 +50,7 @@ public class ExecutorChannel extends AbstractExecutorChannel {

private final LoadBalancingStrategy loadBalancingStrategy;

private boolean failover = true;
private Predicate<Exception> failoverStrategy = (exception) -> true;

/**
* Create an ExecutorChannel that delegates to the provided
Expand Down Expand Up @@ -88,8 +89,20 @@ public ExecutorChannel(Executor executor, @Nullable LoadBalancingStrategy loadBa
* @param failover The failover boolean.
*/
public void setFailover(boolean failover) {
this.failover = failover;
getDispatcher().setFailover(failover);
setFailoverStrategy((exception) -> failover);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #setFailover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
this.failoverStrategy = failoverStrategy;
getDispatcher().setFailoverStrategy(failoverStrategy);
}

@Override
Expand All @@ -107,7 +120,7 @@ public final void onInit() {
this.executor = new ErrorHandlingTaskExecutor(this.executor, errorHandler);
}
UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher(this.executor);
unicastingDispatcher.setFailover(this.failover);
unicastingDispatcher.setFailoverStrategy(this.failoverStrategy);
if (this.maxSubscribers == null) {
this.maxSubscribers = getIntegrationProperties().getChannelsMaxUnicastSubscribers();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import java.util.function.Predicate;

import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
Expand Down Expand Up @@ -99,6 +100,18 @@ public void setFailover(boolean failover) {
getDispatcher().setFailover(failover);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #setFailover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
getDispatcher().setFailoverStrategy(failoverStrategy);
}

/**
* Provide a {@link LoadBalancingStrategy} for the {@link PartitionedDispatcher}.
* @param loadBalancingStrategy The load balancing strategy implementation.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;

import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class PartitionedDispatcher extends AbstractDispatcher {

private ThreadFactory threadFactory = new CustomizableThreadFactory("partition-thread-");

private boolean failover = true;
private Predicate<Exception> failoverStrategy = (exception) -> true;

@Nullable
private LoadBalancingStrategy loadBalancingStrategy;
Expand Down Expand Up @@ -108,7 +109,20 @@ public void setThreadFactory(ThreadFactory threadFactory) {
* @param failover The failover boolean.
*/
public void setFailover(boolean failover) {
this.failover = failover;
setFailoverStrategy((exception) -> failover);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #setFailover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
Assert.notNull(failoverStrategy, "'failoverStrategy' must not be null");
this.failoverStrategy = failoverStrategy;
}

/**
Expand Down Expand Up @@ -179,7 +193,7 @@ private UnicastingDispatcher newPartition() {
this.executors.add(executor);
DelegateDispatcher delegateDispatcher =
new DelegateDispatcher(new ErrorHandlingTaskExecutor(executor, this.errorHandler));
delegateDispatcher.setFailover(this.failover);
delegateDispatcher.setFailoverStrategy(this.failoverStrategy);
delegateDispatcher.setLoadBalancingStrategy(this.loadBalancingStrategy);
delegateDispatcher.setMessageHandlingTaskDecorator(this.messageHandlingTaskDecorator);
return delegateDispatcher;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Predicate;

import org.springframework.integration.MessageDispatchingException;
import org.springframework.integration.support.utils.IntegrationUtils;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class UnicastingDispatcher extends AbstractDispatcher {

private final Executor executor;

private boolean failover = true;
private Predicate<Exception> failoverStrategy = (exception) -> true;

private LoadBalancingStrategy loadBalancingStrategy;

Expand All @@ -77,10 +78,25 @@ public UnicastingDispatcher(@Nullable Executor executor) {
* Specify whether this dispatcher should failover when a single
* {@link MessageHandler} throws an Exception. The default value is
* <code>true</code>.
* Overrides {@link #setFailoverStrategy(Predicate)} option.
* In other words: or this, or that option has to be set.
* @param failover The failover boolean.
*/
public void setFailover(boolean failover) {
this.failover = failover;
setFailoverStrategy((exception) -> failover);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #setFailover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
Assert.notNull(failoverStrategy, "'failoverStrategy' must not be null");
this.failoverStrategy = failoverStrategy;
}

/**
Expand Down Expand Up @@ -154,10 +170,15 @@ private boolean doDispatch(Message<?> message) {
}
exceptions.add(runtimeException);
boolean isLast = !handlerIterator.hasNext();
if (!isLast && this.failover) {
boolean failover = this.failoverStrategy.test(ex);

if (!isLast && failover) {
logExceptionBeforeFailOver(ex, handler, message);
}
handleExceptions(exceptions, message, isLast);

if (isLast || !failover) {
handleExceptions(exceptions, message);
}
}
}
return success;
Expand Down Expand Up @@ -187,22 +208,12 @@ else if (this.logger.isInfoEnabled()) {
}
}

/**
* Handles Exceptions that occur while dispatching. If this dispatcher has
* failover enabled, it will only throw an Exception when the handler list
* is exhausted. The 'isLast' flag will be <em>true</em> if the
* Exception occurred during the final iteration of the MessageHandlers.
* If failover is disabled for this dispatcher, it will re-throw any
* Exception immediately.
*/
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
if (isLast || !this.failover) {
if (allExceptions.size() == 1) {
throw allExceptions.get(0);
}
throw new AggregateMessageDeliveryException(message,
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message) {
if (allExceptions.size() == 1) {
throw allExceptions.get(0);
}
throw new AggregateMessageDeliveryException(message,
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,8 +28,8 @@ public class DirectChannelSpec extends LoadBalancingChannelSpec<DirectChannelSpe
@Override
protected DirectChannel doGet() {
this.channel = new DirectChannel(this.loadBalancingStrategy);
if (this.failover != null) {
this.channel.setFailover(this.failover);
if (this.failoverStrategy != null) {
this.channel.setFailoverStrategy(this.failoverStrategy);
}
if (this.maxSubscribers != null) {
this.channel.setMaxSubscribers(this.maxSubscribers);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,8 +36,8 @@ protected ExecutorChannelSpec(Executor executor) {
@Override
protected ExecutorChannel doGet() {
this.channel = new ExecutorChannel(this.executor, this.loadBalancingStrategy);
if (this.failover != null) {
this.channel.setFailover(this.failover);
if (this.failoverStrategy != null) {
this.channel.setFailoverStrategy(this.failoverStrategy);
}
if (this.maxSubscribers != null) {
this.channel.setMaxSubscribers(this.maxSubscribers);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.integration.dsl;

import java.util.function.Predicate;

import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
Expand All @@ -34,7 +36,7 @@ public abstract class LoadBalancingChannelSpec<S extends MessageChannelSpec<S, C

protected LoadBalancingStrategy loadBalancingStrategy = new RoundRobinLoadBalancingStrategy(); // NOSONAR

protected Boolean failover; // NOSONAR
protected Predicate<Exception> failoverStrategy; // NOSONAR

protected Integer maxSubscribers; // NOSONAR

Expand All @@ -46,8 +48,20 @@ public S loadBalancer(LoadBalancingStrategy loadBalancingStrategyToSet) {
return _this();
}

public S failover(Boolean failoverToSet) {
this.failover = failoverToSet;
public S failover(boolean failoverToSet) {
return failoverStrategy((exception) -> failoverToSet);
}

/**
* Configure a strategy whether the channel's dispatcher should have failover enabled
* for the exception thrown.
* Overrides {@link #failover(boolean)} option.
* In other words: or this, or that option has to be set.
* @param failoverStrategy The failover boolean.
* @since 6.3
*/
public S failoverStrategy(Predicate<Exception> failoverStrategy) {
this.failoverStrategy = failoverStrategy;
return _this();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,8 +63,8 @@ protected PartitionedChannel doGet() {
this.channel = new PartitionedChannel(this.partitionCount);
}
this.channel.setLoadBalancingStrategy(this.loadBalancingStrategy);
if (this.failover != null) {
this.channel.setFailover(this.failover);
if (this.failoverStrategy != null) {
this.channel.setFailoverStrategy(this.failoverStrategy);
}
if (this.maxSubscribers != null) {
this.channel.setMaxSubscribers(this.maxSubscribers);
Expand Down

0 comments on commit 02ac57b

Please sign in to comment.