From d416819242d2b1b4ad598d444896957df2a5d8db Mon Sep 17 00:00:00 2001 From: shaipan Date: Wed, 29 Apr 2026 19:59:50 +0530 Subject: [PATCH 1/2] camel-salesforce: Improve error resilience in subscription failure handling --- .../streaming/SubscriptionHelper.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 6035fb66676e7..fa47bd85819ac 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; @@ -50,6 +51,7 @@ import org.cometd.client.BayeuxClient.State; import org.cometd.client.http.jetty.JettyHttpClientTransport; import org.cometd.client.transport.ClientTransport; +import org.cometd.common.TransportException; import org.eclipse.jetty.client.Request; import org.eclipse.jetty.http.HttpCookie; import org.eclipse.jetty.http.HttpCookieStore; @@ -82,6 +84,10 @@ public class SubscriptionHelper extends ServiceSupport { private static final String SERVER_TOO_BUSY_ERROR = "503::"; private static final String AUTHENTICATION_INVALID = "401::Authentication invalid"; private static final String INVALID_REPLAY_ID_PATTERN = "400::The replayId \\{.*} you provided was invalid.*"; + private static final String CHANNEL_INVALID_PATTERN = "400::The channel.*"; + private static final String DENIED_BY_SEC_POLICY = "403:denied_by_security_policy"; + private static final String AUTHORIZATION_ERROR = "403::"; + private static final String ORG_LIMIT_ERROR = "403::Organization total events daily limit exceeded"; BayeuxClient client; @@ -219,7 +225,9 @@ private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message mess boolean abort = true; LOG.warn(msg); - if (isTemporaryError(message)) { + if (isTemporaryError(message) || error.equals(AUTHENTICATION_INVALID) || error.startsWith(DENIED_BY_SEC_POLICY) + || error.startsWith(AUTHORIZATION_ERROR) || error.equals(ORG_LIMIT_ERROR) + || error.equals("Missing error message")) { // retry after delay final long backoff = handshakeBackoff.getAndAdd(backoffIncrement); @@ -229,8 +237,6 @@ private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message mess abort = false; LOG.debug("Pausing for {} msecs before subscribe attempt", backoff); - // Use Camel's task API for backoff delay instead of Thread.sleep() - // We use initialDelay for the actual delay, and maxIterations(1) to run once Tasks.foregroundTask() .withBudget(Budgets.iterationBudget() .withMaxIterations(1) @@ -255,6 +261,9 @@ private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message mess for (var consumer : consumers) { subscribe(consumer); } + } else if (error.matches(CHANNEL_INVALID_PATTERN)) { + LOG.warn("Channel invalid for channel {}, removing from subscription list", channelName); + channelsToSubscribe.remove(channelName); } if (abort && client != null) { @@ -465,7 +474,12 @@ ReplayExtension getReplayExtension() { private static boolean isTemporaryError(Message message) { String failureReason = getFailureReason(message); - return failureReason != null && failureReason.startsWith(SERVER_TOO_BUSY_ERROR); + if (failureReason != null && failureReason.startsWith(SERVER_TOO_BUSY_ERROR)) { + return true; + } + Exception exception = getFailure(message); + return exception instanceof IOException || exception instanceof TransportException + || exception instanceof TimeoutException; } private static String getFailureReason(Message message) { From 1ebb3b91277da973bbd4691da91d6a7aad8cb83e Mon Sep 17 00:00:00 2001 From: shaipan Date: Thu, 30 Apr 2026 15:50:11 +0530 Subject: [PATCH 2/2] Address review: remove redundant ORG_LIMIT_ERROR, drop silent retry on missing error, restore comments --- .../salesforce/internal/streaming/SubscriptionHelper.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index fa47bd85819ac..cfdd259e1e667 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -87,7 +87,6 @@ public class SubscriptionHelper extends ServiceSupport { private static final String CHANNEL_INVALID_PATTERN = "400::The channel.*"; private static final String DENIED_BY_SEC_POLICY = "403:denied_by_security_policy"; private static final String AUTHORIZATION_ERROR = "403::"; - private static final String ORG_LIMIT_ERROR = "403::Organization total events daily limit exceeded"; BayeuxClient client; @@ -226,8 +225,7 @@ private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message mess LOG.warn(msg); if (isTemporaryError(message) || error.equals(AUTHENTICATION_INVALID) || error.startsWith(DENIED_BY_SEC_POLICY) - || error.startsWith(AUTHORIZATION_ERROR) || error.equals(ORG_LIMIT_ERROR) - || error.equals("Missing error message")) { + || error.startsWith(AUTHORIZATION_ERROR)) { // retry after delay final long backoff = handshakeBackoff.getAndAdd(backoffIncrement); @@ -237,6 +235,8 @@ private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message mess abort = false; LOG.debug("Pausing for {} msecs before subscribe attempt", backoff); + // Use Camel's task API for backoff delay instead of Thread.sleep() + // We use initialDelay for the actual delay, and maxIterations(1) to run once Tasks.foregroundTask() .withBudget(Budgets.iterationBudget() .withMaxIterations(1)