Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Camel-20918: fix exception handling for Salesforce Streaming API #14778

Merged
merged 8 commits into from
Jul 22, 2024

Conversation

bartoszpop
Copy link
Contributor

@bartoszpop bartoszpop commented Jul 10, 2024

CAMEL-20918: fix exception handling for Salesforce Streaming API:

  • execute subscriptionListener in a separate thread similarly to handshakeListener and connectListener,
  • fix SubscriptionHelper#listenerMap remaining empty when an exception occurs while subscribing to a channel,
  • remove SubscriptionHelper#handshake as there is always a thread hanging in this method as per CAMEL-20388,
  • remove the call to closeChannel for topic listeners as they will be removed on SubscriptionHelper#unsubscribe,
  • fix replay extension not to overwrite ReplayId per channel if already set.

- execute subscriptionListener in a separate thread similarly to handshakeListener and connectListener,
- fix SubscriptionHelper#listenerMap remaining empty when an exception occurs while subscribing to a channel,
- remove SubscriptionHelper#handshake as there is always a thread hanging in this method as per https://issues.apache.org/jira/browse/CAMEL-20388,
- remove the call to closeChannel for topic listeners as they will be removed on SubscriptionHelper#unsubscribe
Copy link
Contributor

🌟 Thank you for your contribution to the Apache Camel project! 🌟

🤖 CI automation will test this PR automatically.

🐫 Apache Camel Committers, please review the following items:

  • First-time contributors require MANUAL approval for the GitHub Actions to run

  • You can use the command /component-test (camel-)component-name1 (camel-)component-name2.. to request a test from the test bot.

  • You can label PRs using build-all, build-dependents, skip-tests and test-dependents to fine-tune the checks executed by this PR.

  • Build and test logs are available in the Summary page. Only Apache Camel committers have access to the summary.

  • ⚠️ Be careful when sharing logs. Review their contents before sharing them publicly.

@davsclaus
Copy link
Contributor

@jeremyross if you have a chance to take a quick look at this PR then that is appreciated

@Jasonlhy
Copy link

I am evaluating camel for Salesforce Platform event, I have also noticed the issue. I tested it on my machine and this pull request seems fixed the issue, this is the log I got

	at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.lambda$createConnectionListener$2(SubscriptionHelper.java:162)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
