Skip to content

Commit 4bd76d3

Browse files
authored
chore: add more documentation to the quic transport code and make some errors infallible (#2515)
1 parent dda784b commit 4bd76d3

File tree

4 files changed

+71
-86
lines changed

4 files changed

+71
-86
lines changed

rs/p2p/quic_transport/src/connection_handle.rs

Lines changed: 26 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! The module implements the RPC abstraction over an established QUIC connection.
22
//!
3-
use anyhow::{anyhow, Context};
3+
use anyhow::Context;
44
use bytes::Bytes;
55
use http::{Method, Request, Response, Version};
66
use ic_protobuf::transport::v1 as pb;
@@ -42,39 +42,34 @@ impl Drop for SendStreamDropGuard {
4242
#[derive(Clone, Debug)]
4343
pub struct ConnectionHandle {
4444
pub connection: Connection,
45-
metrics: QuicTransportMetrics,
46-
conn_id: ConnId,
45+
pub metrics: QuicTransportMetrics,
46+
pub conn_id: ConnId,
4747
}
4848

4949
impl ConnectionHandle {
50-
pub fn new(connection: Connection, metrics: QuicTransportMetrics, conn_id: ConnId) -> Self {
51-
Self {
52-
connection,
53-
metrics,
54-
conn_id,
55-
}
56-
}
57-
58-
pub fn conn_id(&self) -> ConnId {
59-
self.conn_id
60-
}
61-
62-
/// Performs an RPC operation on the already established connection.
50+
/// Executes an RPC operation over an already-established connection.
51+
///
52+
/// This method leverages the QUIC transport layer, which continuously monitors the connection’s health
53+
/// and automatically attempts reconnection as necessary. As a result, any errors returned by this method
54+
/// should be considered transient (retryable).
55+
///
56+
/// In this P2P architecture, where there is a designated dialer and receiver, connection management
57+
/// is delegated solely to the transport layer. This differs from typical client-server architectures,
58+
/// where connections can be managed directly by the caller.
6359
///
64-
/// Since the QUIC transport layer continuously monitors connection health and automatically reconnects as needed,
65-
/// all errors returned by this method can be safely treated as transient (retryable).
66-
/// Therefore, we return an anyhow::Error for error handling.
60+
/// Note: This method provides the same cancellation safety guarantees as the `quinn::Connection` methods.
6761
pub async fn rpc(&self, request: Request<Bytes>) -> Result<Response<Bytes>, anyhow::Error> {
6862
let _timer = self
6963
.metrics
7064
.connection_handle_duration_seconds
7165
.with_label_values(&[request.uri().path()])
7266
.start_timer();
73-
self.metrics
67+
68+
let bytes_sent_counter = self
69+
.metrics
7470
.connection_handle_bytes_sent_total
75-
.with_label_values(&[request.uri().path()])
76-
.inc_by(request.body().len() as u64);
77-
let in_counter = self
71+
.with_label_values(&[request.uri().path()]);
72+
let bytes_received_counter = self
7873
.metrics
7974
.connection_handle_bytes_received_total
8075
.with_label_values(&[request.uri().path()]);
@@ -95,6 +90,7 @@ impl ConnectionHandle {
9590
.unwrap_or_default();
9691
let _ = send_stream.set_priority(priority.into());
9792

93+
bytes_sent_counter.inc_by(request.body().len() as u64);
9894
write_request(send_stream, request).await.inspect_err(|_| {
9995
self.metrics
10096
.connection_handle_errors_total
@@ -123,7 +119,7 @@ impl ConnectionHandle {
123119
.inc();
124120
})?;
125121

126-
in_counter.inc_by(response.body().len() as u64);
122+
bytes_received_counter.inc_by(response.body().len() as u64);
127123
Ok(response)
128124
}
129125
}
@@ -137,16 +133,10 @@ async fn read_response(mut recv_stream: RecvStream) -> Result<Response<Bytes>, a
137133
let response_proto = pb::HttpResponse::decode(raw_msg.as_slice())
138134
.with_context(|| "Failed to decode response header.")?;
139135

140-
let status: u16 = match response_proto.status_code.try_into() {
141-
Ok(status) => status,
142-
Err(e) => {
143-
return Err(anyhow!(
144-
"Received invalid status code {} {}",
145-
response_proto.status_code,
146-
e
147-
))
148-
}
149-
};
136+
let status: u16 = response_proto
137+
.status_code
138+
.try_into()
139+
.with_context(|| "Failed to decode status code.")?;
150140

151141
let mut response = Response::builder().status(status).version(Version::HTTP_3);
152142
for h in response_proto.headers {
@@ -188,7 +178,7 @@ async fn write_request(
188178
Method::CONNECT => pb::HttpMethod::Connect.into(),
189179
Method::PATCH => pb::HttpMethod::Patch.into(),
190180
Method::TRACE => pb::HttpMethod::Trace.into(),
191-
_ => return Err(anyhow!("invalid method")),
181+
_ => pb::HttpMethod::Unspecified.into(),
192182
},
193183
body: body.into(),
194184
};
@@ -197,8 +187,7 @@ async fn write_request(
197187
send_stream
198188
.write_all(&request_bytes)
199189
.await
200-
.with_context(|| "Failed to write request to stream.")?;
201-
Ok(())
190+
.with_context(|| "Failed to write request to stream.")
202191
}
203192

204193
// tests

rs/p2p/quic_transport/src/connection_manager.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -523,12 +523,14 @@ impl ConnectionManager {
523523
self.conn_id_counter.inc_assign();
524524
let conn_id = self.conn_id_counter;
525525

526-
let connection_handle =
527-
ConnectionHandle::new(connection, self.metrics.clone(), conn_id);
528-
let req_handler_connection_handle = connection_handle.clone();
526+
let connection_handle = ConnectionHandle {
527+
connection,
528+
metrics: self.metrics.clone(),
529+
conn_id,
530+
};
529531

530532
// dropping the old connection will result in closing it
531-
if let Some(old_conn) = peer_map_mut.insert(peer_id, connection_handle) {
533+
if let Some(old_conn) = peer_map_mut.insert(peer_id, connection_handle.clone()) {
532534
old_conn
533535
.connection
534536
.close(VarInt::from_u32(0), b"using newer connection");
@@ -549,9 +551,7 @@ impl ConnectionManager {
549551
run_stream_acceptor(
550552
self.log.clone(),
551553
peer_id,
552-
req_handler_connection_handle.conn_id(),
553-
req_handler_connection_handle.connection,
554-
self.metrics.clone(),
554+
connection_handle,
555555
self.router.clone(),
556556
),
557557
&self.rt,

rs/p2p/quic_transport/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ impl Transport for QuicTransport {
203203
.read()
204204
.unwrap()
205205
.iter()
206-
.map(|(n, c)| (*n, c.conn_id()))
206+
.map(|(n, c)| (*n, c.conn_id))
207207
.collect()
208208
}
209209
}

rs/p2p/quic_transport/src/request_handler.rs

Lines changed: 37 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,20 @@
1212
//!
1313
use std::time::Duration;
1414

15-
use anyhow::{anyhow, Context};
16-
use axum::{
17-
body::Body,
18-
http::{Method, Request, Response, Version},
19-
Router,
20-
};
15+
use anyhow::Context;
16+
use axum::{body::Body, Router};
2117
use bytes::Bytes;
18+
use http::{Method, Request, Response, Version};
2219
use ic_base_types::NodeId;
2320
use ic_logger::{info, ReplicaLogger};
2421
use ic_protobuf::transport::v1 as pb;
2522
use prost::Message;
26-
use quinn::{Connection, RecvStream, SendStream};
23+
use quinn::{RecvStream, SendStream};
2724
use tower::ServiceExt;
2825
use tracing::instrument;
2926

3027
use crate::{
28+
connection_handle::ConnectionHandle,
3129
metrics::{
3230
QuicTransportMetrics, ERROR_TYPE_ACCEPT, ERROR_TYPE_APP, ERROR_TYPE_FINISH,
3331
ERROR_TYPE_READ, ERROR_TYPE_STOPPED, ERROR_TYPE_WRITE, STREAM_TYPE_BIDI,
@@ -37,12 +35,10 @@ use crate::{
3735

3836
const QUIC_METRIC_SCRAPE_INTERVAL: Duration = Duration::from_secs(5);
3937

40-
pub(crate) async fn run_stream_acceptor(
38+
pub async fn run_stream_acceptor(
4139
log: ReplicaLogger,
4240
peer_id: NodeId,
43-
conn_id: ConnId,
44-
connection: Connection,
45-
metrics: QuicTransportMetrics,
41+
conn_handle: ConnectionHandle,
4642
router: Router,
4743
) {
4844
let mut inflight_requests = tokio::task::JoinSet::new();
@@ -55,18 +51,18 @@ pub(crate) async fn run_stream_acceptor(
5551
loop {
5652
tokio::select! {
5753
_ = quic_metrics_scrape.tick() => {
58-
metrics.collect_quic_connection_stats(&connection, &peer_id);
54+
conn_handle.metrics.collect_quic_connection_stats(&conn_handle.connection, &peer_id);
5955
}
60-
bi = connection.accept_bi() => {
56+
bi = conn_handle.connection.accept_bi() => {
6157
match bi {
6258
Ok((bi_tx, bi_rx)) => {
6359
inflight_requests.spawn(
64-
metrics.request_task_monitor.instrument(
60+
conn_handle.metrics.request_task_monitor.instrument(
6561
handle_bi_stream(
6662
log.clone(),
6763
peer_id,
68-
conn_id,
69-
metrics.clone(),
64+
conn_handle.conn_id,
65+
conn_handle.metrics.clone(),
7066
router.clone(),
7167
bi_tx,
7268
bi_rx
@@ -76,7 +72,7 @@ pub(crate) async fn run_stream_acceptor(
7672
}
7773
Err(e) => {
7874
info!(log, "Error accepting bi stream {}", e.to_string());
79-
metrics
75+
conn_handle.metrics
8076
.request_handle_errors_total
8177
.with_label_values(&[
8278
STREAM_TYPE_BIDI,
@@ -87,8 +83,8 @@ pub(crate) async fn run_stream_acceptor(
8783
}
8884
}
8985
},
90-
_ = connection.accept_uni() => {},
91-
_ = connection.read_datagram() => {},
86+
_ = conn_handle.connection.accept_uni() => {},
87+
_ = conn_handle.connection.read_datagram() => {},
9288
Some(completed_request) = inflight_requests.join_next() => {
9389
if let Err(err) = completed_request {
9490
// Cancelling tasks is ok. Panicking tasks are not.
@@ -181,33 +177,34 @@ async fn read_request(mut recv_stream: RecvStream) -> Result<Request<Body>, anyh
181177
let request_proto = pb::HttpRequest::decode(raw_msg.as_slice())
182178
.with_context(|| "Failed to decode http request.")?;
183179

184-
let mut request = Request::builder()
185-
.method(match pb::HttpMethod::try_from(request_proto.method) {
186-
Ok(pb::HttpMethod::Get) => Method::GET,
187-
Ok(pb::HttpMethod::Post) => Method::POST,
188-
Ok(pb::HttpMethod::Put) => Method::PUT,
189-
Ok(pb::HttpMethod::Delete) => Method::DELETE,
190-
Ok(pb::HttpMethod::Head) => Method::HEAD,
191-
Ok(pb::HttpMethod::Options) => Method::OPTIONS,
192-
Ok(pb::HttpMethod::Connect) => Method::CONNECT,
193-
Ok(pb::HttpMethod::Patch) => Method::PATCH,
194-
Ok(pb::HttpMethod::Trace) => Method::TRACE,
195-
Ok(pb::HttpMethod::Unspecified) => {
196-
return Err(anyhow!("received http method unspecified."));
197-
}
198-
Err(e) => {
199-
return Err(anyhow!("received invalid method {}", e));
200-
}
201-
})
180+
let pb_http_method = pb::HttpMethod::try_from(request_proto.method)
181+
.with_context(|| "Failed to decode http method.")?;
182+
let http_method = match pb_http_method {
183+
pb::HttpMethod::Get => Some(Method::GET),
184+
pb::HttpMethod::Post => Some(Method::POST),
185+
pb::HttpMethod::Put => Some(Method::PUT),
186+
pb::HttpMethod::Delete => Some(Method::DELETE),
187+
pb::HttpMethod::Head => Some(Method::HEAD),
188+
pb::HttpMethod::Options => Some(Method::OPTIONS),
189+
pb::HttpMethod::Connect => Some(Method::CONNECT),
190+
pb::HttpMethod::Patch => Some(Method::PATCH),
191+
pb::HttpMethod::Trace => Some(Method::TRACE),
192+
pb::HttpMethod::Unspecified => None,
193+
};
194+
let mut request_builder = Request::builder();
195+
if let Some(http_method) = http_method {
196+
request_builder = request_builder.method(http_method);
197+
}
198+
request_builder = request_builder
202199
.version(Version::HTTP_3)
203200
.uri(request_proto.uri);
204201
for h in request_proto.headers {
205202
let pb::HttpHeader { key, value } = h;
206-
request = request.header(key, value);
203+
request_builder = request_builder.header(key, value);
207204
}
208205
// This consumes the body without requiring allocation or cloning the whole content.
209206
let body_bytes = Bytes::from(request_proto.body);
210-
request
207+
request_builder
211208
.body(Body::from(body_bytes))
212209
.with_context(|| "Failed to build request.")
213210
}
@@ -241,6 +238,5 @@ async fn write_response(
241238
send_stream
242239
.write_all(&response_bytes)
243240
.await
244-
.with_context(|| "Failed to write request to stream.")?;
245-
Ok(())
241+
.with_context(|| "Failed to write request to stream.")
246242
}

0 commit comments

Comments
 (0)