refactor: peer connection handling #4929
Conversation
bb6b2a7 to 8000eb9
Since `request` is used in query strategies, it will often be canceled, possibly cancelling the connection attempts. That's not great. By using Jit, we can let connections to peers happen in the background and avoid the problem. While at it, keep attempting to reconnect; the strategy will cancel the request anyway.
let shared: Arc<_> = tokio::sync::Mutex::new(FederationPeerClientShared::new()).into();

Self {
    client: Self::new_jit_client(peer_id, url, shared.clone()),
I'm on the fence here about whether a new `FederationPeerClient` should connect in the background immediately, or start in an `Err("Not connected")` Jit state.
By connecting immediately, it can be ready when it's actually needed, which might make things slightly more snappy.
On the downside, if no request is ever made, the connection is kind of unnecessary.
Just connect asap. If it's the CLI we don't care about wasted resources imo, as long as it doesn't impact latency. For anything longer running we'll want a connection anyway 99% of the time.
I would rather land and follow up with any fixes due to #4940
I strongly dislike reducing our unit testing coverage, but there's nothing broken about the code afaik. So ACK under the condition that we add back some testing for all that reconnect logic (especially concurrency-safety).
}

#[test_log::test(tokio::test)]
async fn concurrent_requests() {
Testing if concurrency works seems kinda important.
The previous test was just testing implementation details of the previous implementation; that's why it no longer worked after the change. I don't really want to add another test like it.
I'm thinking of a test that creates an Api and then calls it in a loop from N threads for some time and checks everything.
@@ -1516,13 +1516,26 @@ where
 {
     #[instrument(level = "trace", fields(peer = %self.peer_id, %method), skip_all)]
     pub async fn request(&self, method: &str, params: &[Value]) -> JsonRpcResult<Value> {
-        loop {
+        for attempts in 0.. {
             let rclient = self.client.read().await;
Could assert `attempts <= 1` here. Took me a bit to figure out that invariant.
    .unwrap_or_default();

let sleep_duration = desired_timeout.saturating_sub(since_last_connect);
if Duration::from_millis(0) < sleep_duration {
Suggested change:
- if Duration::from_millis(0) < sleep_duration {
+ if sleep_duration > Duration::ZERO {
Err(e) => {
    // Strategies using timeouts often depend on failing requests returning quickly,
    // so every request gets only one reconnection attempt.
    if 0 < attempts {
Else log something?
    %err, "Unable to connect to peer");
    return Err(err)?;
}
_ => {
Is this `Ok(_client)`? Better to be explicit about it.
    return Err(err)?;
}
_ => {
    if 0 < attempts {
Else log something?
let mut wclient = self.client.write().await;
match wclient.client.get_try().await {
    Ok(client) if client.is_connected() => {
        // someone else connected, just loop again
debug!/trace! ?
Minor stuff, can be implemented later
All but extra tests in #4946
See commit messages
Fix #4837
Summary:
To avoid spamming with reconnection attempts, track connection attempts and add some delay.
Since `request` is cancellable, we could drop the future in the middle of connecting. Do it in a background task (Jit) instead.