Skip to content

Commit

Permalink
Add Response::remote_addr() (#489)
Browse files Browse the repository at this point in the history
Fixes #488.
  • Loading branch information
mvforell committed Oct 3, 2022
1 parent b0796c1 commit 855f20e
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 23 deletions.
3 changes: 2 additions & 1 deletion src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ impl<R: Read + Sized + Done + Into<Stream>> Read for PoolReturnRead<R> {

#[cfg(test)]
mod tests {
use crate::stream::remote_addr_for_test;
use crate::ReadWrite;

use super::*;
Expand All @@ -311,7 +312,7 @@ mod tests {

impl NoopStream {
fn stream() -> Stream {
Stream::new(NoopStream)
Stream::new(NoopStream, remote_addr_for_test())
}
}

Expand Down
18 changes: 16 additions & 2 deletions src/response.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::io::{self, Cursor, Read};
use std::net::SocketAddr;
use std::str::FromStr;
use std::{fmt, io::BufRead};

Expand Down Expand Up @@ -69,6 +70,8 @@ pub struct Response {
// Boxed to avoid taking up too much size.
unit: Box<Unit>,
reader: Box<dyn Read + Send + Sync + 'static>,
/// The socket address of the server that sent the response.
remote_addr: SocketAddr,
/// The redirect history of this response, if any. The history starts with
/// the first response received and ends with the response immediately
/// previous to this one.
Expand Down Expand Up @@ -223,6 +226,11 @@ impl Response {
charset_from_content_type(self.header("content-type"))
}

/// The socket address of the server that sent the response.
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}

/// Turn this response into a `impl Read` of the body.
///
/// 1. If `Transfer-Encoding: chunked`, the returned reader will unchunk it
Expand Down Expand Up @@ -495,6 +503,7 @@ impl Response {
///
/// assert_eq!(resp.status(), 401);
pub(crate) fn do_from_stream(stream: Stream, unit: Unit) -> Result<Response, Error> {
let remote_addr = stream.remote_addr;
//
// HTTP/1.1 200 OK\r\n
let mut stream = stream::DeadlineStream::new(stream, unit.deadline);
Expand Down Expand Up @@ -540,6 +549,7 @@ impl Response {
headers,
unit: Box::new(unit),
reader: Box::new(Cursor::new(vec![])),
remote_addr,
history: vec![],
length,
compression,
Expand Down Expand Up @@ -668,7 +678,8 @@ impl FromStr for Response {
/// # }
/// ```
fn from_str(s: &str) -> Result<Self, Self::Err> {
let stream = Stream::new(ReadOnlyStream::new(s.into()));
let remote_addr = "0.0.0.0:0".parse().unwrap();
let stream = Stream::new(ReadOnlyStream::new(s.into()), remote_addr);
let request_url = "https://example.com".parse().unwrap();
let request_reader = SizedReader {
size: crate::body::BodySize::Empty,
Expand Down Expand Up @@ -1029,7 +1040,10 @@ mod tests {
OK",
);
let v = cow.to_vec();
let s = Stream::new(ReadOnlyStream::new(v));
let s = Stream::new(
ReadOnlyStream::new(v),
crate::stream::remote_addr_for_test(),
);
let request_url = "https://example.com".parse().unwrap();
let request_reader = SizedReader {
size: crate::body::BodySize::Empty,
Expand Down
42 changes: 25 additions & 17 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub trait TlsConnector: Send + Sync {

pub(crate) struct Stream {
inner: BufReader<Box<dyn ReadWrite>>,
/// The remote address the stream is connected to.
pub(crate) remote_addr: SocketAddr,
}

impl<T: ReadWrite + ?Sized> ReadWrite for Box<T> {
Expand Down Expand Up @@ -177,9 +179,10 @@ impl fmt::Debug for Stream {
}

impl Stream {
pub(crate) fn new(t: impl ReadWrite) -> Stream {
pub(crate) fn new(t: impl ReadWrite, remote_addr: SocketAddr) -> Stream {
Stream::logged_create(Stream {
inner: BufReader::new(Box::new(t)),
remote_addr,
})
}

Expand All @@ -192,12 +195,6 @@ impl Stream {
self.inner.buffer()
}

fn from_tcp_stream(t: TcpStream) -> Stream {
Stream::logged_create(Stream {
inner: BufReader::new(Box::new(t)),
})
}

// Check if the server has closed a stream by performing a one-byte
// non-blocking read. If this returns EOF, the server has closed the
// connection: return true. If this returns a successful read, there are
Expand Down Expand Up @@ -307,20 +304,25 @@ pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error>
//
let port = unit.url.port().unwrap_or(80);

connect_host(unit, hostname, port).map(Stream::from_tcp_stream)
connect_host(unit, hostname, port).map(|(t, r)| Stream::new(t, r))
}

pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
let port = unit.url.port().unwrap_or(443);

let sock = connect_host(unit, hostname, port)?;
let (sock, remote_addr) = connect_host(unit, hostname, port)?;

let tls_conf = &unit.agent.config.tls_config;
let https_stream = tls_conf.connect(hostname, Box::new(sock))?;
Ok(Stream::new(https_stream))
Ok(Stream::new(https_stream, remote_addr))
}

pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<TcpStream, Error> {
/// If successful, returns a `TcpStream` and the remote address it is connected to.
pub(crate) fn connect_host(
unit: &Unit,
hostname: &str,
port: u16,
) -> Result<(TcpStream, SocketAddr), Error> {
let connect_deadline: Option<Instant> =
if let Some(timeout_connect) = unit.agent.config.timeout_connect {
Instant::now().checked_add(timeout_connect)
Expand All @@ -347,7 +349,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
let proto = proxy.as_ref().map(|proxy| proxy.proto);

let mut any_err = None;
let mut any_stream = None;
let mut any_stream_and_addr = None;
// Find the first sock_addr that accepts a connection
for sock_addr in sock_addrs {
// ensure connect timeout or overall timeout aren't yet hit.
Expand Down Expand Up @@ -376,15 +378,15 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
};

if let Ok(stream) = stream {
any_stream = Some(stream);
any_stream_and_addr = Some((stream, sock_addr));
break;
} else if let Err(err) = stream {
any_err = Some(err);
}
}

let mut stream = if let Some(stream) = any_stream {
stream
let (mut stream, remote_addr) = if let Some(stream_and_addr) = any_stream_and_addr {
stream_and_addr
} else if let Some(e) = any_err {
return Err(ErrorKind::ConnectionFailed.msg("Connect error").src(e));
} else {
Expand Down Expand Up @@ -425,7 +427,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
}
}

Ok(stream)
Ok((stream, remote_addr))
}

#[cfg(feature = "socks-proxy")]
Expand Down Expand Up @@ -611,6 +613,12 @@ pub(crate) fn connect_test(unit: &Unit) -> Result<Stream, Error> {
Err(ErrorKind::UnknownScheme.msg(format!("unknown scheme '{}'", unit.url.scheme())))
}

#[cfg(test)]
pub(crate) fn remote_addr_for_test() -> SocketAddr {
use std::net::{Ipv4Addr, SocketAddrV4};
SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0).into()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -662,7 +670,7 @@ mod tests {
let recorder = ReadRecorder {
reads: reads.clone(),
};
let stream = Stream::new(recorder);
let stream = Stream::new(recorder, remote_addr_for_test());
let mut deadline_stream = DeadlineStream::new(stream, None);
let mut buf = [0u8; 1];
for _ in 0..8193 {
Expand Down
12 changes: 9 additions & 3 deletions src/test/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::Error;
use crate::stream::{ReadOnlyStream, Stream};
use crate::stream::{remote_addr_for_test, ReadOnlyStream, Stream};
use crate::unit::Unit;
use crate::ReadWrite;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -52,7 +52,10 @@ pub(crate) fn make_response(
}
write!(&mut buf, "\r\n").ok();
buf.append(&mut body);
Ok(Stream::new(ReadOnlyStream::new(buf)))
Ok(Stream::new(
ReadOnlyStream::new(buf),
remote_addr_for_test(),
))
}

pub(crate) fn resolve_handler(unit: &Unit) -> Result<Stream, Error> {
Expand Down Expand Up @@ -97,7 +100,10 @@ impl Recorder {

fn stream(&self) -> Stream {
let cursor = Cursor::new(b"HTTP/1.1 200 OK\r\n\r\n");
Stream::new(TestStream::new(cursor, self.clone()))
Stream::new(
TestStream::new(cursor, self.clone()),
remote_addr_for_test(),
)
}
}

Expand Down

0 comments on commit 855f20e

Please sign in to comment.