diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 6035fb66676e7..cfdd259e1e667 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; @@ -50,6 +51,7 @@ import org.cometd.client.BayeuxClient.State; import org.cometd.client.http.jetty.JettyHttpClientTransport; import org.cometd.client.transport.ClientTransport; +import org.cometd.common.TransportException; import org.eclipse.jetty.client.Request; import org.eclipse.jetty.http.HttpCookie; import org.eclipse.jetty.http.HttpCookieStore; @@ -82,6 +84,9 @@ public class SubscriptionHelper extends ServiceSupport { private static final String SERVER_TOO_BUSY_ERROR = "503::"; private static final String AUTHENTICATION_INVALID = "401::Authentication invalid"; private static final String INVALID_REPLAY_ID_PATTERN = "400::The replayId \\{.*} you provided was invalid.*"; + private static final String CHANNEL_INVALID_PATTERN = "400::The channel.*"; + private static final String DENIED_BY_SEC_POLICY = "403:denied_by_security_policy"; + private static final String AUTHORIZATION_ERROR = "403::"; BayeuxClient client; @@ -219,7 +224,8 @@ private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message mess boolean abort = true; LOG.warn(msg); - if (isTemporaryError(message)) { + if (isTemporaryError(message) || error.equals(AUTHENTICATION_INVALID) || error.startsWith(DENIED_BY_SEC_POLICY) + || error.startsWith(AUTHORIZATION_ERROR)) { // retry after delay final long backoff = handshakeBackoff.getAndAdd(backoffIncrement); @@ -255,6 +261,9 @@ private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message mess for (var consumer : consumers) { subscribe(consumer); } + } else if (error.matches(CHANNEL_INVALID_PATTERN)) { + LOG.warn("Channel invalid for channel {}, removing from subscription list", channelName); + channelsToSubscribe.remove(channelName); } if (abort && client != null) { @@ -465,7 +474,12 @@ ReplayExtension getReplayExtension() { private static boolean isTemporaryError(Message message) { String failureReason = getFailureReason(message); - return failureReason != null && failureReason.startsWith(SERVER_TOO_BUSY_ERROR); + if (failureReason != null && failureReason.startsWith(SERVER_TOO_BUSY_ERROR)) { + return true; + } + Exception exception = getFailure(message); + return exception instanceof IOException || exception instanceof TransportException + || exception instanceof TimeoutException; } private static String getFailureReason(Message message) {