Skip to content

Commit

Permalink
Use Instant (monotonic) time instead of SystemTime (#422)
Browse files Browse the repository at this point in the history
This continues on the #367 pull request.

Tarpc should use relative time, especially since RPC serialization no
longer uses SystemTime but Duration.

With this commit, tarpc uses Instant time and on the edges converts
Instant time to SystemTime for the Opentelemetry spans.

Signed-off-by: Mislav Novakovic <mislav.novakovic@gmail.com>
  • Loading branch information
mislavn committed Feb 1, 2024
1 parent d91a9f8 commit 9ed8696
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 51 deletions.
7 changes: 5 additions & 2 deletions tarpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ pub mod stub;

use crate::{
cancellations::{cancellations, CanceledRequests, RequestCancellation},
context, trace, ChannelError, ClientMessage, Request, Response, ServerError, Transport,
context, trace,
util::TimeUntil,
ChannelError, ClientMessage, Request, Response, ServerError, Transport,
};
use futures::{prelude::*, ready, stream::Fuse, task::*};
use in_flight_requests::InFlightRequests;
Expand All @@ -24,6 +26,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::SystemTime,
};
use tokio::sync::{mpsc, oneshot};
use tracing::Span;
Expand Down Expand Up @@ -117,7 +120,7 @@ impl<Req, Resp> Channel<Req, Resp> {
skip(self, ctx, request_name, request),
fields(
rpc.trace_id = tracing::field::Empty,
rpc.deadline = %humantime::format_rfc3339(ctx.deadline),
rpc.deadline = %humantime::format_rfc3339(SystemTime::now() + ctx.deadline.time_until()),
otel.kind = "client",
otel.name = request_name)
)]
Expand Down
28 changes: 13 additions & 15 deletions tarpc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use opentelemetry::trace::TraceContextExt;
use static_assertions::assert_impl_all;
use std::{
convert::TryFrom,
time::{Duration, SystemTime},
time::{Duration, Instant},
};
use tracing_opentelemetry::OpenTelemetrySpanExt;

