Skip to content

Commit e500f91

Browse files
authored
fix: reset send streams when the futures get aborted and improve error reporting on the receive side of the quic transport (#2566)
1 parent 3ad26ca commit e500f91

File tree

4 files changed

+94
-131
lines changed

4 files changed

+94
-131
lines changed

rs/p2p/quic_transport/src/connection_handle.rs

Lines changed: 19 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,47 +2,22 @@
22
//!
33
use std::sync::atomic::{AtomicU64, Ordering};
44

5-
use anyhow::Context;
65
use bytes::Bytes;
76
use http::{Method, Request, Response, Version};
87
use ic_protobuf::transport::v1 as pb;
98
use prost::Message;
10-
use quinn::{Connection, ReadToEndError, SendStream, StoppedError, VarInt};
9+
use quinn::Connection;
1110

1211
use crate::{
1312
metrics::{
14-
observe_conn_error, observe_read_error, observe_write_error, QuicTransportMetrics,
15-
INFALIBBLE,
13+
observe_conn_error, observe_read_to_end_error, observe_stopped_error, observe_write_error,
14+
QuicTransportMetrics, INFALIBBLE,
1615
},
17-
ConnId, MessagePriority, MAX_MESSAGE_SIZE_BYTES,
16+
ConnId, MessagePriority, ResetStreamOnDrop, MAX_MESSAGE_SIZE_BYTES,
1817
};
1918

20-
/// QUIC error code for stream cancellation. See
21-
/// https://datatracker.ietf.org/doc/html/draft-ietf-quic-transport-03#section-12.3.
22-
const QUIC_STREAM_CANCELLED: VarInt = VarInt::from_u32(6);
23-
2419
static CONN_ID_SEQ: AtomicU64 = AtomicU64::new(1);
2520

26-
/// Drop guard to send a [`SendStream::reset`] frame on drop. QUINN sends a [`SendStream::finish`] frame by default when dropping a [`SendStream`],
27-
/// which can lead to the peer receiving the stream thinking a complete message was sent. This guard is used to send a reset frame instead, to signal
28-
/// that the transmission of the message was cancelled.
29-
struct SendStreamDropGuard {
30-
send_stream: SendStream,
31-
}
32-
33-
impl SendStreamDropGuard {
34-
fn new(send_stream: SendStream) -> Self {
35-
Self { send_stream }
36-
}
37-
}
38-
39-
impl Drop for SendStreamDropGuard {
40-
fn drop(&mut self) {
41-
// fails silently if the stream is already closed.
42-
let _ = self.send_stream.reset(QUIC_STREAM_CANCELLED);
43-
}
44-
}
45-
4621
#[derive(Clone, Debug)]
4722
pub struct ConnectionHandle {
4823
conn: Connection,
@@ -77,7 +52,7 @@ impl ConnectionHandle {
7752
/// is delegated solely to the transport layer. This differs from typical client-server architectures,
7853
/// where connections can be managed directly by the caller.
7954
///
80-
/// Note: This method provides the same cancellation safety guarantees as the `quinn::Connection` methods.
55+
/// Note: The method is cancel-safe.
8156
pub async fn rpc(&self, request: Request<Bytes>) -> Result<Response<Bytes>, anyhow::Error> {
8257
let _timer = self
8358
.metrics
@@ -98,7 +73,7 @@ impl ConnectionHandle {
9873
observe_conn_error(err, "open_bi", &self.metrics.connection_handle_errors_total);
9974
})?;
10075

101-
let mut send_stream_guard = SendStreamDropGuard::new(send_stream);
76+
let mut send_stream_guard = ResetStreamOnDrop::new(send_stream);
10277
let send_stream = &mut send_stream_guard.send_stream;
10378

10479
let priority = request
@@ -130,36 +105,18 @@ impl ConnectionHandle {
130105
.inc();
131106
})?;
132107

133-
send_stream.stopped().await.inspect_err(|err| match err {
134-
StoppedError::ConnectionLost(conn_err) => {
135-
observe_conn_error(
136-
conn_err,
137-
"stopped",
138-
&self.metrics.connection_handle_errors_total,
139-
);
140-
}
141-
StoppedError::ZeroRttRejected => {
142-
self.metrics
143-
.connection_handle_errors_total
144-
.with_label_values(&["stopped", INFALIBBLE])
145-
.inc();
146-
}
108+
send_stream.stopped().await.inspect_err(|err| {
109+
observe_stopped_error(err, "stopped", &self.metrics.connection_handle_errors_total)
147110
})?;
148-
149111
let response_bytes = recv_stream
150112
.read_to_end(MAX_MESSAGE_SIZE_BYTES)
151113
.await
152-
.inspect_err(|err| match err {
153-
ReadToEndError::TooLong => self
154-
.metrics
155-
.connection_handle_errors_total
156-
.with_label_values(&["read_to_end", INFALIBBLE])
157-
.inc(),
158-
ReadToEndError::Read(read_err) => observe_read_error(
159-
read_err,
114+
.inspect_err(|err| {
115+
observe_read_to_end_error(
116+
err,
160117
"read_to_end",
161118
&self.metrics.connection_handle_errors_total,
162-
),
119+
)
163120
})?;
164121

165122
let response = to_response(response_bytes)?;
@@ -169,14 +126,10 @@ impl ConnectionHandle {
169126
}
170127
}
171128

129+
// The function returns infallible error.
172130
fn to_response(response_bytes: Vec<u8>) -> Result<Response<Bytes>, anyhow::Error> {
173-
let response_proto = pb::HttpResponse::decode(response_bytes.as_slice())
174-
.with_context(|| "Failed to decode response header.")?;
175-
176-
let status: u16 = response_proto
177-
.status_code
178-
.try_into()
179-
.with_context(|| "Failed to decode status code.")?;
131+
let response_proto = pb::HttpResponse::decode(response_bytes.as_slice())?;
132+
let status: u16 = response_proto.status_code.try_into()?;
180133

181134
let mut response = Response::builder().status(status).version(Version::HTTP_3);
182135
for h in response_proto.headers {
@@ -185,9 +138,7 @@ fn to_response(response_bytes: Vec<u8>) -> Result<Response<Bytes>, anyhow::Error
185138
}
186139
// This consumes the body without requiring allocation or cloning the whole content.
187140
let body_bytes = Bytes::from(response_proto.body);
188-
response
189-
.body(body_bytes)
190-
.with_context(|| "Failed to build response.")
141+
Ok(response.body(body_bytes)?)
191142
}
192143

193144
fn into_request_bytes(request: Request<Bytes>) -> Vec<u8> {
@@ -242,11 +193,11 @@ mod tests {
242193
use tokio::sync::Barrier;
243194
use turmoil::Builder;
244195

245-
use crate::connection_handle::SendStreamDropGuard;
196+
use crate::ResetStreamOnDrop;
246197

247198
const MAX_READ_SIZE: usize = 10_000;
248199

249-
/// Test that [`SendStreamDropGuard`] sends a reset frame on drop. Also tests that
200+
/// Test that [`ResetStreamOnDrop`] sends a reset frame on drop. Also tests that
250201
/// the receiver will receive the message if the stream is finished and stopped,
251202
/// before dropping the guard.
252203
#[rstest]
@@ -341,7 +292,7 @@ mod tests {
341292
.unwrap();
342293

343294
let (send_stream, _recv_stream) = connection.open_bi().await.unwrap();
344-
let mut drop_guard = SendStreamDropGuard::new(send_stream);
295+
let mut drop_guard = ResetStreamOnDrop::new(send_stream);
345296
let send_stream = &mut drop_guard.send_stream;
346297
send_stream
347298
.write_chunk(Bytes::from(&b"hello wo"[..]))

rs/p2p/quic_transport/src/lib.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use ic_interfaces_registry::RegistryClient;
4949
use ic_logger::{info, ReplicaLogger};
5050
use ic_metrics::MetricsRegistry;
5151
use phantom_newtype::AmountOf;
52-
use quinn::AsyncUdpSocket;
52+
use quinn::{AsyncUdpSocket, SendStream, VarInt};
5353
use tokio::sync::watch;
5454
use tokio::task::{JoinError, JoinHandle};
5555
use tokio_util::{sync::CancellationToken, task::task_tracker::TaskTracker};
@@ -180,6 +180,30 @@ impl QuicTransport {
180180
}
181181
}
182182

183+
/// QUIC error code for stream cancellation. See
184+
/// https://datatracker.ietf.org/doc/html/draft-ietf-quic-transport-03#section-12.3.
185+
const QUIC_STREAM_CANCELLED: VarInt = VarInt::from_u32(6);
186+
187+
/// Drop guard to send a [`SendStream::reset`] frame on drop. QUINN sends a [`SendStream::finish`] frame by default when dropping a [`SendStream`],
188+
/// which can lead to the peer receiving the stream thinking a complete message was sent. This guard is used to send a reset frame instead, to signal
189+
/// that the transmission of the message was cancelled.
190+
struct ResetStreamOnDrop {
191+
send_stream: SendStream,
192+
}
193+
194+
impl ResetStreamOnDrop {
195+
fn new(send_stream: SendStream) -> Self {
196+
Self { send_stream }
197+
}
198+
}
199+
200+
impl Drop for ResetStreamOnDrop {
201+
fn drop(&mut self) {
202+
// fails silently if the stream is already closed.
203+
let _ = self.send_stream.reset(QUIC_STREAM_CANCELLED);
204+
}
205+
}
206+
183207
#[async_trait]
184208
impl Transport for QuicTransport {
185209
#[instrument(skip(self, request))]

rs/p2p/quic_transport/src/metrics.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use ic_metrics::{
33
buckets::decimal_buckets, tokio_metrics_collector::TokioTaskMetricsCollector, MetricsRegistry,
44
};
55
use prometheus::{GaugeVec, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec};
6-
use quinn::{Connection, ConnectionError, ReadError, WriteError};
6+
use quinn::{Connection, ConnectionError, ReadError, ReadToEndError, StoppedError, WriteError};
77
use tokio_metrics::TaskMonitor;
88

99
const CONNECTION_RESULT_LABEL: &str = "status";
@@ -15,13 +15,8 @@ const ERROR_TYPE_LABEL: &str = "error";
1515
const REQUEST_TYPE_LABEL: &str = "request";
1616
pub(crate) const CONNECTION_RESULT_SUCCESS_LABEL: &str = "success";
1717
pub(crate) const CONNECTION_RESULT_FAILED_LABEL: &str = "failed";
18-
pub(crate) const ERROR_TYPE_ACCEPT: &str = "accept";
1918
pub(crate) const ERROR_TYPE_APP: &str = "app";
20-
pub(crate) const ERROR_TYPE_FINISH: &str = "finish";
21-
pub(crate) const ERROR_TYPE_STOPPED: &str = "stopped";
22-
pub(crate) const ERROR_TYPE_READ: &str = "read";
2319
pub(crate) const INFALIBBLE: &str = "infallible";
24-
pub(crate) const ERROR_TYPE_WRITE: &str = "write";
2520
const ERROR_CLOSED_STREAM: &str = "closed_stream";
2621
const ERROR_RESET_STREAM: &str = "reset_stream";
2722
const ERROR_STOPPED_STREAM: &str = "stopped_stream";
@@ -247,3 +242,17 @@ pub fn observe_read_error(err: &ReadError, op: &str, counter: &IntCounterVec) {
247242
}
248243
}
249244
}
245+
246+
pub fn observe_stopped_error(err: &StoppedError, op: &str, counter: &IntCounterVec) {
247+
match err {
248+
StoppedError::ConnectionLost(conn_err) => observe_conn_error(conn_err, op, counter),
249+
StoppedError::ZeroRttRejected => counter.with_label_values(&[op, INFALIBBLE]).inc(),
250+
}
251+
}
252+
253+
pub fn observe_read_to_end_error(err: &ReadToEndError, op: &str, counter: &IntCounterVec) {
254+
match err {
255+
ReadToEndError::TooLong => counter.with_label_values(&[op, INFALIBBLE]).inc(),
256+
ReadToEndError::Read(read_err) => observe_read_error(read_err, op, counter),
257+
}
258+
}

rs/p2p/quic_transport/src/request_handler.rs

Lines changed: 35 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ use tracing::instrument;
2727
use crate::{
2828
connection_handle::ConnectionHandle,
2929
metrics::{
30-
QuicTransportMetrics, ERROR_TYPE_ACCEPT, ERROR_TYPE_APP, ERROR_TYPE_FINISH,
31-
ERROR_TYPE_READ, ERROR_TYPE_STOPPED, ERROR_TYPE_WRITE, STREAM_TYPE_BIDI,
30+
observe_conn_error, observe_read_to_end_error, observe_stopped_error, observe_write_error,
31+
QuicTransportMetrics, ERROR_TYPE_APP, INFALIBBLE, STREAM_TYPE_BIDI,
3232
},
33-
ConnId, MAX_MESSAGE_SIZE_BYTES,
33+
ConnId, ResetStreamOnDrop, MAX_MESSAGE_SIZE_BYTES,
3434
};
3535

3636
const QUIC_METRIC_SCRAPE_INTERVAL: Duration = Duration::from_secs(5);
@@ -70,15 +70,9 @@ pub async fn run_stream_acceptor(
7070
)
7171
);
7272
}
73-
Err(e) => {
74-
info!(log, "Error accepting bi stream {}", e.to_string());
75-
metrics
76-
.request_handle_errors_total
77-
.with_label_values(&[
78-
STREAM_TYPE_BIDI,
79-
ERROR_TYPE_ACCEPT,
80-
])
81-
.inc();
73+
Err(err) => {
74+
info!(log, "Error accepting bi stream {}", err.to_string());
75+
observe_conn_error(&err, "accept_bi", &metrics.request_handle_errors_total);
8276
break;
8377
}
8478
}
@@ -105,26 +99,30 @@ pub async fn run_stream_acceptor(
10599
inflight_requests.shutdown().await;
106100
}
107101

108-
#[instrument(skip(metrics, router, bi_tx, bi_rx))]
102+
#[instrument(skip(metrics, router, send_stream, recv_stream))]
103+
/// Note: The method is cancel-safe.
109104
async fn handle_bi_stream(
110105
peer_id: NodeId,
111106
conn_id: ConnId,
112107
metrics: QuicTransportMetrics,
113108
router: Router,
114-
mut bi_tx: SendStream,
115-
bi_rx: RecvStream,
109+
send_stream: SendStream,
110+
mut recv_stream: RecvStream,
116111
) -> Result<(), anyhow::Error> {
117-
let mut request = read_request(bi_rx).await.inspect_err(|_| {
118-
metrics
119-
.request_handle_errors_total
120-
.with_label_values(&[STREAM_TYPE_BIDI, ERROR_TYPE_READ])
121-
.inc();
122-
})?;
112+
let mut send_stream_guard = ResetStreamOnDrop::new(send_stream);
113+
let send_stream = &mut send_stream_guard.send_stream;
114+
let request_bytes = recv_stream
115+
.read_to_end(MAX_MESSAGE_SIZE_BYTES)
116+
.await
117+
.inspect_err(|err| {
118+
observe_read_to_end_error(err, "read_to_end", &metrics.request_handle_errors_total)
119+
})?;
120+
let mut request = to_request(request_bytes)?;
123121
request.extensions_mut().insert::<NodeId>(peer_id);
124122
request.extensions_mut().insert::<ConnId>(conn_id);
125123

126124
let svc = router.oneshot(request);
127-
let stopped = bi_tx.stopped();
125+
let stopped = send_stream.stopped();
128126
let response = tokio::select! {
129127
response = svc => response.expect("Infallible"),
130128
stopped_res = stopped => {
@@ -143,40 +141,29 @@ async fn handle_bi_stream(
143141
// We can ignore the errors because if both peers follow the protocol, errors will only occur
144142
// if the other peer has closed the connection. In this case `accept_bi` in the peer event
145143
// loop will close this connection.
146-
write_response(&mut bi_tx, response)
144+
let response_bytes = to_response_bytes(response).await?;
145+
send_stream
146+
.write_all(&response_bytes)
147147
.await
148-
.inspect_err(|_| {
149-
metrics
150-
.request_handle_errors_total
151-
.with_label_values(&[STREAM_TYPE_BIDI, ERROR_TYPE_WRITE])
152-
.inc();
148+
.inspect_err(|err| {
149+
observe_write_error(err, "write_all", &metrics.request_handle_errors_total);
153150
})?;
154-
bi_tx.finish().inspect_err(|_| {
151+
send_stream.finish().inspect_err(|_| {
155152
metrics
156153
.request_handle_errors_total
157-
.with_label_values(&[STREAM_TYPE_BIDI, ERROR_TYPE_FINISH])
154+
.with_label_values(&["finish", INFALIBBLE])
158155
.inc();
159156
})?;
160-
bi_tx.stopped().await.inspect_err(|_| {
161-
metrics
162-
.request_handle_errors_total
163-
.with_label_values(&[STREAM_TYPE_BIDI, ERROR_TYPE_STOPPED])
164-
.inc();
157+
send_stream.stopped().await.inspect_err(|err| {
158+
observe_stopped_error(err, "stopped", &metrics.request_handle_errors_total);
165159
})?;
166160
Ok(())
167161
}
168162

169-
async fn read_request(mut recv_stream: RecvStream) -> Result<Request<Body>, anyhow::Error> {
170-
let raw_msg = recv_stream
171-
.read_to_end(MAX_MESSAGE_SIZE_BYTES)
172-
.await
173-
.with_context(|| "Failed to read request from the stream.")?;
174-
175-
let request_proto = pb::HttpRequest::decode(raw_msg.as_slice())
176-
.with_context(|| "Failed to decode http request.")?;
177-
178-
let pb_http_method = pb::HttpMethod::try_from(request_proto.method)
179-
.with_context(|| "Failed to decode http method.")?;
163+
// The function returns infallible error.
164+
fn to_request(request_bytes: Vec<u8>) -> Result<Request<Body>, anyhow::Error> {
165+
let request_proto = pb::HttpRequest::decode(request_bytes.as_slice())?;
166+
let pb_http_method = pb::HttpMethod::try_from(request_proto.method)?;
180167
let http_method = match pb_http_method {
181168
pb::HttpMethod::Get => Some(Method::GET),
182169
pb::HttpMethod::Post => Some(Method::POST),
@@ -207,10 +194,7 @@ async fn read_request(mut recv_stream: RecvStream) -> Result<Request<Body>, anyh
207194
.with_context(|| "Failed to build request.")
208195
}
209196

210-
async fn write_response(
211-
send_stream: &mut SendStream,
212-
response: Response<Body>,
213-
) -> Result<(), anyhow::Error> {
197+
async fn to_response_bytes(response: Response<Body>) -> Result<Vec<u8>, anyhow::Error> {
214198
let (parts, body) = response.into_parts();
215199
// Check for axum error in body
216200
// TODO: Think about this. What is the error that can happen here?
@@ -231,10 +215,5 @@ async fn write_response(
231215
.collect(),
232216
body: body.into(),
233217
};
234-
235-
let response_bytes = response_proto.encode_to_vec();
236-
send_stream
237-
.write_all(&response_bytes)
238-
.await
239-
.with_context(|| "Failed to write response to stream.")
218+
Ok(response_proto.encode_to_vec())
240219
}

0 commit comments

Comments
 (0)