@@ -56,10 +56,13 @@ use quinn::{
56
56
EndpointConfig , RecvStream , SendStream , VarInt ,
57
57
} ;
58
58
use socket2:: { Domain , Protocol , SockAddr , Socket , Type } ;
59
+ use thiserror:: Error ;
59
60
use tokio:: {
60
61
io:: { AsyncRead , AsyncWrite } ,
61
62
runtime:: Handle ,
62
63
select,
64
+ sync:: mpsc:: { channel, Receiver , Sender } ,
65
+ sync:: oneshot,
63
66
task:: JoinSet ,
64
67
} ;
65
68
use tokio_util:: time:: DelayQueue ;
@@ -122,6 +125,8 @@ struct ConnectionManager {
122
125
/// JoinMap that stores active connection handlers keyed by peer id.
123
126
active_connections : JoinMap < NodeId , ( ) > ,
124
127
128
+ /// The channel is used for graceful shutdown of the connection manager.
129
+ shutdown_rx : Receiver < oneshot:: Sender < ( ) > > ,
125
130
/// Endpoint config
126
131
endpoint : Endpoint ,
127
132
transport_config : Arc < quinn:: TransportConfig > ,
@@ -188,6 +193,24 @@ impl std::fmt::Display for ConnectionEstablishError {
188
193
}
189
194
}
190
195
196
+ #[ derive( Error , Debug ) ]
197
+ #[ error( "ShutdownError" ) ]
198
+ pub ( crate ) struct ShutdownError ;
199
+ pub ( crate ) struct ShutdownHandle ( Sender < oneshot:: Sender < ( ) > > ) ;
200
+
201
+ impl ShutdownHandle {
202
+ fn new ( ) -> ( Self , Receiver < oneshot:: Sender < ( ) > > ) {
203
+ let ( shutdown_tx, shutdown_rx) = channel ( 10 ) ;
204
+ ( ShutdownHandle ( shutdown_tx) , shutdown_rx)
205
+ }
206
+
207
+ pub ( crate ) async fn shutdown ( & mut self ) -> Result < ( ) , ShutdownError > {
208
+ let ( wait_tx, wait_rx) = oneshot:: channel ( ) ;
209
+ self . 0 . send ( wait_tx) . await . map_err ( |_| ShutdownError ) ?;
210
+ wait_rx. await . map_err ( |_| ShutdownError )
211
+ }
212
+ }
213
+
191
214
pub ( crate ) fn start_connection_manager (
192
215
log : & ReplicaLogger ,
193
216
metrics_registry : & MetricsRegistry ,
@@ -200,7 +223,7 @@ pub(crate) fn start_connection_manager(
200
223
watcher : tokio:: sync:: watch:: Receiver < SubnetTopology > ,
201
224
socket : Either < SocketAddr , impl AsyncUdpSocket > ,
202
225
router : Router ,
203
- ) {
226
+ ) -> ShutdownHandle {
204
227
let topology = watcher. borrow ( ) . clone ( ) ;
205
228
206
229
let metrics = QuicTransportMetrics :: new ( metrics_registry) ;
@@ -294,6 +317,8 @@ pub(crate) fn start_connection_manager(
294
317
. expect ( "Failed to create endpoint" ) ,
295
318
} ;
296
319
320
+ let ( shutdown_handler, shutdown_rx) = ShutdownHandle :: new ( ) ;
321
+
297
322
let manager = ConnectionManager {
298
323
log : log. clone ( ) ,
299
324
rt : rt. clone ( ) ,
@@ -311,10 +336,12 @@ pub(crate) fn start_connection_manager(
311
336
outbound_connecting : JoinMap :: new ( ) ,
312
337
inbound_connecting : JoinSet :: new ( ) ,
313
338
active_connections : JoinMap :: new ( ) ,
339
+ shutdown_rx,
314
340
router,
315
341
} ;
316
342
317
343
rt. spawn ( manager. run ( ) ) ;
344
+ shutdown_handler
318
345
}
319
346
320
347
impl ConnectionManager {
@@ -323,8 +350,11 @@ impl ConnectionManager {
323
350
}
324
351
325
352
pub async fn run ( mut self ) {
326
- loop {
353
+ let shutdown_notifier = loop {
327
354
select ! {
355
+ shutdown_notifier = self . shutdown_rx. recv( ) => {
356
+ break shutdown_notifier;
357
+ }
328
358
Some ( reconnect) = self . connect_queue. next( ) => {
329
359
self . handle_dial( reconnect. into_inner( ) )
330
360
}
@@ -334,18 +364,23 @@ impl ConnectionManager {
334
364
self . handle_topology_change( ) ;
335
365
} ,
336
366
Err ( _) => {
337
- error!( self . log, "Transport disconnected from peer manager. Shutting down." ) ;
338
- break
367
+ // If this happens it means that peer discovery is not working anymore.
368
+ // There are a few options in this case:
369
+ // 1. continue working with the existing set of connections (preferred)
370
+ // 2. panic
371
+ // 3. do a graceful shutdown and rely on clients of transport to handle this fallout
372
+ // (least preferred because this is not recoverable error)
373
+ error!( self . log, "Transport disconnected from peer manager." ) ;
339
374
}
340
375
}
341
376
} ,
342
377
connecting = self . endpoint. accept( ) => {
343
378
if let Some ( connecting) = connecting {
344
379
self . handle_inbound( connecting) ;
345
380
} else {
346
- info !( self . log, "Quic endpoint closed. Stopping transport." ) ;
347
- // Endpoint is closed. This indicates a shutdown.
348
- break ;
381
+ error !( self . log, "Quic endpoint closed. Stopping transport." ) ;
382
+ // Endpoint is closed. This indicates a non-graceful shutdown.
383
+ break None ;
349
384
}
350
385
} ,
351
386
Some ( conn_res) = self . outbound_connecting. join_next( ) => {
@@ -392,13 +427,23 @@ impl ConnectionManager {
392
427
self . metrics
393
428
. delay_queue_size
394
429
. set ( self . connect_queue . len ( ) as i64 ) ;
395
- }
396
- // This point is reached only in two cases - replica gracefully shutting down or
397
- // bug which makes the peer manager unavailable.
398
- // If the peer manager is unavailable, the replica needs must exist that's why
399
- // the endpoint is closed proactively.
400
- self . endpoint . close ( 0u8 . into ( ) , b"shutting down" ) ;
430
+ } ;
431
+ self . shutdown ( ) . await ;
432
+ // The send can fail iff the shutdown handler was dropped already.
433
+ let _ = shutdown_notifier
434
+ . expect ( "Transport 't stop unexpectedly. This is serious unrecoverable bug. It is better to crash." )
435
+ . send ( ( ) ) ;
436
+ }
401
437
438
+ // TODO: maybe unbind the port so we can start another transport on the same port after shutdown.
439
/// Tears down the connection manager, consuming it.
///
/// The steps run strictly in the order written below: forget all peers,
/// close the QUIC endpoint, drop queued dial attempts, shut down the three
/// task collections, and finally wait for the endpoint to become idle.
///
/// # Panics
///
/// Panics if acquiring the write guard on `peer_map` returns an error
/// (the `unwrap` below).
+ async fn shutdown ( mut self ) {
440
// Remove every entry from the shared peer map.
+ self . peer_map . write ( ) . unwrap ( ) . clear ( ) ;
441
// Close the endpoint with error code 0 and a human-readable reason.
// NOTE(review): per quinn's docs this closes all of the endpoint's
// connections immediately — confirm against the quinn version in use.
+ self . endpoint
442
+ . close ( VarInt :: from_u32 ( 0 ) , b"graceful shutdown of endpoint" ) ;
443
// Discard the entries that `run` would otherwise hand to `handle_dial`.
+ self . connect_queue . clear ( ) ;
444
// `inbound_connecting` is a tokio `JoinSet`; per tokio's docs `shutdown`
// aborts all tasks and waits for them to finish.
+ self . inbound_connecting . shutdown ( ) . await ;
445
// NOTE(review): these two are `JoinMap`s; presumably `shutdown` has the
// same abort-and-wait semantics as on `JoinSet` — confirm.
+ self . outbound_connecting . shutdown ( ) . await ;
446
+ self . active_connections . shutdown ( ) . await ;
402
447
// NOTE(review): per quinn's docs this waits until all connections on the
// endpoint have been cleanly shut down — confirm.
self . endpoint . wait_idle ( ) . await ;
403
448
}
404
449
0 commit comments