Expand All @@ -30,7 +30,7 @@ pub struct Context {
#[cfg_attr(feature = "serde1", serde(default = "ten_seconds_from_now"))]
// Serialized as a Duration to prevent clock skew issues.
#[cfg_attr(feature = "serde1", serde(with = "absolute_to_relative_time"))]
pub deadline: SystemTime,
pub deadline: Instant,
/// Uniquely identifies requests originating from the same source.
/// When a service handles a request by making requests itself, those requests should
/// include the same `trace_id` as that included on the original request. This way,
Expand All @@ -41,33 +41,31 @@ pub struct Context {
#[cfg(feature = "serde1")]
mod absolute_to_relative_time {
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub use std::time::{Duration, SystemTime};
pub use std::time::{Duration, Instant};

pub fn serialize<S>(deadline: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
pub fn serialize<S>(deadline: &Instant, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let deadline = deadline
.duration_since(SystemTime::now())
.unwrap_or(Duration::ZERO);
let deadline = deadline.duration_since(Instant::now());
deadline.serialize(serializer)
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
where
D: Deserializer<'de>,
{
let deadline = Duration::deserialize(deserializer)?;
Ok(SystemTime::now() + deadline)
Ok(Instant::now() + deadline)
}

#[cfg(test)]
#[derive(serde::Serialize, serde::Deserialize)]
struct AbsoluteToRelative(#[serde(with = "self")] SystemTime);
struct AbsoluteToRelative(#[serde(with = "self")] Instant);

#[test]
fn test_serialize() {
let now = SystemTime::now();
let now = Instant::now();
let deadline = now + Duration::from_secs(10);
let serialized_deadline = bincode::serialize(&AbsoluteToRelative(deadline)).unwrap();
let deserialized_deadline: Duration = bincode::deserialize(&serialized_deadline).unwrap();
Expand All @@ -82,14 +80,14 @@ mod absolute_to_relative_time {
let AbsoluteToRelative(deserialized_deadline) =
bincode::deserialize(&serialized_deadline).unwrap();
// TODO: how to avoid flakiness?
assert!(deserialized_deadline > SystemTime::now() + Duration::from_secs(9));
assert!(deserialized_deadline > Instant::now() + Duration::from_secs(9));
}
}

assert_impl_all!(Context: Send, Sync);

fn ten_seconds_from_now() -> SystemTime {
SystemTime::now() + Duration::from_secs(10)
fn ten_seconds_from_now() -> Instant {
Instant::now() + Duration::from_secs(10)
}

/// Returns the context for the current request, or a default Context if no request is active.
Expand All @@ -98,7 +96,7 @@ pub fn current() -> Context {
}

#[derive(Clone)]
struct Deadline(SystemTime);
struct Deadline(Instant);

impl Default for Deadline {
fn default() -> Self {
Expand Down
4 changes: 2 additions & 2 deletions tarpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ pub use crate::transport::sealed::Transport;
use anyhow::Context as _;
use futures::task::*;
use std::sync::Arc;
use std::{error::Error, fmt::Display, io, time::SystemTime};
use std::{error::Error, fmt::Display, io, time::Instant};

/// A message from a client to a server.
#[derive(Debug)]
Expand Down Expand Up @@ -360,7 +360,7 @@ impl ServerError {

impl<T> Request<T> {
/// Returns the deadline for this request.
pub fn deadline(&self) -> &SystemTime {
pub fn deadline(&self) -> &Instant {
&self.context.deadline
}
}
Expand Down
18 changes: 11 additions & 7 deletions tarpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
use crate::{
cancellations::{cancellations, CanceledRequests, RequestCancellation},
context::{self, SpanExt},
trace, ChannelError, ClientMessage, Request, Response, ServerError, Transport,
trace,
util::TimeUntil,
ChannelError, ClientMessage, Request, Response, ServerError, Transport,
};
use ::tokio::sync::mpsc;
use futures::{
Expand All @@ -21,7 +23,9 @@ use futures::{
};
use in_flight_requests::{AlreadyExistsError, InFlightRequests};
use pin_project::pin_project;
use std::{convert::TryFrom, error::Error, fmt, marker::PhantomData, pin::Pin, sync::Arc};
use std::{
convert::TryFrom, error::Error, fmt, marker::PhantomData, pin::Pin, sync::Arc, time::SystemTime,
};
use tracing::{info_span, instrument::Instrument, Span};

mod in_flight_requests;
Expand Down Expand Up @@ -348,7 +352,7 @@ where
let span = info_span!(
"RPC",
rpc.trace_id = %request.context.trace_id(),
rpc.deadline = %humantime::format_rfc3339(request.context.deadline),
rpc.deadline = %humantime::format_rfc3339(SystemTime::now() + request.context.deadline.time_until()),
otel.kind = "server",
otel.name = tracing::field::Empty,
);
Expand Down Expand Up @@ -1116,7 +1120,7 @@ mod tests {
io,
pin::Pin,
task::Poll,
time::{Duration, Instant, SystemTime},
time::{Duration, Instant},
};

fn test_channel<Req, Resp>() -> (
Expand Down Expand Up @@ -1186,7 +1190,7 @@ mod tests {

#[tokio::test]
async fn serve_before_mutates_context() -> anyhow::Result<()> {
struct SetDeadline(SystemTime);
struct SetDeadline(Instant);
impl<Req> BeforeRequest<Req> for SetDeadline {
async fn before(
&mut self,
Expand All @@ -1198,8 +1202,8 @@ mod tests {
}
}

let some_time = SystemTime::UNIX_EPOCH + Duration::from_secs(37);
let some_other_time = SystemTime::UNIX_EPOCH + Duration::from_secs(83);
let some_time = Instant::now() + Duration::from_secs(37);
let some_other_time = Instant::now() + Duration::from_secs(83);

let serve = serve(move |ctx: context::Context, i| async move {
assert_eq!(ctx.deadline, some_time);
Expand Down
12 changes: 6 additions & 6 deletions tarpc/src/server/in_flight_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::future::{AbortHandle, AbortRegistration};
use std::{
collections::hash_map,
task::{Context, Poll},
time::SystemTime,
time::Instant,
};
use tokio_util::time::delay_queue::{self, DelayQueue};
use tracing::Span;
Expand Down Expand Up @@ -43,7 +43,7 @@ impl InFlightRequests {
pub fn start_request(
&mut self,
request_id: u64,
deadline: SystemTime,
deadline: Instant,
span: Span,
) -> Result<AbortRegistration, AlreadyExistsError> {
match self.request_data.entry(request_id) {
Expand Down Expand Up @@ -141,7 +141,7 @@ mod tests {
let mut in_flight_requests = InFlightRequests::default();
assert_eq!(in_flight_requests.len(), 0);
in_flight_requests
.start_request(0, SystemTime::now(), Span::current())
.start_request(0, Instant::now(), Span::current())
.unwrap();
assert_eq!(in_flight_requests.len(), 1);
}
Expand All @@ -150,7 +150,7 @@ mod tests {
async fn polling_expired_aborts() {
let mut in_flight_requests = InFlightRequests::default();
let abort_registration = in_flight_requests
.start_request(0, SystemTime::now(), Span::current())
.start_request(0, Instant::now(), Span::current())
.unwrap();
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));

Expand All @@ -172,7 +172,7 @@ mod tests {
async fn cancel_request_aborts() {
let mut in_flight_requests = InFlightRequests::default();
let abort_registration = in_flight_requests
.start_request(0, SystemTime::now(), Span::current())
.start_request(0, Instant::now(), Span::current())
.unwrap();
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));

Expand All @@ -192,7 +192,7 @@ mod tests {
let abort_registration = in_flight_requests
.start_request(
0,
SystemTime::now() + std::time::Duration::from_secs(10),
Instant::now() + std::time::Duration::from_secs(10),
Span::current(),
)
.unwrap();
Expand Down
14 changes: 3 additions & 11 deletions tarpc/src/server/limits/requests_per_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod tests {
use pin_utils::pin_mut;
use std::{
marker::PhantomData,
time::{Duration, SystemTime},
time::{Duration, Instant},
};
use tracing::Span;

Expand All @@ -201,11 +201,7 @@ mod tests {
throttler
.inner
.in_flight_requests
.start_request(
i,
SystemTime::now() + Duration::from_secs(1),
Span::current(),
)
.start_request(i, Instant::now() + Duration::from_secs(1), Span::current())
.unwrap();
}
assert_eq!(throttler.as_mut().in_flight_requests(), 5);
Expand Down Expand Up @@ -324,11 +320,7 @@ mod tests {
throttler
.inner
.in_flight_requests
.start_request(
0,
SystemTime::now() + Duration::from_secs(1),
Span::current(),
)
.start_request(0, Instant::now() + Duration::from_secs(1), Span::current())
.unwrap();
throttler
.as_mut()
Expand Down
4 changes: 2 additions & 2 deletions tarpc/src/server/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
};
use futures::{task::*, Sink, Stream};
use pin_project::pin_project;
use std::{collections::VecDeque, io, pin::Pin, time::SystemTime};
use std::{collections::VecDeque, io, pin::Pin, time::Instant};
use tracing::Span;

#[pin_project]
Expand Down Expand Up @@ -93,7 +93,7 @@ impl<Req, Resp> FakeChannel<io::Result<TrackedRequest<Req>>, Response<Resp>> {
self.stream.push_back(Ok(TrackedRequest {
request: Request {
context: context::Context {
deadline: SystemTime::UNIX_EPOCH,
deadline: Instant::now(),
trace_context: Default::default(),
},
id,
Expand Down
8 changes: 4 additions & 4 deletions tarpc/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@
use std::{
collections::HashMap,
hash::{BuildHasher, Hash},
time::{Duration, SystemTime},
time::{Duration, Instant},
};

#[cfg(feature = "serde1")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde1")))]
pub mod serde;

/// Extension trait for [SystemTimes](SystemTime) in the future, i.e. deadlines.
/// Extension trait for [Instants](Instant) in the future, i.e. deadlines.
pub trait TimeUntil {
/// How much time from now until this time is reached.
fn time_until(&self) -> Duration;
}

impl TimeUntil for SystemTime {
impl TimeUntil for Instant {
fn time_until(&self) -> Duration {
self.duration_since(SystemTime::now()).unwrap_or_default()
self.duration_since(Instant::now())
}
}

Expand Down
4 changes: 2 additions & 2 deletions tarpc/tests/service_functional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::{
future::{join_all, ready},
prelude::*,
};
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant};
use tarpc::{
client::{self},
context,
Expand Down Expand Up @@ -78,7 +78,7 @@ async fn dropped_channel_aborts_in_flight_requests() -> anyhow::Result<()> {
let client = LoopClient::new(client::Config::default(), tx).spawn();

let mut ctx = context::current();
ctx.deadline = SystemTime::now() + Duration::from_secs(60 * 60);
ctx.deadline = Instant::now() + Duration::from_secs(60 * 60);
let _ = client.r#loop(ctx).await;
});

Expand Down

0 comments on commit 9ed8696

Please sign in to comment.