Skip to content

Commit

Permalink
[async-client] unblock the async delegate in the cheapest possible way…
Browse files Browse the repository at this point in the history
…which seems fair as each of its delgate calls must be considered
blocking in most cases.
  • Loading branch information
Byron committed Jun 9, 2021
1 parent 2ba452f commit a3b5d75
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 32 deletions.
5 changes: 5 additions & 0 deletions git-packetline/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ impl<T> StreamingPeekableIter<T> {
self.fail_on_err_lines = false;
prev
}

/// Return the inner read
pub fn into_inner(self) -> T {
self.read
}
}

#[cfg(feature = "blocking-io")]
Expand Down
36 changes: 29 additions & 7 deletions git-protocol/src/fetch/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,37 @@ use std::io;
///
/// _Note_ that depending on the `delegate`, the actual action performed can be `ls-refs`, `clone` or `fetch`.
#[maybe_async]
pub async fn fetch<F, D>(
mut transport: impl client::Transport,
pub async fn fetch<F, D, T>(
transport: T,
delegate: D,
authenticate: F,
progress: impl Progress + Send + 'static,
) -> Result<(D, T), Error>
where
F: FnMut(credentials::Action<'_>) -> credentials::Result + Send + 'static,
D: Delegate + Send + 'static,
T: client::Transport + Send + 'static,
{
#[cfg(feature = "blocking-client")]
return fetch_inner(transport, delegate, authenticate, progress);
#[cfg(feature = "async-client")]
return blocking::unblock(move || {
futures_lite::future::block_on(fetch_inner(transport, delegate, authenticate, progress))
})
.await;
}

#[maybe_async]
async fn fetch_inner<F, D, T>(
mut transport: T,
mut delegate: D,
mut authenticate: F,
mut progress: impl Progress,
) -> Result<D, Error>
mut progress: impl Progress + Send + 'static,
) -> Result<(D, T), Error>
where
F: FnMut(credentials::Action<'_>) -> credentials::Result,
F: FnMut(credentials::Action<'_>) -> credentials::Result + Send + 'static,
D: Delegate + Send + 'static,
T: client::Transport + Send + 'static,
{
let (protocol_version, mut parsed_refs, capabilities, call_ls_refs) = {
progress.init(None, progress::steps());
Expand Down Expand Up @@ -135,7 +157,7 @@ where

if next == Action::Close {
transport.close().await?;
return Ok(delegate);
return Ok((delegate, transport));
}

Response::check_required_features(protocol_version, &fetch_features)?;
Expand Down Expand Up @@ -168,7 +190,7 @@ where
}
}
}
Ok(delegate)
Ok((delegate, transport))
}

fn setup_remote_progress(
Expand Down
24 changes: 21 additions & 3 deletions git-protocol/tests/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,29 @@ pub fn oid(hex_sha: &str) -> git_hash::ObjectId {
git_hash::ObjectId::from_hex(hex_sha.as_bytes()).expect("valid input")
}

pub fn transport<'a>(
out: &'a mut Vec<u8>,
#[cfg(feature = "async-client")]
pub fn transport<'a, W: futures_io::AsyncWrite + Unpin>(
out: W,
path: &str,
version: git_transport::Protocol,
) -> git_transport::client::git::Connection<Cursor, W> {
let response = fixture_bytes(path);
git_transport::client::git::Connection::new(
Cursor::new(response),
out,
version,
b"does/not/matter".as_bstr().to_owned(),
None::<(&str, _)>,
git_transport::client::git::ConnectMode::Process,
)
}

#[cfg(feature = "blocking-client")]
pub fn transport<'a, W: std::io::Write>(
out: W,
path: &str,
version: git_transport::Protocol,
) -> git_transport::client::git::Connection<Cursor, &'a mut Vec<u8>> {
) -> git_transport::client::git::Connection<Cursor, W> {
let response = fixture_bytes(path);
git_transport::client::git::Connection::new(
Cursor::new(response),
Expand Down
15 changes: 8 additions & 7 deletions git-protocol/tests/fetch/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,26 @@ use git_transport::Protocol;

#[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))]
async fn clone() -> crate::Result {
let mut out = Vec::new();
let out = Vec::new();
let dlg = CloneDelegate::default();
let dlg = git_protocol::fetch(
transport(&mut out, "v1/clone.response", Protocol::V1),
transport(out, "v1/clone.response", Protocol::V1),
dlg,
git_protocol::credentials::helper,
progress::Discard,
)
.await?;
.await?
.0;
assert_eq!(dlg.pack_bytes, 876, "It be able to read pack bytes");
Ok(())
}

#[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))]
async fn ls_remote() -> crate::Result {
let mut out = Vec::new();
let out = Vec::new();
let delegate = LsRemoteDelegate::default();
let delegate = git_protocol::fetch(
transport(&mut out, "v1/clone.response", Protocol::V1),
let (delegate, out) = git_protocol::fetch(
transport(out, "v1/clone.response", Protocol::V1),
delegate,
git_protocol::credentials::helper,
progress::Discard,
Expand All @@ -46,7 +47,7 @@ async fn ls_remote() -> crate::Result {
]
);
assert_eq!(
out.as_bstr(),
out.into_inner().1.as_bstr(),
b"0000".as_bstr(),
"we dont have to send anything in V1, except for the final flush byte to indicate we are done"
);
Expand Down
8 changes: 4 additions & 4 deletions git-protocol/tests/fetch/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use git_transport::Protocol;

#[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))]
async fn ls_remote() -> crate::Result {
let mut out = Vec::new();
let out = Vec::new();
let delegate = LsRemoteDelegate::default();
let delegate = git_protocol::fetch(
transport(&mut out, "v2/clone.response", Protocol::V2),
let (delegate, out) = git_protocol::fetch(
transport(out, "v2/clone.response", Protocol::V2),
delegate,
git_protocol::credentials::helper,
progress::Discard,
Expand All @@ -31,7 +31,7 @@ async fn ls_remote() -> crate::Result {
]
);
assert_eq!(
out.as_bstr(),
out.into_inner().1.as_bstr(),
format!(
"0014command=ls-refs
001aagent={}
Expand Down
2 changes: 1 addition & 1 deletion git-transport/src/client/blocking_io/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use crate::client::non_io_types::connect::Error;
/// and if compiled in connections to [git repositories over https][crate::client::http::connect()].
///
/// Use `desired_version` to set the desired protocol version to use when connecting, but not that the server may downgrade it.
pub fn connect(url: &[u8], desired_version: crate::Protocol) -> Result<Box<dyn Transport>, Error> {
pub fn connect(url: &[u8], desired_version: crate::Protocol) -> Result<Box<dyn Transport + Send>, Error> {
let urlb = url;
let url = git_url::parse(urlb)?;
Ok(match url.scheme {
Expand Down
10 changes: 8 additions & 2 deletions git-transport/src/client/git/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ pub struct Connection<R, W> {
pub(in crate::client) mode: ConnectMode,
}

pub(crate) mod message {
use bstr::{BString, ByteVec};
impl<R, W> Connection<R, W> {
/// Return the inner reader and writer
pub fn into_inner(self) -> (R, W) {
(self.line_provider.into_inner(), self.writer)
}
}

pub(crate) mod message {
use crate::{Protocol, Service};
use bstr::{BString, ByteVec};

pub fn connect(
service: Service,
Expand Down
2 changes: 1 addition & 1 deletion gitoxide-core/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ mod async_io {
pub async fn connect(
url: &[u8],
desired_version: transport::Protocol,
) -> Result<Box<dyn client::Transport>, Error> {
) -> Result<Box<dyn client::Transport + Send>, Error> {
let urlb = url;
let url = git_repository::url::parse(urlb)?;
Ok(match url.scheme {
Expand Down
6 changes: 3 additions & 3 deletions gitoxide-core/src/pack/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ mod blocking_io {
}
}

pub fn receive<P: Progress, W: io::Write>(
pub fn receive<P: Progress, W: io::Write + Send + 'static>(
protocol: Option<net::Protocol>,
url: &str,
directory: Option<PathBuf>,
Expand All @@ -138,13 +138,13 @@ mod blocking_io {
ctx: Context<W>,
) -> anyhow::Result<()> {
let transport = net::connect(url.as_bytes(), protocol.unwrap_or_default().into())?;
let mut delegate = CloneDelegate {
let delegate = CloneDelegate {
ctx,
directory,
refs_directory,
ref_filter: None,
};
protocol::fetch(transport, &mut delegate, protocol::credentials::helper, progress)?;
protocol::fetch(transport, delegate, protocol::credentials::helper, progress)?;
Ok(())
}
}
Expand Down
10 changes: 6 additions & 4 deletions gitoxide-core/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ pub mod refs {
) -> anyhow::Result<()> {
let transport = net::connect(url.as_bytes(), protocol.unwrap_or_default().into()).await?;
let mut delegate = LsRemotes::default();
let delegate = protocol::fetch(transport, delegate, protocol::credentials::helper, progress).await?;
let delegate = protocol::fetch(transport, delegate, protocol::credentials::helper, progress)
.await?
.0;

blocking::unblock(move || match ctx.format {
OutputFormat::Human => drop(print(ctx.out, &delegate.refs)),
Expand Down Expand Up @@ -119,11 +121,11 @@ pub mod refs {
protocol: Option<net::Protocol>,
url: &str,
progress: impl Progress,
ctx: Context<impl io::Write>,
ctx: Context<impl io::Write + Send + 'static>,
) -> anyhow::Result<()> {
let transport = net::connect(url.as_bytes(), protocol.unwrap_or_default().into())?;
let mut delegate = LsRemotes::default();
protocol::fetch(transport, &mut delegate, protocol::credentials::helper, progress)?;
let delegate = LsRemotes::default();
let delegate = protocol::fetch(transport, delegate, protocol::credentials::helper, progress)?.0;

match ctx.format {
OutputFormat::Human => drop(print(ctx.out, &delegate.refs)),
Expand Down

0 comments on commit a3b5d75

Please sign in to comment.