Skip to content

Commit

Permalink
[git-protocol] async capabilities and arguments abstractions
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed Jun 5, 2021
1 parent bbd75d8 commit aa3eacb
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion git-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ doctest = false
[features]
serde1 = ["serde", "bstr/serde1", "git-transport/serde1", "git-hash/serde1"]
blocking-client = ["git-transport/blocking-client"]
async-client = ["git-transport/async-client", "async-trait", "futures-io"]
async-client = ["git-transport/async-client", "async-trait", "futures-io", "futures-lite"]

[[test]]
name = "blocking-client-protocol"
Expand All @@ -40,6 +40,7 @@ btoi = "0.4.2"
# for async-client
async-trait = { version = "0.1.50", optional = true }
futures-io = { version = "0.3.15", optional = true }
futures-lite = { version = "1.12.0", optional = true }

[dev-dependencies]
async-std = { version = "1.9.0", features = ["attributes"] }
Expand Down
68 changes: 68 additions & 0 deletions git-protocol/src/fetch/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,74 @@ impl Arguments {
}
}

#[cfg(all(not(feature = "blocking-client"), feature = "async-client"))]
mod async_io {
use crate::fetch::{Arguments, Command};
use bstr::ByteSlice;
use futures_lite::io::AsyncWriteExt;
use git_transport::{client, client::TransportV2Ext};

impl Arguments {
pub(crate) async fn send<'a, T: client::Transport + Send + 'a>(
&mut self,
transport: &'a mut T,
add_done_argument: bool,
) -> Result<Box<dyn client::ExtendedBufRead + Unpin + 'a>, client::Error> {
if self.haves.is_empty() {
assert!(add_done_argument, "If there are no haves, is_done must be true.");
}
match self.version {
git_transport::Protocol::V1 => {
let on_into_read = if add_done_argument {
client::MessageKind::Text(&b"done"[..])
} else {
client::MessageKind::Flush
};
let retained_state = if transport.is_stateful() {
None
} else {
Some(self.args.clone())
};
let mut line_writer =
transport.request(client::WriteMode::OneLfTerminatedLinePerWriteCall, on_into_read)?;

if let Some(first_arg_position) = self.args.iter().position(|l| l.starts_with_str("want ")) {
self.args.swap(first_arg_position, 0);
}
let had_args = !self.args.is_empty();
for arg in self.args.drain(..) {
line_writer.write_all(&arg).await?;
}
if had_args {
line_writer.write_message(client::MessageKind::Flush).await?;
}
for line in self.haves.drain(..) {
line_writer.write_all(&line).await?;
}
if let Some(next_args) = retained_state {
self.args = next_args;
}
Ok(line_writer.into_read().await?)
}
git_transport::Protocol::V2 => {
let retained_state = self.args.clone();
self.args.extend(self.haves.drain(..));
if add_done_argument {
self.args.push("done".into());
}
transport
.invoke(
Command::Fetch.as_str(),
self.features.iter().filter(|(_, v)| v.is_some()).cloned(),
Some(std::mem::replace(&mut self.args, retained_state).into_iter()),
)
.await
}
}
}
}
}

#[cfg(feature = "blocking-client")]
mod blocking_io {
use std::io::Write;
Expand Down
58 changes: 50 additions & 8 deletions git-protocol/src/fetch/tests/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,48 @@ mod impls {
}
}

#[cfg(all(not(feature = "blocking-client"), feature = "async-client"))]
mod impls {
use crate::fetch::tests::arguments::Transport;
use async_trait::async_trait;
use git_transport::{
client,
client::{Error, Identity, MessageKind, RequestWriter, SetServiceResponse, WriteMode},
Protocol, Service,
};

#[async_trait]
impl<T: client::Transport + Send> client::Transport for Transport<T> {
async fn handshake(&mut self, service: Service) -> Result<SetServiceResponse<'_>, Error> {
self.inner.handshake(service).await
}

fn set_identity(&mut self, identity: Identity) -> Result<(), Error> {
self.inner.set_identity(identity)
}

fn request(&mut self, write_mode: WriteMode, on_into_read: MessageKind) -> Result<RequestWriter<'_>, Error> {
self.inner.request(write_mode, on_into_read)
}