[ead #29 - SalesforceHttpClient] SubscriptionHelper             WARN  Connect failure: {failure={exception=java.net.UnknownHostException: <mycompanY>.sandbox.my.salesforce.com: nodename nor servname provided, or not known, message={clientId=arjyrbbfdpyy7ry124ktuvyos91u, advice={timeout=0}, channel=/meta/connect, id=18, connectionType=long-polling}, connectionType=long-polling}, channel=/meta/connect, id=18, successful=false}
Exception in thread "Camel (camel-1) thread #29 - SalesforceHttpClient" java.lang.IllegalStateException: Invalid state UNCONNECTED
	at org.cometd.client.BayeuxClient.handshake(BayeuxClient.java:310)
	at org.cometd.bayeux.client.ClientSession.handshake(ClientSession.java:68)
	at org.cometd.bayeux.client.ClientSession.handshake(ClientSession.java:59)
	at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.lambda$createConnectionListener$2(SubscriptionHelper.java:162)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
[ead #31 - SalesforceHttpClient] SubscriptionHelper             WARN  Connect failure: {failure={exception=java.net.UnknownHostException: <mycompanY>.sandbox.my.salesforce.com: nodename nor servname provided, or not known, message={clientId=arjyrbbfdpyy7ry124ktuvyos91u, advice={timeout=0}, channel=/meta/connect, id=19, connectionType=long-polling}, connectionType=long-polling}, channel=/meta/connect, id=19, successful=false}
Exception in thread "Camel (camel-1) thread #31 - SalesforceHttpClient" java.lang.IllegalStateException: Invalid state UNCONNECTED
	at org.cometd.client.BayeuxClient.handshake(BayeuxClient.java:310)
	at org.cometd.bayeux.client.ClientSession.handshake(ClientSession.java:68)
	at org.cometd.bayeux.client.ClientSession.handshake(ClientSession.java:59)
	at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.lambda$createConnectionListener$2(SubscriptionHelper.java:162)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
[ead #33 - SalesforceHttpClient] SubscriptionHelper             WARN  Connect failure: {failure={exception=java.net.UnknownHostException: <mycompanY>.sandbox.my.salesforce.com: nodename nor servname provided, or not known, message={clientId=arjyrbbfdpyy7ry124ktuvyos91u, advice={timeout=0}, channel=/meta/connect, id=20, connectionType=long-polling}, connectionType=long-polling}, channel=/meta/connect, id=20, successful=false}
Exception in thread "Camel (camel-1) thread #33 - SalesforceHttpClient" java.lang.IllegalStateException: Invalid state UNCONNECTED
	at org.cometd.client.BayeuxClient.handshake(BayeuxClient.java:310)
	at org.cometd.bayeux.client.ClientSession.handshake(ClientSession.java:68)
	at org.cometd.bayeux.client.ClientSession.handshake(ClientSession.java:59)
	at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.lambda$createConnectionListener$2(SubscriptionHelper.java:162)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
[ead #35 - SalesforceHttpClient] SubscriptionHelper             WARN  Connect failure: {advice={reconnect=handshake, interval=0}, channel=/meta/connect, id=21, error=403::Unknown client, successful=false}
[ead #36 - SalesforceHttpClient] SubscriptionHelper             INFO  Handshake successful. Channels to subscribe: [/event/PointHistoryEvent__e, /event/AccountEvent__e]
[ead #37 - SalesforceHttpClient] SubscriptionHelper             INFO  Subscribing to channels: [/event/PointHistoryEvent__e, /event/AccountEvent__e]
[ead #37 - SalesforceHttpClient] SubscriptionHelper             INFO  Subscribing to channel /event/PointHistoryEvent__e...
[ead #37 - SalesforceHttpClient] SubscriptionHelper             INFO  Subscribing to channel /event/AccountEvent__e...
[lesforceHttpClient@6ad5923a-29] ResponseListeners              INFO  Exception while notifying listener org.cometd.client.http.jetty.JettyHttpClientTransport$ResponseListener@4e9c40a3
java.util.ConcurrentModificationException: null
	at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013) ~[?:?]
	at java.base/java.util.ArrayList$Itr.next(ArrayList.java:967) ~[?:?]
	at java.base/java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1054) ~[?:?]
	at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$1.storeCookies(SubscriptionHelper.java:392) ~[camel-salesforce-4.7.0-SNAPSHOT.jar:4.7.0-SNAPSHOT]
	at org.cometd.client.http.jetty.JettyHttpClientTransport.access$000(JettyHttpClientTransport.java:52) ~[cometd-java-client-http-jetty-8.0.2.jar:8.0.2]
	at org.cometd.client.http.jetty.JettyHttpClientTransport$ResponseListener.onHeader(JettyHttpClientTransport.java:239) ~[cometd-java-client-http-jetty-8.0.2.jar:8.0.2]
	at org.eclipse.jetty.client.transport.ResponseListeners.notifyHeader(ResponseListeners.java:130) ~[jetty-client-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.client.transport.ResponseListeners.notifyHeader(ResponseListeners.java:122) ~[jetty-client-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.client.transport.HttpReceiver.lambda$responseHeader$1(HttpReceiver.java:203) ~[jetty-client-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.util.thread.SerializedInvoker$Link.run(SerializedInvoker.java:191) [jetty-util-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.util.thread.SerializedInvoker.run(SerializedInvoker.java:117) [jetty-util-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.client.transport.HttpReceiver.responseHeader(HttpReceiver.java:191) [jetty-client-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.client.transport.internal.HttpReceiverOverHTTP.parsedHeader(HttpReceiverOverHTTP.java:419) [jetty-client-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.http.HttpParser.parsedHeader(HttpParser.java:1203) [jetty-http-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.http.HttpParser.parseFields(HttpParser.java:1362) [jetty-http-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:1641) [jetty-http-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.client.transport.internal.HttpReceiverOverHTTP.parse(HttpReceiverOverHTTP.java:312) [jetty-client-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.client.transport.internal.HttpReceiverOverHTTP.parseAndFill(HttpReceiverOverHTTP.java:250) [jetty-client-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.client.transport.internal.HttpReceiverOverHTTP.receive(HttpReceiverOverHTTP.java:76) [jetty-client-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.client.transport.internal.HttpChannelOverHTTP.receive(HttpChannelOverHTTP.java:97) [jetty-client-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.client.transport.internal.HttpConnectionOverHTTP.onFillable(HttpConnectionOverHTTP.java:207) [jetty-client-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:322) [jetty-io-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:99) [jetty-io-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.io.ssl.SslConnection$SslEndPoint.onFillable(SslConnection.java:574) [jetty-io-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:390) [jetty-io-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.io.ssl.SslConnection$2.succeeded(SslConnection.java:150) [jetty-io-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:99) [jetty-io-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53) [jetty-io-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:478) [jetty-util-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:441) [jetty-util-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:293) [jetty-util-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:201) [jetty-util-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:311) [jetty-util-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:979) [jetty-util-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1209) [jetty-util-12.0.10.jar:12.0.10]
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1164) [jetty-util-12.0.10.jar:12.0.10]
	at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
[ead #38 - SalesforceHttpClient] SubscriptionHelper             INFO  Subscribed to channel /event/PointHistoryEvent__e
[ead #39 - SalesforceHttpClient] SubscriptionHelper             INFO  Subscribed to channel /event/AccountEvent__e
[lesforceHttpClient@6ad5923a-32] route2                         INFO  Received Platform Event: PlatformEvent: createdById: 0050p000004qnldAAA, createdId: 2024-07-13T08:53:00.408Z, data: {Data__c=ChineseLastName__c:mulesofttest -> Test camel, ID__c=001H100001PGZAGIA5}
json: {"created":1720860780.408000000,"createdById":"0050p000004qnldAAA","eventData":{"Data__c":"ChineseLastName__c:mulesofttest -> Test camel","ID__c":"001H100001PGZAGIA5"}}
[lesforceHttpClient@6ad5923a-32] ReactorConnectionCache         INFO  {"az.sdk.message":"The connection is closed, requesting a new connection.","entityPath":"N/A","connectionId":"MF_b31637_1720860344474"}
[lesforceHttpClient@6ad5923a-32] ReactorConnectionCache         INFO  {"az.sdk.message":"Waiting to connect and become active.","entityPath":"N/A","connectionId":"MF_959b2e_1720860804288"}
[lesforceHttpClient@6ad5923a-32] ReactorConnection              INFO  {"az.sdk.message":"Creating and starting connection.","connectionId":"MF_959b2e_1720860804288","hostName":"jsonservicebus.servicebus.windows.net","port":5671}
[lesforceHttpClient@6ad5923a-32] ReactorExecutor                INFO  {"az.sdk.message":"Starting reactor.","connectionId":"MF_959b2e_1720860804288"}

@oscerd
Copy link
Contributor

oscerd commented Jul 22, 2024

/component-tests salesforce

Result ❌ The tests failed please check the logs

Copy link
Contributor

🤖 The Apache Camel test robot will run the tests for you 👍

@oscerd oscerd merged commit c32244d into apache:main Jul 22, 2024
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants