Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions benches/support/tokiort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ impl Timer for TokioTimer {
})
}

fn now(&self) -> Instant {
tokio::time::Instant::now().into()
}

fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
sleep.reset(new_deadline)
Expand Down
7 changes: 7 additions & 0 deletions src/common/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ impl Time {
}
}

pub(crate) fn now(&self) -> Instant {
match *self {
Time::Empty => Instant::now(),
Time::Timer(ref t) => t.now(),
}
}

pub(crate) fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
match *self {
Time::Empty => {
Expand Down
4 changes: 2 additions & 2 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::marker::{PhantomData, Unpin};
use std::pin::Pin;
use std::task::{Context, Poll};
#[cfg(feature = "server")]
use std::time::{Duration, Instant};
use std::time::Duration;

use crate::rt::{Read, Write};
use bytes::{Buf, Bytes};
Expand Down Expand Up @@ -218,7 +218,7 @@ where
#[cfg(feature = "server")]
if !self.state.h1_header_read_timeout_running {
if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
let deadline = Instant::now() + h1_header_read_timeout;
let deadline = self.state.timer.now() + h1_header_read_timeout;
self.state.h1_header_read_timeout_running = true;
match self.state.h1_header_read_timeout_fut {
Some(ref mut h1_header_read_timeout_fut) => {
Expand Down
24 changes: 14 additions & 10 deletions src/proto/h2/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub(super) fn disabled() -> Recorder {
Recorder { shared: None }
}

pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) {
pub(super) fn channel(ping_pong: PingPong, config: Config, timer: Time) -> (Recorder, Ponger) {
debug_assert!(
config.is_enabled(),
"ping channel requires bdp or keep-alive config",
Expand All @@ -51,8 +51,10 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re
stable_count: 0,
});

let now = timer.now();

let (bytes, next_bdp_at) = if bdp.is_some() {
(Some(0), Some(Instant::now()))
(Some(0), Some(now))
} else {
(None, None)
};
Expand All @@ -61,12 +63,12 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re
interval,
timeout: config.keep_alive_timeout,
while_idle: config.keep_alive_while_idle,
sleep: __timer.sleep(interval),
sleep: timer.sleep(interval),
state: KeepAliveState::Init,
timer: __timer,
timer: timer.clone(),
});

let last_read_at = keep_alive.as_ref().map(|_| Instant::now());
let last_read_at = keep_alive.as_ref().map(|_| now);

let shared = Arc::new(Mutex::new(Shared {
bytes,
Expand All @@ -75,6 +77,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re
ping_pong,
ping_sent_at: None,
next_bdp_at,
timer,
}));

(
Expand Down Expand Up @@ -130,6 +133,7 @@ struct Shared {
last_read_at: Option<Instant>,

is_keep_alive_timed_out: bool,
timer: Time,
}

struct Bdp {
Expand Down Expand Up @@ -200,7 +204,7 @@ impl Recorder {
// if not, we don't need to record bytes either

if let Some(ref next_bdp_at) = locked.next_bdp_at {
if Instant::now() < *next_bdp_at {
if locked.timer.now() < *next_bdp_at {
return;
} else {
locked.next_bdp_at = None;
Expand Down Expand Up @@ -259,8 +263,8 @@ impl Recorder {

impl Ponger {
pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
let now = Instant::now();
let mut locked = self.shared.lock().unwrap();
let now = locked.timer.now(); // hoping this is fine to move within the lock
let is_idle = self.is_idle();

if let Some(ref mut ka) = self.keep_alive {
Expand Down Expand Up @@ -329,7 +333,7 @@ impl Shared {
fn send_ping(&mut self) {
match self.ping_pong.send_ping(Ping::opaque()) {
Ok(()) => {
self.ping_sent_at = Some(Instant::now());
self.ping_sent_at = Some(self.timer.now());
trace!("sent ping");
}
Err(_err) => {
Expand All @@ -344,7 +348,7 @@ impl Shared {

fn update_last_read_at(&mut self) {
if self.last_read_at.is_some() {
self.last_read_at = Some(Instant::now());
self.last_read_at = Some(self.timer.now());
}
}

Expand Down Expand Up @@ -468,7 +472,7 @@ impl KeepAlive {
trace!("keep-alive interval ({:?}) reached", self.interval);
shared.send_ping();
self.state = KeepAliveState::PingSent;
let timeout = Instant::now() + self.timeout;
let timeout = self.timer.now() + self.timeout;
self.timer.reset(&mut self.sleep, timeout);
}
KeepAliveState::Init | KeepAliveState::PingSent => (),
Expand Down
10 changes: 10 additions & 0 deletions src/rt/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ pub trait Timer {
/// Return a future that resolves at `deadline`.
fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>>;

/// Return an `Instant` representing the current time
///
/// This function is currently unstable to implement - its
/// definition might change in a future version of Hyper
///
/// FIXME: do we want to mention this function in the module doc comment?
fn now(&self) -> Instant {
Instant::now()
}

/// Reset a future to resolve at `new_deadline` instead.
fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
*sleep = self.sleep_until(new_deadline);
Expand Down