async fn close(&mut self) -> Result<(), Error> {
self.inner.close().await
}

fn to_url(&self) -> String {
self.inner.to_url()
}

fn desired_protocol_version(&self) -> Protocol {
self.inner.desired_protocol_version()
}

fn is_stateful(&self) -> bool {
self.stateful
}
}
}

fn transport(
out: &mut Vec<u8>,
stateful: bool,
Expand Down Expand Up @@ -88,7 +130,7 @@ mod v1 {

arguments.want(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c907"));
arguments.want(id("ff333369de1221f9bfbbe03a3a13e9a09bc1ffff"));
arguments.send(&mut t, true).expect("sending to buffer to work");
arguments.send(&mut t, true).await.expect("sending to buffer to work");
assert_eq!(
out.as_bstr(),
b"0046want 7b333369de1221f9bfbbe03a3a13e9a09bc1c907 feature-a feature-b
Expand All @@ -111,10 +153,10 @@ mod v1 {
arguments.deepen_since(12345);
arguments.deepen_not("refs/heads/main".into());
arguments.have(id("0000000000000000000000000000000000000000"));
arguments.send(&mut t, false).expect("sending to buffer to work");
arguments.send(&mut t, false).await.expect("sending to buffer to work");

arguments.have(id("1111111111111111111111111111111111111111"));
arguments.send(&mut t, true).expect("sending to buffer to work");
arguments.send(&mut t, true).await.expect("sending to buffer to work");
assert_eq!(
out.as_bstr(),
b"005cwant 7b333369de1221f9bfbbe03a3a13e9a09bc1c907 feature-a shallow deepen-since deepen-not
Expand Down Expand Up @@ -144,10 +186,10 @@ mod v1 {
arguments.deepen(1);
arguments.want(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c907"));
arguments.have(id("0000000000000000000000000000000000000000"));
arguments.send(&mut t, false).expect("sending to buffer to work");
arguments.send(&mut t, false).await.expect("sending to buffer to work");

arguments.have(id("1111111111111111111111111111111111111111"));
arguments.send(&mut t, true).expect("sending to buffer to work");
arguments.send(&mut t, true).await.expect("sending to buffer to work");
assert_eq!(
out.as_bstr(),
b"0044want 7b333369de1221f9bfbbe03a3a13e9a09bc1c907 feature-a shallow
Expand Down Expand Up @@ -175,7 +217,7 @@ mod v2 {
arguments.deepen_relative();
arguments.want(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c907"));
arguments.want(id("ff333369de1221f9bfbbe03a3a13e9a09bc1ffff"));
arguments.send(&mut t, true).expect("sending to buffer to work");
arguments.send(&mut t, true).await.expect("sending to buffer to work");
assert_eq!(
out.as_bstr(),
b"0012command=fetch
Expand Down Expand Up @@ -205,10 +247,10 @@ mod v2 {
arguments.want(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c907"));
arguments.deepen_not("refs/heads/main".into());
arguments.have(id("0000000000000000000000000000000000000000"));
arguments.send(&mut t, false).expect("sending to buffer to work");
arguments.send(&mut t, false).await.expect("sending to buffer to work");

arguments.have(id("1111111111111111111111111111111111111111"));
arguments.send(&mut t, true).expect("sending to buffer to work");
arguments.send(&mut t, true).await.expect("sending to buffer to work");
assert_eq!(
out.as_bstr(),
b"0012command=fetch
Expand Down
2 changes: 1 addition & 1 deletion git-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ default = []
serde1 = ["serde"]
http-client-curl = ["curl", "base64", "git-features/io-pipe", "blocking-client"]
blocking-client = ["git-packetline/blocking-io"]
async-client = ["async-trait", "futures-lite", "futures-io", "git-packetline/async-io", "pin-project-lite"]
async-client = ["git-packetline/async-io", "async-trait", "futures-lite", "futures-io", "pin-project-lite"]

[[test]]
name = "blocking-transport"
Expand Down

0 comments on commit aa3eacb

Please sign in to comment.