Skip to content

Commit

Permalink
refactor(lib): Switch from pin-project to pin-project-lite (#2566)
Browse files Browse the repository at this point in the history
Note the practical affects of this change:

- Dependency count with --features full dropped from 65 to 55.
- Time to compile after a clean dropped from 48s to 35s (on a pretty underpowered VM).

Closes #2388
  • Loading branch information
jplatte committed Jun 4, 2021
1 parent 0d82405 commit 6a6a240
Show file tree
Hide file tree
Showing 15 changed files with 416 additions and 281 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -34,7 +34,7 @@ httparse = "1.4"
h2 = { version = "0.3.3", optional = true }
itoa = "0.4.1"
tracing = { version = "0.1", default-features = false, features = ["std"] }
pin-project = "1.0"
pin-project-lite = "0.2.4"
tower-service = "0.3"
tokio = { version = "1", features = ["sync"] }
want = "0.3"
Expand Down
74 changes: 52 additions & 22 deletions src/client/conn.rs
Expand Up @@ -48,7 +48,7 @@

use std::error::Error as StdError;
use std::fmt;
#[cfg(feature = "http2")]
#[cfg(not(all(feature = "http1", feature = "http2")))]
use std::marker::PhantomData;
use std::sync::Arc;
#[cfg(all(feature = "runtime", feature = "http2"))]
Expand All @@ -57,12 +57,14 @@ use std::time::Duration;
use bytes::Bytes;
use futures_util::future::{self, Either, FutureExt as _};
use httparse::ParserConfig;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::Service;

use super::dispatch;
use crate::body::HttpBody;
#[cfg(not(all(feature = "http1", feature = "http2")))]
use crate::common::Never;
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
Expand All @@ -74,17 +76,33 @@ use crate::upgrade::Upgraded;
use crate::{Body, Request, Response};

#[cfg(feature = "http1")]
type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, R>;
type Http1Dispatcher<T, B> =
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;

#[pin_project(project = ProtoClientProj)]
enum ProtoClient<T, B>
where
B: HttpBody,
{
#[cfg(feature = "http1")]
H1(#[pin] Http1Dispatcher<T, B, proto::h1::ClientTransaction>),
#[cfg(feature = "http2")]
H2(#[pin] proto::h2::ClientTask<B>, PhantomData<fn(T)>),
#[cfg(not(feature = "http1"))]
type Http1Dispatcher<T, B> = (Never, PhantomData<(T, Pin<Box<B>>)>);

#[cfg(feature = "http2")]
type Http2ClientTask<B> = proto::h2::ClientTask<B>;

#[cfg(not(feature = "http2"))]
type Http2ClientTask<B> = (Never, PhantomData<Pin<Box<B>>>);

pin_project! {
#[project = ProtoClientProj]
enum ProtoClient<T, B>
where
B: HttpBody,
{
H1 {
#[pin]
h1: Http1Dispatcher<T, B>,
},
H2 {
#[pin]
h2: Http2ClientTask<B>,
},
}
}

/// Returns a handshake future over some IO.
Expand Down Expand Up @@ -405,18 +423,20 @@ where
pub fn into_parts(self) -> Parts<T> {
match self.inner.expect("already upgraded") {
#[cfg(feature = "http1")]
ProtoClient::H1(h1) => {
ProtoClient::H1 { h1 } => {
let (io, read_buf, _) = h1.into_inner();
Parts {
io,
read_buf,
_inner: (),
}
}
#[cfg(feature = "http2")]
ProtoClient::H2(..) => {
ProtoClient::H2 { .. } => {
panic!("http2 cannot into_inner");
}

#[cfg(not(feature = "http1"))]
ProtoClient::H1 { h1 } => match h1.0 {},
}
}

Expand All @@ -434,9 +454,14 @@ where
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
match *self.inner.as_mut().expect("already upgraded") {
#[cfg(feature = "http1")]
ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx),
ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
#[cfg(feature = "http2")]
ProtoClient::H2(ref mut h2, _) => Pin::new(h2).poll(cx).map_ok(|_| ()),
ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()),

#[cfg(not(feature = "http1"))]
ProtoClient::H1 { ref mut h1 } => match h1.0 {},
#[cfg(not(feature = "http2"))]
ProtoClient::H2 { ref mut h2, .. } => match h2.0 {},
}
}

Expand Down Expand Up @@ -465,7 +490,7 @@ where
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
#[cfg(feature = "http1")]
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
Some(ProtoClient::H1(h1)) => {
Some(ProtoClient::H1 { h1 }) => {
let (io, buf, _) = h1.into_inner();
pending.fulfill(Upgraded::new(io, buf));
Poll::Ready(Ok(()))
Expand Down Expand Up @@ -756,14 +781,14 @@ impl Builder {
}
let cd = proto::h1::dispatch::Client::new(rx);
let dispatch = proto::h1::Dispatcher::new(cd, conn);
ProtoClient::H1(dispatch)
ProtoClient::H1 { h1: dispatch }
}
#[cfg(feature = "http2")]
Proto::Http2 => {
let h2 =
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
.await?;
ProtoClient::H2(h2, PhantomData)
ProtoClient::H2 { h2 }
}
};

Expand Down Expand Up @@ -817,9 +842,14 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.project() {
#[cfg(feature = "http1")]
ProtoClientProj::H1(c) => c.poll(cx),
ProtoClientProj::H1 { h1 } => h1.poll(cx),
#[cfg(feature = "http2")]
ProtoClientProj::H2(c, _) => c.poll(cx),
ProtoClientProj::H2 { h2, .. } => h2.poll(cx),

#[cfg(not(feature = "http1"))]
ProtoClientProj::H1 { h1 } => match h1.0 {},
#[cfg(not(feature = "http2"))]
ProtoClientProj::H2 { h2, .. } => match h2.0 {},
}
}
}
Expand Down
27 changes: 14 additions & 13 deletions src/client/connect/http.rs
Expand Up @@ -11,7 +11,7 @@ use std::time::Duration;

use futures_util::future::Either;
use http::uri::{Scheme, Uri};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use tokio::net::{TcpSocket, TcpStream};
use tokio::time::Sleep;

Expand Down Expand Up @@ -373,18 +373,19 @@ impl HttpInfo {
}
}

