Handling long-running operations #135

Open
Hulkstance opened this issue Sep 4, 2023 · 2 comments

Hulkstance commented Sep 4, 2023

If a message takes 5 seconds to process because it involves some database operations, it shouldn't slow down the whole message-processing pipeline, as that's going to cause backpressure. @Marfusios, I really wonder what you would do in such situations:

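  1. Making the processing of server messages run in parallel
// Merge subscribes to each handler as soon as its message arrives,
// so handlers run concurrently and completion order is not guaranteed.
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext) =>
    source
        .Select(item => Observable.FromAsync(() => onNext(item)))
        .Merge()
        .Subscribe();

instead of

// Concat starts each handler only after the previous one has completed,
// so messages are processed one at a time, in arrival order.
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext) =>
    source
        .Select(item => Observable.FromAsync(async () => await onNext(item)))
        .Concat()
        .Subscribe();
  2. Offloading the data processors/handlers to a separate thread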

What do you think is best? Both solutions are working but I kinda want to know someone else's opinion.

Marfusios (Owner) commented Sep 4, 2023

Hey @Hulkstance,

good question, but it really depends on your use case.
Can those messages be handled in parallel, or do they need to be processed one by one?
Do you need to process every message, or can you discard some of them?
What will the load be? Could the server send more messages than fit in your memory?

After answering, you can choose the solution that best fits your problem.

In my recent projects, I usually wanted to process messages one by one and keep the system safe against flooding by the server, so I went with Channels.
I created a bounded channel, wrote into it from my subscribe method, and had a separate thread process those messages one by one.

One example is here: Nostr bot, although there I went with an unbounded channel because there is a low chance of being flooded. Also, I want to process every message, even though that can slow down the bot's overall reaction time.
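
The bounded-channel approach described above could look roughly like the following. This is only a minimal sketch, not the code from the linked bot: the `ChannelPipeline` class, the `ConsumeAsync` and `DemoAsync` names, the capacity of 100, and the string message type are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelPipeline
{
    // Hypothetical helper: reads messages one by one, in arrival order,
    // until the writer side is marked complete.
    public static async Task ConsumeAsync(ChannelReader<string> reader, Func<string, Task> handler)
    {
        await foreach (var message in reader.ReadAllAsync())
        {
            await handler(message);
        }
    }

    public static async Task<List<string>> DemoAsync()
    {
        // Bounded: at most 100 messages are buffered (the capacity is an arbitrary assumption).
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
        {
            // Wait makes WriteAsync wait for free space; the Drop* modes discard items instead.
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true
        });

        var processed = new List<string>();

        // The single consumer runs separately from the code that receives messages.
        var consumer = Task.Run(() => ConsumeAsync(channel.Reader, async message =>
        {
            await Task.Delay(10); // stands in for slow database work
            processed.Add(message);
        }));

        // In a real client this write would sit in the subscribe callback.
        // In a synchronous callback, Writer.TryWrite is the non-blocking option;
        // in Wait mode it returns false when the channel is full.
        foreach (var message in new[] { "a", "b", "c" })
        {
            await channel.Writer.WriteAsync(message);
        }

        channel.Writer.Complete();
        await consumer;
        return processed;
    }
}
```

Swapping `Channel.CreateBounded` for `Channel.CreateUnbounded<string>()` gives the unbounded variant: nothing is ever dropped or delayed on the writer side, at the cost of unlimited memory growth if the consumer falls behind.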

Hulkstance (Author) commented Sep 5, 2023

@Marfusios, thank you for your insights! Messages can definitely be processed in parallel: their order is not important, because I've built a wrapper that matches each request with its response using IDs. This needs to happen quickly due to a strict timeout for catching responses. Missing messages is not an option, as the system would fall out of sync with the exchange and would have to fall back to REST for re-synchronization.

But maybe at some point the order of the messages does matter, since there is a logon, logoff and session reconnection on the server side. Talking about CQG btw.
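
For the "parallel, but never drop a message" case, one possible middle ground between `Merge()` and `Concat()` is to cap the number of handlers that run at once. The sketch below assumes the System.Reactive package and uses its `Merge(int maxConcurrent)` overload; the `BoundedSubscribeExtensions` class and the extra `maxConcurrent` parameter are hypothetical additions to the `SubscribeAsync` helper from the first comment.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class BoundedSubscribeExtensions
{
    // Hypothetical variant of SubscribeAsync: at most maxConcurrent handlers run at the same time.
    public static IDisposable SubscribeAsync<T>(
        this IObservable<T> source, Func<T, Task> onNext, int maxConcurrent) =>
        source
            // FromAsync is deferred: the handler starts only when Merge subscribes to it.
            .Select(item => Observable.FromAsync(() => onNext(item)))
            // Items beyond the limit wait in an in-memory queue; none are dropped,
            // but that queue is unbounded, so memory can grow under sustained load.
            .Merge(maxConcurrent)
            .Subscribe();
}
```

A call would look like `messages.SubscribeAsync(HandleAsync, maxConcurrent: 4)`, where `messages` and `HandleAsync` are placeholders. With `maxConcurrent: 1` handlers run one at a time, which may be the safer setting for order-sensitive traffic such as logon, logoff and reconnection messages.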
