Can cometd client code be configured to receive the full List of Messages instead of 1 at a time #1248

Open
brendanjlynch4014 opened this issue Oct 6, 2022 · 7 comments

@brendanjlynch4014

CometD version: 4.0.x
Java version & vendor: OpenJDK Runtime Environment Temurin-11.0.14+9 (build 11.0.14+9)

Question
I'm looking for a way to optimize my consumption process and would like to know whether I can receive the entire response content that, as far as I can see, is received by the LongPollingTransport.

The org.cometd.client.transport.LongPollingTransport receives a batch of messages as content from the server, which I can see in the DEBUG logs (example at bottom), and it then looks like org.cometd.client.BayeuxClient dispatches these as separate messages.

LOG EXAMPLE below:

10/06 11:26:34.319 DEBUG LongPollingTransport$2.onComplete: Received messages [{data={schema=qy5YQv-sJxUibkXe5FO-Tw, payload={Sync_Type__c=1664815340_D, CreatedById=0053h000003iaYhAAI, Payload__c=null, CreatedDate=2022-10-03T16:42:20.504Z, Event_Type__c=null}, event={replayId=6351356}}, channel=/event/Dynamic_Sync__e}, {data={schema=qy5YQv-sJxUibkXe5FO-Tw, payload={Sync_Type__c=1664815575_D, CreatedById=0053h000003iaYhAAI, Payload__c=null, CreatedDate=2022-10-03T16:46:14.920Z, Event_Type__c=null}, event={replayId=6351365}}, channel=/event/Dynamic_Sync__e}, {clientId=3j7q50ueelhrvxc32my8hh7xm2g, channel=/meta/connect, id=4, successful=true}]

10/06 11:26:34.319 DEBUG BayeuxClient.processMessages: Processing {data={schema=qy5YQv-sJxUibkXe5FO-Tw, payload={Sync_Type__c=1664815340_D, CreatedById=0053h000003iaYhAAI, Payload__c=null, CreatedDate=2022-10-03T16:42:20.504Z, Event_Type__c=null}, event={replayId=6351356}}, channel=/event/Dynamic_Sync__e}

10/06 11:26:39.375 DEBUG BayeuxClient.processMessages: Processing {data={schema=qy5YQv-sJxUibkXe5FO-Tw, payload={Sync_Type__c=1664815575_D, CreatedById=0053h000003iaYhAAI, Payload__c=null, CreatedDate=2022-10-03T16:46:14.920Z, Event_Type__c=null}, event={replayId=6351365}}, channel=/event/Dynamic_Sync__e}

@sbordet
Member

sbordet commented Oct 6, 2022

No, it is currently not possible.

The main reason is that each message may be treated differently by extensions and listeners, which may be user-specific code.
Also, from your logs above, you received 3 messages, but only 2 were "application" messages, so CometD still needs to do some processing.

However, at the very bottom, CometD does a for loop and calls your code, which does not seem much different from you doing a for loop and calling your code.

Have you evidence that the current model is inefficient, and by how much?

@brendanjlynch4014
Author

I shortened the log example; it was just meant to convey that long polling receives an entire batch of messages, and that is what I want to receive.

There is a very specific performance issue that I'm dealing with which is the reason for my questions.

The performance problem is not necessarily in the CometD code; rather, latency in downstream processes causes issues with the CometD connections.

We have trouble completing our downstream process for larger batches within the 40 seconds allowed before the server drops my subscription. This causes the 403 connect errors to be raised. My app also has trouble recovering from the 403 error and needs to be killed and restarted. [I have been trying to resolve this for months without success.]

I was hoping I could configure the client to tell the server to limit the batch size of messages. But, as I understand it, I have no control from the client side over the batch size of the messages sent from server to client. (I saw a separate thread of yours where I think you mentioned needing to add extensions on both client and server in order to do it.)

That is how I came up with the question. Since the server is returning 1 response with multiple messages, it might be a nice option to allow it to be received at the client as 1 response with multiple messages. Yes, I'm eventually going to parse it into individual events, but I could be in control of where that happens.

Thanks for responding so quickly.

@sbordet
Member

sbordet commented Oct 6, 2022

> The performance problem is not necessarily in the CometD code; rather, latency in downstream processes causes issues with the CometD connections.

Can't you just dispatch the processing to another thread (or otherwise asynchronously), so that the CometD listener returns immediately and the CometD thread can continue processing?

Also, you can just queue up on the client batches of N messages, with a timer to drain the queue for processing.
This also will free up the CometD thread to continue the processing.
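A minimal sketch of the first suggestion (handing each message to another thread), using only the JDK. The class name `SingleThreadDispatcher` and its methods are hypothetical and not part of CometD; the subscription wiring shown in the comment is an assumption about how it would be called from a listener. A single worker thread keeps messages in arrival order.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper (not part of CometD): hands each message to one worker
// thread so that the calling thread (the CometD listener) returns immediately.
class SingleThreadDispatcher<T> {
    // One worker thread: tasks run in the order they were submitted.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<T> slowProcessor;

    SingleThreadDispatcher(Consumer<T> slowProcessor) {
        this.slowProcessor = slowProcessor;
    }

    // Assumed wiring from a CometD subscription:
    //   channel.subscribe((ch, message) -> dispatcher.dispatch(message));
    void dispatch(T message) {
        worker.execute(() -> slowProcessor.accept(message));
    }

    // Stops accepting work and waits up to the timeout for queued messages to finish.
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The internal queue of this executor is unbounded, so a slow consumer will accumulate messages in memory.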

Error 403 is typically related to protocol or security errors; you should not be getting it for lack of processing.
What CometD version are you using?

@brendanjlynch4014
Author

In my opinion, the async method is susceptible to receiving duplicate events or even missing events when my client has to reconnect from a last ReplayId. I can't have 2 threads working on different batches because I have to maintain 1 ReplayId as last committed.

I'm not sure how to implement that timer to drain the queue. Are you saying I should set some other timer that is less than the 40 seconds?

The 403 error occurs because the server has dropped my client's subscription once 40 seconds have elapsed. Then, after I finally complete the batch that I'm working on, I encounter {advice={reconnect=handshake, interval=0}, channel=/meta/connect, id=3140, error=403::Unknown client, successful=false}

@sbordet
Member

sbordet commented Oct 6, 2022

> In my opinion, the async method is susceptible to receiving duplicate events or even missing events when my client has to reconnect from a last ReplayId. I can't have 2 threads working on different batches because I have to maintain 1 ReplayId as last committed.

This all seems to be logic outside CometD, which has no ReplayId, whatever that is.
I'm not sure what you mean by "receiving duplicate events"; CometD does not do that, unless the application does.

> I'm not sure how to implement that timer to drain the queue. Are you saying I should set some other timer that is less than the 40 seconds?

I'm saying that if you receive M messages over the network in a single /meta/connect response, you can queue them (one by one) elsewhere, and then have a timer that every N seconds drains the queue.
Now you can process your messages in order, as a batch, in a different thread, without compromising the CometD processing.
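The queue-and-timer idea described above could be sketched as follows, using only JDK classes. The name `TimedBatcher`, the `maxBatchSize` parameter, and the listener wiring in the comment are assumptions for illustration; nothing here is CometD API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: the listener enqueues one message at a time, and a
// timer thread periodically drains the queue and processes messages as a batch.
class TimedBatcher<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final Consumer<List<T>> batchProcessor;
    private final int maxBatchSize;

    TimedBatcher(Consumer<List<T>> batchProcessor, int maxBatchSize) {
        this.batchProcessor = batchProcessor;
        this.maxBatchSize = maxBatchSize;
    }

    // Cheap operation meant to be called from the CometD listener, e.g. (assumed):
    //   channel.subscribe((ch, message) -> batcher.enqueue(message));
    void enqueue(T message) {
        queue.add(message);
    }

    // Drains every `period`; the next run is scheduled only after the previous one ends.
    void start(long period, TimeUnit unit) {
        timer.scheduleWithFixedDelay(() -> {
            try {
                drainOnce();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so report and continue.
                e.printStackTrace();
            }
        }, period, period, unit);
    }

    // Takes up to maxBatchSize messages, in arrival order, and processes them together.
    void drainOnce() {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatchSize);
        if (!batch.isEmpty()) {
            batchProcessor.accept(batch);
        }
    }

    void stop() {
        timer.shutdown();
    }
}
```

With this shape the consumer, not the server, decides the batch size: however many messages arrive in one /meta/connect response, `drainOnce` never hands more than `maxBatchSize` to the processor.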

403::Unknown client is not an error reported by this project.
Is this Salesforce?
Otherwise, you still have not specified what CometD version.

@brendanjlynch4014
Author

Thanks again for engaging in conversation.

I am talking to a Salesforce server, and my client-side code is using CometD version 4.0.9.

I guess 'ReplayId' must be unique to the Salesforce implementation. Every message is uniquely identified by it, and it is used as a placeholder for where to begin the subscription when starting, reconnecting, or resubscribing to a topic (channel).
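As an illustration of keeping a single "last committed" position, here is a small sketch that reads `data.event.replayId` from a message's data map, following the shape visible in the DEBUG log above. The class `ReplayIdTracker` is hypothetical, and the sketch assumes a single consumer thread that commits events in the order it processed them.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: remembers the replayId of the last fully processed event.
class ReplayIdTracker {
    private final AtomicLong lastCommitted;

    // The starting position is supplied by the caller (for example, a value loaded from storage).
    ReplayIdTracker(long initialReplayId) {
        this.lastCommitted = new AtomicLong(initialReplayId);
    }

    // Reads data.event.replayId, following the message shape seen in the DEBUG log.
    @SuppressWarnings("unchecked")
    static long replayIdOf(Map<String, Object> data) {
        Map<String, Object> event = (Map<String, Object>) data.get("event");
        return ((Number) event.get("replayId")).longValue();
    }

    // Call only after the downstream work for this event has completed.
    void commit(Map<String, Object> data) {
        lastCommitted.set(replayIdOf(data));
    }

    long lastCommitted() {
        return lastCommitted.get();
    }
}
```

Committing only after processing means a restart may replay an event that was processed but not yet committed, so the downstream step would need to tolerate that.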

I'm trying to queue them one by one elsewhere, but that downstream process can sometimes take longer than this 40 second limit for the batch of N messages received. And I'm not able to configure anything to tell the server to send smaller batches.

Now I find myself trying to queue them on another queueing system (like kafka or mq) which may be more responsive and try to beat the 40 second timer.

implementation 'org.cometd.java:bayeux-api:4.0.9'
implementation 'org.cometd.java:cometd-java-client:4.0.9'
implementation 'org.cometd.java:cometd-java-common:4.0.9'

@sbordet
Member

sbordet commented Oct 7, 2022

> I'm trying to queue them one by one elsewhere, but that downstream process can sometimes take longer than this 40 second limit for the batch of N messages received.

It should not matter. You have the CometD thread that just does the queueing, so it is super-quick and you can continue to receive messages.

On a different thread, you dequeue and it does not matter how long the processing takes, because now you're decoupled from CometD. The only problem could be that you receive messages faster than you can process, and you'll have a memory problem. That is outside of CometD too.
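One way to make the memory concern visible is a bounded hand-off queue whose enqueue never blocks the calling thread and instead reports when the buffer is full. This is only a sketch: `BoundedHandOff` is a hypothetical name, and what to do when `tryEnqueue` returns false (spill to disk, forward to an external queue, raise an alert) is an application decision outside CometD.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper: a fixed-capacity buffer between the listener thread
// and the processing thread, so that a slow consumer cannot exhaust memory.
class BoundedHandOff<T> {
    private final BlockingQueue<T> queue;

    BoundedHandOff(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking: returns false instead of waiting when the buffer is full.
    boolean tryEnqueue(T message) {
        return queue.offer(message);
    }

    // Returns the oldest buffered message, or null if the buffer is empty.
    T pollNext() {
        return queue.poll();
    }

    // Number of messages currently waiting to be processed.
    int backlog() {
        return queue.size();
    }
}
```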

CometD 4.0.x has been End-of-Life for a long while now, and CometD 5.0.x is going to reach End of Community Support very soon.