// Not publicly exported (so missing_docs doesn't trigger).
//
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
// (and thus we can change the type in the future).
#[must_use = "futures do nothing unless polled"]
#[pin_project]
#[allow(missing_debug_implementations)]
pub struct HttpConnecting<R> {
#[pin]
fut: BoxConnecting,
_marker: PhantomData<R>,
pin_project! {
// Not publicly exported (so missing_docs doesn't trigger).
//
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
// (and thus we can change the type in the future).
#[must_use = "futures do nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct HttpConnecting<R> {
#[pin]
fut: BoxConnecting,
_marker: PhantomData<R>,
}
}

type ConnectResult = Result<TcpStream, ConnectError>;
Expand Down
25 changes: 13 additions & 12 deletions src/client/pool.rs
Expand Up @@ -11,7 +11,7 @@ use futures_channel::oneshot;
use tokio::time::{Duration, Instant, Interval};

use super::client::Ver;
use crate::common::{task, exec::Exec, Future, Pin, Poll, Unpin};
use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin};

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
Expand Down Expand Up @@ -714,16 +714,17 @@ impl Expiration {
}

#[cfg(feature = "runtime")]
#[pin_project::pin_project]
struct IdleTask<T> {
#[pin]
interval: Interval,
pool: WeakOpt<Mutex<PoolInner<T>>>,
// This allows the IdleTask to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped.
#[pin]
pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
pin_project_lite::pin_project! {
struct IdleTask<T> {
#[pin]
interval: Interval,
pool: WeakOpt<Mutex<PoolInner<T>>>,
// This allows the IdleTask to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped.
#[pin]
pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
}
}

#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -776,7 +777,7 @@ mod tests {
use std::time::Duration;

use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
use crate::common::{task, exec::Exec, Future, Pin};
use crate::common::{exec::Exec, task, Future, Pin};

/// Test unique reservations.
#[derive(Debug, PartialEq, Eq)]
Expand Down
19 changes: 10 additions & 9 deletions src/common/drain.rs
@@ -1,6 +1,6 @@
use std::mem;

use pin_project::pin_project;
use pin_project_lite::pin_project;
use tokio::sync::watch;

use super::{task, Future, Pin, Poll};
Expand All @@ -21,14 +21,15 @@ pub(crate) struct Watch {
rx: watch::Receiver<()>,
}

#[allow(missing_debug_implementations)]
#[pin_project]
pub struct Watching<F, FN> {
#[pin]
future: F,
state: State<FN>,
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
_rx: watch::Receiver<()>,
pin_project! {
#[allow(missing_debug_implementations)]
pub struct Watching<F, FN> {
#[pin]
future: F,
state: State<FN>,
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
_rx: watch::Receiver<()>,
}
}

enum State<F> {
Expand Down
44 changes: 24 additions & 20 deletions src/common/lazy.rs
@@ -1,4 +1,4 @@
use pin_project::pin_project;
use pin_project_lite::pin_project;

use super::{task, Future, Pin, Poll};

Expand All @@ -12,23 +12,27 @@ where
R: Future + Unpin,
{
Lazy {
inner: Inner::Init(func),
inner: Inner::Init { func },
}
}

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
#[pin_project]
pub(crate) struct Lazy<F, R> {
#[pin]
inner: Inner<F, R>,
pin_project! {
#[allow(missing_debug_implementations)]
pub(crate) struct Lazy<F, R> {
#[pin]
inner: Inner<F, R>,
}
}

#[pin_project(project = InnerProj, project_replace = InnerProjReplace)]
enum Inner<F, R> {
Init(F),
Fut(#[pin] R),
Empty,
pin_project! {
#[project = InnerProj]
#[project_replace = InnerProjReplace]
enum Inner<F, R> {
Init { func: F },
Fut { #[pin] fut: R },
Empty,
}
}

impl<F, R> Started for Lazy<F, R>
Expand All @@ -38,8 +42,8 @@ where
{
fn started(&self) -> bool {
match self.inner {
Inner::Init(_) => false,
Inner::Fut(_) | Inner::Empty => true,
Inner::Init { .. } => false,
Inner::Fut { .. } | Inner::Empty => true,
}
}
}
Expand All @@ -54,15 +58,15 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

if let InnerProj::Fut(f) = this.inner.as_mut().project() {
return f.poll(cx);
if let InnerProj::Fut { fut } = this.inner.as_mut().project() {
return fut.poll(cx);
}

match this.inner.as_mut().project_replace(Inner::Empty) {
InnerProjReplace::Init(func) => {
this.inner.set(Inner::Fut(func()));
if let InnerProj::Fut(f) = this.inner.project() {
return f.poll(cx);
InnerProjReplace::Init { func } => {
this.inner.set(Inner::Fut { fut: func() });
if let InnerProj::Fut { fut } = this.inner.project() {
return fut.poll(cx);
}
unreachable!()
}
Expand Down
11 changes: 7 additions & 4 deletions src/proto/h1/dispatch.rs
Expand Up @@ -44,10 +44,13 @@ cfg_server! {
}

cfg_client! {
pub(crate) struct Client<B> {
callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
rx: ClientRx<B>,
rx_closed: bool,
pin_project_lite::pin_project! {
pub(crate) struct Client<B> {
callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
#[pin]
rx: ClientRx<B>,
rx_closed: bool,
}
}

type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
Expand Down

1 comment on commit 6a6a240

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'end_to_end'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 6a6a240 Previous: 0d82405 Ratio
http2_parallel_x10_req_10kb_100_chunks_adaptive_window 20813983 ns/iter (± 8896738) 9026427 ns/iter (± 63123) 2.31

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.