diff --git a/benches/support/tokiort.rs b/benches/support/tokiort.rs index 8ab7aa9502..24e9557830 100644 --- a/benches/support/tokiort.rs +++ b/benches/support/tokiort.rs @@ -42,6 +42,10 @@ impl Timer for TokioTimer { }) } + fn now(&self) -> Instant { + tokio::time::Instant::now().into() + } + fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { sleep.reset(new_deadline) diff --git a/src/common/time.rs b/src/common/time.rs index 8055cb364b..ad88cb3cde 100644 --- a/src/common/time.rs +++ b/src/common/time.rs @@ -50,6 +50,13 @@ impl Time { } } + pub(crate) fn now(&self) -> Instant { + match *self { + Time::Empty => Instant::now(), + Time::Timer(ref t) => t.now(), + } + } + pub(crate) fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { match *self { Time::Empty => { diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 7c168e005e..3d71ed5bc5 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -6,7 +6,7 @@ use std::marker::{PhantomData, Unpin}; use std::pin::Pin; use std::task::{Context, Poll}; #[cfg(feature = "server")] -use std::time::{Duration, Instant}; +use std::time::Duration; use crate::rt::{Read, Write}; use bytes::{Buf, Bytes}; @@ -218,7 +218,7 @@ where #[cfg(feature = "server")] if !self.state.h1_header_read_timeout_running { if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout { - let deadline = Instant::now() + h1_header_read_timeout; + let deadline = self.state.timer.now() + h1_header_read_timeout; self.state.h1_header_read_timeout_running = true; match self.state.h1_header_read_timeout_fut { Some(ref mut h1_header_read_timeout_fut) => { diff --git a/src/proto/h2/ping.rs b/src/proto/h2/ping.rs index b5ebd08bda..4952e38518 100644 --- a/src/proto/h2/ping.rs +++ b/src/proto/h2/ping.rs @@ -37,7 +37,7 @@ pub(super) fn disabled() -> Recorder { Recorder { shared: None } } -pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) { +pub(super) fn channel(ping_pong: PingPong, config: Config, timer: Time) -> (Recorder, Ponger) { debug_assert!( config.is_enabled(), "ping channel requires bdp or keep-alive config", @@ -51,8 +51,10 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re stable_count: 0, }); + let now = timer.now(); + let (bytes, next_bdp_at) = if bdp.is_some() { - (Some(0), Some(Instant::now())) + (Some(0), Some(now)) } else { (None, None) }; @@ -61,12 +63,12 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re interval, timeout: config.keep_alive_timeout, while_idle: config.keep_alive_while_idle, - sleep: __timer.sleep(interval), + sleep: timer.sleep(interval), state: KeepAliveState::Init, - timer: __timer, + timer: timer.clone(), }); - let last_read_at = keep_alive.as_ref().map(|_| Instant::now()); + let last_read_at = keep_alive.as_ref().map(|_| now); let shared = Arc::new(Mutex::new(Shared { bytes, @@ -75,6 +77,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re ping_pong, ping_sent_at: None, next_bdp_at, + timer, })); ( @@ -130,6 +133,7 @@ struct Shared { last_read_at: Option, is_keep_alive_timed_out: bool, + timer: Time, } struct Bdp { @@ -200,7 +204,7 @@ impl Recorder { // if not, we don't need to record bytes either if let Some(ref next_bdp_at) = locked.next_bdp_at { - if Instant::now() < *next_bdp_at { + if locked.timer.now() < *next_bdp_at { return; } else { locked.next_bdp_at = None; @@ -259,8 +263,8 @@ impl Recorder { impl Ponger { pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll { - let now = Instant::now(); let mut locked = self.shared.lock().unwrap(); + let now = locked.timer.now(); // hoping this is fine to move within the lock let is_idle = self.is_idle(); if let Some(ref mut ka) = self.keep_alive { @@ -329,7 +333,7 @@ impl Shared { fn send_ping(&mut self) { match self.ping_pong.send_ping(Ping::opaque()) { Ok(()) => { - self.ping_sent_at = Some(Instant::now()); + self.ping_sent_at = Some(self.timer.now()); trace!("sent ping"); } Err(_err) => { @@ -344,7 +348,7 @@ impl Shared { fn update_last_read_at(&mut self) { if self.last_read_at.is_some() { - self.last_read_at = Some(Instant::now()); + self.last_read_at = Some(self.timer.now()); } } @@ -468,7 +472,7 @@ impl KeepAlive { trace!("keep-alive interval ({:?}) reached", self.interval); shared.send_ping(); self.state = KeepAliveState::PingSent; - let timeout = Instant::now() + self.timeout; + let timeout = self.timer.now() + self.timeout; self.timer.reset(&mut self.sleep, timeout); } KeepAliveState::Init | KeepAliveState::PingSent => (), diff --git a/src/rt/timer.rs b/src/rt/timer.rs index c6a6f1dbc0..b54fde5426 100644 --- a/src/rt/timer.rs +++ b/src/rt/timer.rs @@ -74,6 +74,16 @@ pub trait Timer { /// Return a future that resolves at `deadline`. fn sleep_until(&self, deadline: Instant) -> Pin>; + /// Return an `Instant` representing the current time + /// + /// This function is currently unstable to implement - its + /// definition might change in a future version of Hyper + /// + /// FIXME: do we want to mention this function in the module doc comment? + fn now(&self) -> Instant { + Instant::now() + } + /// Reset a future to resolve at `new_deadline` instead. fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { *sleep = self.sleep_until(new_deadline);