Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Response::remote_addr() #489

Merged
merged 5 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ impl<R: Read + Sized + Done + Into<Stream>> Read for PoolReturnRead<R> {

#[cfg(test)]
mod tests {
use crate::stream::remote_addr_for_test;
use crate::ReadWrite;

use super::*;
Expand All @@ -311,7 +312,7 @@ mod tests {

impl NoopStream {
fn stream() -> Stream {
Stream::new(NoopStream)
Stream::new(NoopStream, remote_addr_for_test())
}
}

Expand Down
18 changes: 16 additions & 2 deletions src/response.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::io::{self, Cursor, Read};
use std::net::SocketAddr;
use std::str::FromStr;
use std::{fmt, io::BufRead};

Expand Down Expand Up @@ -69,6 +70,8 @@ pub struct Response {
// Boxed to avoid taking up too much size.
unit: Box<Unit>,
reader: Box<dyn Read + Send + Sync + 'static>,
/// The socket address of the server that sent the response.
remote_addr: SocketAddr,
/// The redirect history of this response, if any. The history starts with
/// the first response received and ends with the response immediately
/// previous to this one.
Expand Down Expand Up @@ -223,6 +226,11 @@ impl Response {
charset_from_content_type(self.header("content-type"))
}

/// The socket address of the server that sent the response.
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}

/// Turn this response into a `impl Read` of the body.
///
/// 1. If `Transfer-Encoding: chunked`, the returned reader will unchunk it
Expand Down Expand Up @@ -495,6 +503,7 @@ impl Response {
///
/// assert_eq!(resp.status(), 401);
pub(crate) fn do_from_stream(stream: Stream, unit: Unit) -> Result<Response, Error> {
let remote_addr = stream.remote_addr;
//
// HTTP/1.1 200 OK\r\n
let mut stream = stream::DeadlineStream::new(stream, unit.deadline);
Expand Down Expand Up @@ -540,6 +549,7 @@ impl Response {
headers,
unit: Box::new(unit),
reader: Box::new(Cursor::new(vec![])),
remote_addr,
history: vec![],
length,
compression,
Expand Down Expand Up @@ -668,7 +678,8 @@ impl FromStr for Response {
/// # }
/// ```
fn from_str(s: &str) -> Result<Self, Self::Err> {
let stream = Stream::new(ReadOnlyStream::new(s.into()));
let remote_addr = "0.0.0.0:0".parse().unwrap();
let stream = Stream::new(ReadOnlyStream::new(s.into()), remote_addr);
let request_url = "https://example.com".parse().unwrap();
let request_reader = SizedReader {
size: crate::body::BodySize::Empty,
Expand Down Expand Up @@ -1029,7 +1040,10 @@ mod tests {
OK",
);
let v = cow.to_vec();
let s = Stream::new(ReadOnlyStream::new(v));
let s = Stream::new(
ReadOnlyStream::new(v),
crate::stream::remote_addr_for_test(),
);
let request_url = "https://example.com".parse().unwrap();
let request_reader = SizedReader {
size: crate::body::BodySize::Empty,
Expand Down
42 changes: 25 additions & 17 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub trait TlsConnector: Send + Sync {

pub(crate) struct Stream {
inner: BufReader<Box<dyn ReadWrite>>,
/// The remote address the stream is connected to.
pub(crate) remote_addr: SocketAddr,
}

impl<T: ReadWrite + ?Sized> ReadWrite for Box<T> {
Expand Down Expand Up @@ -177,9 +179,10 @@ impl fmt::Debug for Stream {
}

impl Stream {
pub(crate) fn new(t: impl ReadWrite) -> Stream {
pub(crate) fn new(t: impl ReadWrite, remote_addr: SocketAddr) -> Stream {
Stream::logged_create(Stream {
inner: BufReader::new(Box::new(t)),
remote_addr,
})
}

Expand All @@ -192,12 +195,6 @@ impl Stream {
self.inner.buffer()
}

fn from_tcp_stream(t: TcpStream) -> Stream {
Stream::logged_create(Stream {
inner: BufReader::new(Box::new(t)),
})
}

// Check if the server has closed a stream by performing a one-byte
// non-blocking read. If this returns EOF, the server has closed the
// connection: return true. If this returns a successful read, there are
Expand Down Expand Up @@ -307,20 +304,25 @@ pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error>
//
let port = unit.url.port().unwrap_or(80);

connect_host(unit, hostname, port).map(Stream::from_tcp_stream)
connect_host(unit, hostname, port).map(|(t, r)| Stream::new(t, r))
}

pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
let port = unit.url.port().unwrap_or(443);

let sock = connect_host(unit, hostname, port)?;
let (sock, remote_addr) = connect_host(unit, hostname, port)?;

let tls_conf = &unit.agent.config.tls_config;
let https_stream = tls_conf.connect(hostname, Box::new(sock))?;
Ok(Stream::new(https_stream))
Ok(Stream::new(https_stream, remote_addr))
}

pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<TcpStream, Error> {
/// If successful, returns a `TcpStream` and the remote address it is connected to.
pub(crate) fn connect_host(
unit: &Unit,
hostname: &str,
port: u16,
) -> Result<(TcpStream, SocketAddr), Error> {
let connect_deadline: Option<Instant> =
if let Some(timeout_connect) = unit.agent.config.timeout_connect {
Instant::now().checked_add(timeout_connect)
Expand All @@ -347,7 +349,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
let proto = proxy.as_ref().map(|proxy| proxy.proto);

let mut any_err = None;
let mut any_stream = None;
let mut any_stream_and_addr = None;
// Find the first sock_addr that accepts a connection
for sock_addr in sock_addrs {
// ensure connect timeout or overall timeout aren't yet hit.
Expand Down Expand Up @@ -376,15 +378,15 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
};

if let Ok(stream) = stream {
any_stream = Some(stream);
any_stream_and_addr = Some((stream, sock_addr));
break;
} else if let Err(err) = stream {
any_err = Some(err);
}
}

let mut stream = if let Some(stream) = any_stream {
stream
let (mut stream, remote_addr) = if let Some(stream_and_addr) = any_stream_and_addr {
stream_and_addr
} else if let Some(e) = any_err {
return Err(ErrorKind::ConnectionFailed.msg("Connect error").src(e));
} else {
Expand Down Expand Up @@ -425,7 +427,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
}
}

Ok(stream)
Ok((stream, remote_addr))
}

#[cfg(feature = "socks-proxy")]
Expand Down Expand Up @@ -611,6 +613,12 @@ pub(crate) fn connect_test(unit: &Unit) -> Result<Stream, Error> {
Err(ErrorKind::UnknownScheme.msg(format!("unknown scheme '{}'", unit.url.scheme())))
}

#[cfg(test)]
pub(crate) fn remote_addr_for_test() -> SocketAddr {
use std::net::{Ipv4Addr, SocketAddrV4};
SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0).into()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -662,7 +670,7 @@ mod tests {
let recorder = ReadRecorder {
reads: reads.clone(),
};
let stream = Stream::new(recorder);
let stream = Stream::new(recorder, remote_addr_for_test());
let mut deadline_stream = DeadlineStream::new(stream, None);
let mut buf = [0u8; 1];
for _ in 0..8193 {
Expand Down
12 changes: 9 additions & 3 deletions src/test/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::Error;
use crate::stream::{ReadOnlyStream, Stream};
use crate::stream::{remote_addr_for_test, ReadOnlyStream, Stream};
use crate::unit::Unit;
use crate::ReadWrite;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -52,7 +52,10 @@ pub(crate) fn make_response(
}
write!(&mut buf, "\r\n").ok();
buf.append(&mut body);
Ok(Stream::new(ReadOnlyStream::new(buf)))
Ok(Stream::new(
ReadOnlyStream::new(buf),
remote_addr_for_test(),
))
}

pub(crate) fn resolve_handler(unit: &Unit) -> Result<Stream, Error> {
Expand Down Expand Up @@ -97,7 +100,10 @@ impl Recorder {

fn stream(&self) -> Stream {
let cursor = Cursor::new(b"HTTP/1.1 200 OK\r\n\r\n");
Stream::new(TestStream::new(cursor, self.clone()))
Stream::new(
TestStream::new(cursor, self.clone()),
remote_addr_for_test(),
)
}
}

Expand Down