Skip to content

Commit

Permalink
fix: http transports can now reuse a connection.
Browse files Browse the repository at this point in the history
This makes connections more efficient generally and `cargo` relies
on that behaviour in their tests as well.
  • Loading branch information
Byron committed Dec 10, 2022
1 parent 9079b9d commit ff0332e
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 51 deletions.
2 changes: 1 addition & 1 deletion git-repository/src/config/cache/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Cache {
"HOME" => Some(home_env),
_ => None,
}
.and_then(|perm| std::env::var_os(name).and_then(|val| perm.check_opt(val)))
.and_then(|perm| perm.check_opt(name).and_then(std::env::var_os))
})
.map(|p| (source, p.into_owned()))
})
Expand Down
59 changes: 30 additions & 29 deletions git-transport/src/client/blocking_io/http/curl/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl Handler {
fn reset(&mut self) {
self.checked_status = false;
self.last_status = 0;
self.follow = FollowRedirects::default();
}
fn parse_status_inner(data: &[u8]) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
let code = data
Expand Down Expand Up @@ -68,37 +69,31 @@ impl curl::easy::Handler for Handler {
}

fn header(&mut self, data: &[u8]) -> bool {
match self.send_header.as_mut() {
Some(writer) => {
if self.checked_status {
writer.write_all(data).is_ok()
} else {
self.checked_status = true;
self.last_status = 200;
match Handler::parse_status(data, self.follow) {
None => true,
Some((status, err)) => {
self.last_status = status;
writer
.channel
.send(Err(io::Error::new(
if status == 401 {
io::ErrorKind::PermissionDenied
} else if (500..600).contains(&status) {
io::ErrorKind::ConnectionAborted
} else {
io::ErrorKind::Other
},
err,
)))
.ok();
false
}
}
if let Some(writer) = self.send_header.as_mut() {
if self.checked_status {
writer.write_all(data).ok();
} else {
self.checked_status = true;
self.last_status = 200;
if let Some((status, err)) = Handler::parse_status(data, self.follow) {
self.last_status = status;
writer
.channel
.send(Err(io::Error::new(
if status == 401 {
io::ErrorKind::PermissionDenied
} else if (500..600).contains(&status) {
io::ErrorKind::ConnectionAborted
} else {
io::ErrorKind::Other
},
err,
)))
.ok();
}
}
None => false,
}
};
true
}
}

Expand All @@ -125,6 +120,10 @@ pub fn new() -> (
let (res_send, res_recv) = sync_channel(0);
let handle = std::thread::spawn(move || -> Result<(), Error> {
let mut handle = Easy2::new(Handler::default());
// We don't wait for the possibility for pipelining to become clear, and curl tries to reuse connections by default anyway.
handle.pipewait(false)?;
handle.tcp_keepalive(true)?;

let mut follow = None;
let mut redirected_base_url = None::<String>;

Expand Down Expand Up @@ -158,6 +157,8 @@ pub fn new() -> (
for header in extra_headers {
headers.append(&header)?;
}
// needed to avoid sending Expect: 100-continue, which adds another response and only CURL wants that
headers.append("Expect:")?;
handle.verbose(verbose)?;

let mut proxy_auth_action = None;
Expand Down
1 change: 0 additions & 1 deletion git-transport/src/client/blocking_io/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ impl<H: Http> client::TransportWithoutIO for Transport<H> {
Cow::Borrowed(self.user_agent_header),
Cow::Owned(format!("Content-Type: application/x-{}-request", service.as_str())),
format!("Accept: application/x-{}-result", service.as_str()).into(),
"Expect:".into(), // needed to avoid sending Expect: 100-continue, which adds another response and only CURL wants that
];
let mut dynamic_headers = Vec::new();
self.add_basic_auth_if_present(&mut dynamic_headers)?;
Expand Down
11 changes: 6 additions & 5 deletions git-transport/src/client/blocking_io/http/reqwest/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@ impl Default for Remote {
let (req_send, req_recv) = std::sync::mpsc::sync_channel(0);
let (res_send, res_recv) = std::sync::mpsc::sync_channel(0);
let handle = std::thread::spawn(move || -> Result<(), Error> {
// We may error while configuring, which is expected as part of the internal protocol. The error will be
// received and the sender of the request might restart us.
let client = reqwest::blocking::ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(20))
.http1_title_case_headers()
.build()?;
for Request {
url,
headers,
upload,
config,
} in req_recv
{
// We may error while configuring, which is expected as part of the internal protocol. The error will be
// received and the sender of the request might restart us.
let client = reqwest::blocking::ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(20))
.build()?;
let mut req_builder = if upload { client.post(url) } else { client.get(url) }.headers(headers);
let (post_body_tx, post_body_rx) = pipe::unidirectional(0);
if upload {
Expand Down
2 changes: 2 additions & 0 deletions git-transport/tests/blocking-transport-http.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
extern crate core;

use std::path::PathBuf;

pub type Error = Box<dyn std::error::Error>;
Expand Down
110 changes: 95 additions & 15 deletions git-transport/tests/client/blocking_io/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,88 @@ fn http_status_500_is_communicated_via_special_io_error() -> crate::Result {
Ok(())
}

// based on a test in cargo
#[test]
fn http_will_use_pipelining() {
let server = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();

fn headers(rdr: &mut dyn BufRead) -> HashSet<String> {
let valid = ["GET", "Authorization", "Accept"];
rdr.lines()
.map(|s| s.unwrap())
.take_while(|s| s.len() > 2)
.map(|s| s.trim().to_string())
.filter(|s| valid.iter().any(|prefix| s.starts_with(*prefix)))
.collect()
}

let thread = std::thread::spawn({
move || {
let mut conn = std::io::BufReader::new(server.accept().unwrap().0);
let req = headers(&mut conn);
conn.get_mut()
.write_all(
b"HTTP/1.1 401 Unauthorized\r\n\
WWW-Authenticate: Basic realm=\"wheee\"\r\n\
Content-Length: 0\r\n\
\r\n",
)
.unwrap();
assert_eq!(
req,
vec![
"GET /reponame/info/refs?service=git-upload-pack HTTP/1.1",
"Accept: */*"
]
.into_iter()
.map(|s| s.to_string())
.collect()
);

let req = headers(&mut conn);
conn.get_mut()
.write_all(
b"HTTP/1.1 401 Unauthorized\r\n\
WWW-Authenticate: Basic realm=\"testenv\"\r\n\
\r\n",
)
.unwrap();
assert_eq!(
req,
vec![
"GET /reponame/info/refs?service=git-upload-pack HTTP/1.1",
"Authorization: Basic Zm9vOmJhcg==",
"Accept: */*",
]
.into_iter()
.map(|s| s.to_string())
.collect()
);
}
});

let url = format!("http://{}:{}/reponame", &addr.ip().to_string(), &addr.port(),);
let mut client = git_transport::client::http::connect(&url, git_transport::Protocol::V2);
match client.handshake(git_transport::Service::UploadPack, &[]) {
Ok(_) => unreachable!("expecting permission denied to be detected"),
Err(git_transport::client::Error::Io(err)) if err.kind() == std::io::ErrorKind::PermissionDenied => {}
Err(err) => unreachable!("{err:?}"),
};
client
.set_identity(git_sec::identity::Account {
username: "foo".into(),
password: "bar".into(),
})
.unwrap();
match client.handshake(git_transport::Service::UploadPack, &[]) {
Ok(_) => unreachable!("expecting permission denied to be detected"),
Err(git_transport::client::Error::Io(err)) if err.kind() == std::io::ErrorKind::PermissionDenied => {}
Err(err) => unreachable!("{err:?}"),
};
thread.join().unwrap();
}

#[test]
fn http_authentication_error_can_be_differentiated_and_identity_is_transmitted() -> crate::Result {
let (server, mut client) = assert_error_status(401, std::io::ErrorKind::PermissionDenied)?;
Expand Down Expand Up @@ -544,15 +626,9 @@ Git-Protocol: version=2
let messages = Rc::try_unwrap(messages).expect("no other handle").into_inner();
assert_eq!(messages.len(), 5);

assert_eq!(
server
.received_as_string()
.lines()
.filter(|l| !l.starts_with("expect: "))
.map(|l| l.to_lowercase())
.collect::<HashSet<_>>(),
format!(
"POST /path/not/important/due/to/mock/git-upload-pack HTTP/1.1
let actual = server.received_as_string();
let expected = format!(
"POST /path/not/important/due/to/mock/git-upload-pack HTTP/1.1
Host: 127.0.0.1:{}
Transfer-Encoding: chunked
User-Agent: git/oxide-{}
Expand All @@ -566,12 +642,16 @@ Git-Protocol: version=2
0
",
server.addr.port(),
env!("CARGO_PKG_VERSION")
)
.lines()
.map(|l| l.to_lowercase())
.collect::<HashSet<_>>()
server.addr.port(),
env!("CARGO_PKG_VERSION")
);
assert_eq!(
actual
.lines()
.filter(|l| !l.starts_with("expect: "))
.map(|l| l.to_lowercase())
.collect::<HashSet<_>>(),
expected.lines().map(|l| l.to_lowercase()).collect::<HashSet<_>>()
);
Ok(())
}
Expand Down

0 comments on commit ff0332e

Please sign in to comment.