Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Weekly sync #242

Merged
merged 4 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
28022cfd206a7dfa1670a9884eb5b8bf621e8bfd
2c9d4c55853235e908a1acd20454ebe7b979d246
18 changes: 12 additions & 6 deletions pingora-core/src/connectors/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::net::SocketAddr as InetSocketAddr;
use std::os::unix::io::AsRawFd;

use crate::protocols::l4::ext::{
connect as tcp_connect, connect_uds, set_recv_buf, set_tcp_keepalive,
connect_uds, connect_with as tcp_connect, set_recv_buf, set_tcp_fastopen_connect,
set_tcp_keepalive,
};
use crate::protocols::l4::socket::SocketAddr;
use crate::protocols::l4::stream::Stream;
Expand All @@ -39,7 +40,16 @@ where
let peer_addr = peer.address();
let mut stream: Stream = match peer_addr {
SocketAddr::Inet(addr) => {
let connect_future = tcp_connect(addr, bind_to.as_ref());
let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
if peer.tcp_fast_open() {
set_tcp_fastopen_connect(socket.as_raw_fd())?;
}
if let Some(recv_buf) = peer.tcp_recv_buf() {
debug!("Setting recv buf size");
set_recv_buf(socket.as_raw_fd(), recv_buf)?;
}
Ok(())
});
let conn_res = match peer.connection_timeout() {
Some(t) => pingora_timeout::timeout(t, connect_future)
.await
Expand All @@ -55,10 +65,6 @@ where
debug!("Setting tcp keepalive");
set_tcp_keepalive(&socket, ka)?;
}
if let Some(recv_buf) = peer.tcp_recv_buf() {
debug!("Setting recv buf size");
set_recv_buf(socket.as_raw_fd(), recv_buf)?;
}
Ok(socket.into())
}
Err(e) => {
Expand Down
34 changes: 34 additions & 0 deletions pingora-core/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ mod tests {
use super::*;
use crate::tls::ssl::SslMethod;
use crate::upstreams::peer::BasicPeer;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixListener;

// 192.0.2.1 is effectively a black hole
const BLACK_HOLE: &str = "192.0.2.1:79";
Expand Down Expand Up @@ -400,6 +402,38 @@ mod tests {
assert!(reused);
}

const MOCK_UDS_PATH: &str = "/tmp/test_unix_transport_connector.sock";

/// One-off mock server listening on [`MOCK_UDS_PATH`].
///
/// Accepts at most one connection, writes `it works!` to it, keeps the connection open
/// for 100ms and finally removes the socket file again.
async fn mock_connect_server() {
    // best effort: get rid of a socket file left over from an earlier run
    let _ = std::fs::remove_file(MOCK_UDS_PATH);
    let uds_listener = UnixListener::bind(MOCK_UDS_PATH).unwrap();
    let accepted = uds_listener.accept().await;
    if let Ok((mut conn, _peer_addr)) = accepted {
        conn.write_all(b"it works!").await.unwrap();
        // wait a bit so that the client can read
        let grace_period = std::time::Duration::from_millis(100);
        tokio::time::sleep(grace_period).await;
    }
    // best effort cleanup of the socket file
    let _ = std::fs::remove_file(MOCK_UDS_PATH);
}
/// Connects to the one-off mock UDS server through a [`TransportConnector`], checks the
/// greeting it sends and verifies that the released stream is handed out again for reuse.
#[tokio::test(flavor = "multi_thread")]
async fn test_connect_uds() {
    // run the one-off mock UDS server in the background
    // NOTE(review): nothing here waits for the server to finish bind() before the client
    // connects below; confirm this cannot race on slow machines.
    tokio::spawn(async {
        mock_connect_server().await;
    });
    // create a connector with default options
    let connector = TransportConnector::new(None);
    let peer = BasicPeer::new_uds(MOCK_UDS_PATH).unwrap();
    // make a new connection to mock uds
    let mut stream = connector.new_stream(&peer).await.unwrap();
    let mut buf = [0; 9];
    // read_exact(): a single read() is allowed to return fewer than 9 bytes, which would
    // leave `buf` partially filled and make the assertion below flaky
    stream.read_exact(&mut buf).await.unwrap();
    assert_eq!(&buf, b"it works!");
    connector.release_stream(stream, peer.reuse_hash(), None);

    // the released stream should be served from the pool
    let (_, reused) = connector.get_stream(&peer).await.unwrap();
    assert!(reused);
}

async fn do_test_conn_timeout(conf: Option<ConnectorOptions>) {
let connector = TransportConnector::new(conf);
let mut peer = BasicPeer::new(BLACK_HOLE);
Expand Down
27 changes: 21 additions & 6 deletions pingora-core/src/listeners/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::os::unix::net::UnixListener as StdUnixListener;
use std::time::Duration;
use tokio::net::TcpSocket;

use crate::protocols::l4::ext::set_tcp_fastopen_backlog;
use crate::protocols::l4::listener::Listener;
pub use crate::protocols::l4::stream::Stream;
use crate::server::ListenFds;
Expand All @@ -51,12 +52,16 @@ impl AsRef<str> for ServerAddress {
}

/// TCP socket configuration options.
#[derive(Clone, Debug)]
#[non_exhaustive]
#[derive(Clone, Debug, Default)]
pub struct TcpSocketOptions {
/// IPV6_V6ONLY flag (if true, limit socket to IPv6 communication only).
/// This is mostly useful when binding to `[::]`, which on most Unix distributions
/// will bind to both IPv4 and IPv6 addresses by default.
pub ipv6_only: bool,
/// Enable TCP fast open and set the backlog size of it.
/// See the [man page](https://man7.org/linux/man-pages/man7/tcp.7.html) for more information.
pub tcp_fastopen: Option<usize>,
// TODO: allow configuring reuseaddr, backlog, etc. from here?
}

Expand Down Expand Up @@ -121,10 +126,17 @@ fn apply_tcp_socket_options(sock: &TcpSocket, opt: Option<&TcpSocketOptions>) ->
let Some(opt) = opt else {
return Ok(());
};
let socket_ref = socket2::SockRef::from(sock);
socket_ref
.set_only_v6(opt.ipv6_only)
.or_err(BindError, "failed to set IPV6_V6ONLY")
if opt.ipv6_only {
let socket_ref = socket2::SockRef::from(sock);
socket_ref
.set_only_v6(opt.ipv6_only)
.or_err(BindError, "failed to set IPV6_V6ONLY")?;
}

if let Some(backlog) = opt.tcp_fastopen {
set_tcp_fastopen_backlog(sock.as_raw_fd(), backlog)?;
}
Ok(())
}

fn from_raw_fd(address: &ServerAddress, fd: i32) -> Result<Listener> {
Expand Down Expand Up @@ -279,7 +291,10 @@ mod test {

#[tokio::test]
async fn test_listen_tcp_ipv6_only() {
let sock_opt = Some(TcpSocketOptions { ipv6_only: true });
let sock_opt = Some(TcpSocketOptions {
ipv6_only: true,
..Default::default()
});
let mut listener = ListenerEndpoint::new(ServerAddress::Tcp("[::]:7101".into(), sock_opt));
listener.listen(None).await.unwrap();
tokio::spawn(async move {
Expand Down
79 changes: 71 additions & 8 deletions pingora-core/src/protocols/l4/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,45 @@ pub fn set_recv_buf(_fd: RawFd, _: usize) -> Result<()> {
Ok(())
}

/*
* this extension is needed until the following are addressed
* https://github.com/tokio-rs/tokio/issues/1543
* https://github.com/tokio-rs/mio/issues/1257
* https://github.com/tokio-rs/mio/issues/1211
*/
/// connect() to the given address while optionally bind to the specific source address
/// Enable client side TCP fast open.
///
/// Sets the `TCP_FASTOPEN_CONNECT` option (level `IPPROTO_TCP`) to `1` on `fd`.
///
/// # Errors
/// Returns a `ConnectError` when the socket option cannot be set.
#[cfg(target_os = "linux")]
pub fn set_tcp_fastopen_connect(fd: RawFd) -> Result<()> {
    const ENABLED: c_int = 1;
    set_opt(fd, libc::IPPROTO_TCP, libc::TCP_FASTOPEN_CONNECT, ENABLED)
        .or_err(ConnectError, "failed to set TCP_FASTOPEN_CONNECT")
}

/// Enable client side TCP fast open.
///
/// This is a no-op on targets other than Linux: the socket is left untouched and
/// `Ok(())` is returned.
#[cfg(not(target_os = "linux"))]
pub fn set_tcp_fastopen_connect(_fd: RawFd) -> Result<()> {
Ok(())
}

/// Enable server side TCP fast open.
///
/// Sets the `TCP_FASTOPEN` option (level `IPPROTO_TCP`) on `fd` to `backlog`.
/// Values of `backlog` larger than `c_int::MAX` are saturated to `c_int::MAX`.
///
/// # Errors
/// Returns a `ConnectError` when the socket option cannot be set.
#[cfg(target_os = "linux")]
pub fn set_tcp_fastopen_backlog(fd: RawFd, backlog: usize) -> Result<()> {
    // `usize` is wider than `c_int`: saturate instead of letting a bare `as` cast wrap
    // around into a negative or otherwise unrelated value.
    let backlog = backlog.min(c_int::MAX as usize) as c_int;
    set_opt(fd, libc::IPPROTO_TCP, libc::TCP_FASTOPEN, backlog)
        .or_err(ConnectError, "failed to set TCP_FASTOPEN")
}

/// Enable server side TCP fast open.
///
/// This is a no-op on targets other than Linux: both arguments are ignored and
/// `Ok(())` is returned.
#[cfg(not(target_os = "linux"))]
pub fn set_tcp_fastopen_backlog(_fd: RawFd, _backlog: usize) -> Result<()> {
Ok(())
}

/// connect() to the given address while optionally binding to the specific source address.
///
/// The `set_socket` callback can be used to tune the socket before `connect()` is called.
///
/// `IP_BIND_ADDRESS_NO_PORT` is used.
pub async fn connect(addr: &SocketAddr, bind_to: Option<&SocketAddr>) -> Result<TcpStream> {
pub(crate) async fn connect_with<F: FnOnce(&TcpSocket) -> Result<()>>(
addr: &SocketAddr,
bind_to: Option<&SocketAddr>,
set_socket: F,
) -> Result<TcpStream> {
let socket = if addr.is_ipv4() {
TcpSocket::new_v4()
} else {
Expand All @@ -252,12 +281,21 @@ pub async fn connect(addr: &SocketAddr, bind_to: Option<&SocketAddr>) -> Result<
}
// TODO: add support for bind on other platforms

set_socket(&socket)?;

socket
.connect(*addr)
.await
.map_err(|e| wrap_os_connect_error(e, format!("Fail to connect to {}", *addr)))
}

/// connect() to the given address while optionally binding to the specific source address.
///
/// `IP_BIND_ADDRESS_NO_PORT` is used.
pub async fn connect(addr: &SocketAddr, bind_to: Option<&SocketAddr>) -> Result<TcpStream> {
connect_with(addr, bind_to, |_| Ok(())).await
}

/// connect() to the given Unix domain socket
pub async fn connect_uds(path: &std::path::Path) -> Result<UnixStream> {
UnixStream::connect(path)
Expand Down Expand Up @@ -334,4 +372,29 @@ mod test {
assert_eq!(recv_size, 102400 * 2);
}
}

/// Checks that a second `connect_with()` to the same address with `TCP_FASTOPEN_CONNECT`
/// enabled completes in less than 4ms.
///
/// Ignored by default; besides the sysctl mentioned below it connects to `1.1.1.1:80`.
#[cfg(target_os = "linux")]
#[ignore] // this test requires the Linux system to have net.ipv4.tcp_fastopen set
#[tokio::test]
async fn test_set_fast_open() {
use std::time::Instant;

// connect once to make sure there is a cookie to use for TFO
connect_with(&"1.1.1.1:80".parse().unwrap(), None, |socket| {
set_tcp_fastopen_connect(socket.as_raw_fd())
})
.await
.unwrap();

// only the second connection attempt is timed
let start = Instant::now();
connect_with(&"1.1.1.1:80".parse().unwrap(), None, |socket| {
set_tcp_fastopen_connect(socket.as_raw_fd())
})
.await
.unwrap();
let connection_time = start.elapsed();

// connect() returns right away as the SYN goes out only when the first write() is called.
assert!(connection_time.as_millis() < 4);
}
}
30 changes: 27 additions & 3 deletions pingora-core/src/upstreams/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Defines where to connect to and how to connect to a remote server

use ahash::AHasher;
use pingora_error::{ErrorType::InternalError, OrErr, Result};
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::hash::{Hash, Hasher};
Expand Down Expand Up @@ -168,6 +169,13 @@ pub trait Peer: Display + Clone {
self.get_peer_options().and_then(|o| o.tcp_recv_buf)
}

/// Whether to enable TCP fast open.
///
/// Taken from the `tcp_fast_open` field of the peer options; `false` when the peer
/// has no options.
fn tcp_fast_open(&self) -> bool {
    match self.get_peer_options() {
        Some(opts) => opts.tcp_fast_open,
        None => false,
    }
}

/// Check the given file descriptor against this peer's address via `check_fd_match()`.
fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
    let peer_addr = self.address();
    peer_addr.check_fd_match(fd)
}
Expand All @@ -186,11 +194,24 @@ pub struct BasicPeer {
}

impl BasicPeer {
/// Create a new [`BasicPeer`]
/// Create a new [`BasicPeer`].
///
/// # Panics
/// Panics if `address` cannot be parsed into the address type wrapped by
/// `SocketAddr::Inet`.
pub fn new(address: &str) -> Self {
let addr = SocketAddr::Inet(address.parse().unwrap()); // TODO: check error
Self::new_from_sockaddr(addr)
}

/// Create a new [`BasicPeer`] with the given path to a Unix domain socket.
///
/// # Errors
/// Returns an `InternalError` if `path` cannot be turned into a Unix socket address.
pub fn new_uds<P: AsRef<Path>>(path: P) -> Result<Self> {
    let uds_addr = UnixSocketAddr::from_pathname(path.as_ref())
        .or_err(InternalError, "while creating BasicPeer")?;
    Ok(Self::new_from_sockaddr(SocketAddr::Unix(uds_addr)))
}

fn new_from_sockaddr(sockaddr: SocketAddr) -> Self {
BasicPeer {
_address: SocketAddr::Inet(address.parse().unwrap()), // TODO: check error, add support
// for UDS
_address: sockaddr,
sni: "".to_string(), // TODO: add support for SNI
options: PeerOptions::new(),
}
Expand Down Expand Up @@ -287,6 +308,8 @@ pub struct PeerOptions {
pub curves: Option<&'static str>,
// see ssl_use_second_key_share
pub second_keyshare: bool,
// whether to enable TCP fast open
pub tcp_fast_open: bool,
// use Arc because Clone is required but not allowed in trait object
pub tracer: Option<Tracer>,
}
Expand Down Expand Up @@ -314,6 +337,7 @@ impl PeerOptions {
extra_proxy_headers: BTreeMap::new(),
curves: None,
second_keyshare: true, // default true and noop when not using PQ curves
tcp_fast_open: false,
tracer: None,
}
}
Expand Down
4 changes: 3 additions & 1 deletion pingora/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ pub fn main() {
.unwrap();

let mut echo_service_http = service::echo::echo_service_http();
echo_service_http.add_tcp("0.0.0.0:6145");
let mut options = pingora::listeners::TcpSocketOptions::default();
options.tcp_fastopen = Some(10);
echo_service_http.add_tcp_with_settings("0.0.0.0:6145", options);
echo_service_http.add_uds("/tmp/echo.sock", None);

let dynamic_cert = DynamicCert::new(&cert_path, &key_path);
Expand Down