-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement optimized wildcard request dispatching #3
Conversation
This reduces the sub/unsub churn caused by a naive implementation of the request method. This is the method used in the official nats.go client. When a timeout on request happens, the request method is responsible for cleaning up. The request handler will always remove the subscription handle when a message has been received.
.parse()?; | ||
|
||
// Only subscribe to the wildcard subscription when requested once! | ||
if client.request_wildcard_subscription.is_none() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can probably move this to the connect method. I figured it might be a little bit better here so users won't find a random wildcard subscription each time they connect. TODO: Check to see what the nats people do in nats.go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it here. It looks like it is what nats.go does.
src/lib.rs
Outdated
wrapped_client: Arc<Mutex<Self>>, | ||
mut subscription_rx: MpscReceiver<Msg>, | ||
) { | ||
let disconnecting = Self::disconnecting(Arc::clone(&wrapped_client)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that cleaning up at this point (disconnecting state) might cause the client to end up in a weird place if a request slips in after the cleanup in this fn runs but before the connection is officially disconnected and then reconnected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to explicitly unsubscribe from request_wildcard_subscription
after we have detected a disconnect in the server_messages_handler
? This should cause subscription_rx.next()
to return None
which would break us out of the loop. This has two advantages. It cleans up the code a bit and ensures that we have actually disconnected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea. I have to manually drop it from the subscriptions list since the unsubscribe method will reject everything when in a disconnecting state. But it's still a lot cleaner!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. This is great! I had a few points of feedback.
src/lib.rs
Outdated
wrapped_client: Arc<Mutex<Self>>, | ||
mut subscription_rx: MpscReceiver<Msg>, | ||
) { | ||
let disconnecting = Self::disconnecting(Arc::clone(&wrapped_client)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to explicitly unsubscribe from request_wildcard_subscription
after we have detected a disconnect in the server_messages_handler
? This should cause subscription_rx.next()
to return None
which would break us out of the loop. This has two advantages. It cleans up the code a bit and ensures that we have actually disconnected.
requester_tx.send(msg).await.unwrap_or_else(|err| { | ||
warn!("Could not write response to pending request via mapping channel. Skipping! Err: {}", err); | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add an else
case that logs an error and has a debug_assert!(false)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the nats.go client I don't think we can safely assert the else
case won't/ shouldn't be hit. In golang, the zero-value nil channel is safe to non-blocking write to (fails silently) so if some bad-actor publishes a message to our wildcard subscription, the assert would panic, where the go client would continue just fine. Of course, the debug_assert!
won't panic in release mode, but it's certainly expected behavior. Still happy to add it though. Just thought I'd point that out first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, that is a good point. I agree; an assert does not make much sense, but I think a warn level message describing what is going on would be good to have.
.parse()?; | ||
|
||
// Only subscribe to the wildcard subscription when requested once! | ||
if client.request_wildcard_subscription.is_none() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it here. It looks like it is what nats.go does.
// Make sure we clean up on error (don't leave a dangling request | ||
// inbox mapping reference. Adding an extra mutex here seems fine | ||
// since this is the error path. | ||
match rx.next().await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could use ok_or_else
to remove the explict match
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried playing with this before but was struggling to figure out how to make the compiler happy. Doing:
rx.next().await.ok_or_else(|| async {
let mut client = wrapped_client.lock().await;
client
.request_inbox_mapping
.remove(&reply_to)
.or_else(|| None);
Error::NoResponse
})
Would fail with
= note: expected type `std::result::Result<_, impl core::future::future::Future>`
found type `std::result::Result<_, types::error::Error>`
= note: the return type of a function must have a statically known size
I'm probably missing something obvious, but not sure what I'm missing. Pointers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It gets ugly but I think something like this would work
rx.next().await.ok_or_else(move || {
tokio::spawn(async move {
let mut client = wrapped_client.lock().await;
client
.request_inbox_mapping
.remove(&reply_to.to_string())
.or_else(|| None);
});
Error::NoResponse
})
But now that I look at it the explict spawn is proably worse than the match. Either way is fine.
This is interesting. In the timeout case, there is no way for the user to clean up because they do not know the subject name and dont have access to the internal |
The nats.go client adds a timeout option and selects on both channels to always be able to cleanup before returning to the caller. It might be worth adding a timeout arg to the request function? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Thanks for making those changes
It might be worth adding a timeout arg to the request function?
Yeah, I think that is probably the most straightforward option.
Another solution would be to have a Request
struct. A user would call something like get_request
on the Client
which would return a new Request
. The Request
struct would have at least two methods: request
(would actually do the request) and forget
(would remove the request subject from our lookup) (once async drop is supported we could forget about the request when it dropped which would be cool). This has the advantage that the user could implement any logic to forget about the request (instead of just timeouts).
We could easily do both! request_with_timeout
would be easy to implement in the context of Request
option.
@film42 I am going to go ahead and merge this PR (thanks again), but feel free to implement one or both of these.
@davidMcneil Thank you for the great feedback. I like those ideas! |
This implements the "Support improved request-reply implementation" item of the TODO list in the Readme.
This reduces the sub/unsub churn caused by a naive implementation of the request method. This is the method used in the official nats.go client.
When a timeout on request happens, the request method is responsible for cleaning up. The request handler will always remove the subscription handle when a message has been received.