Using hyper as reverse proxy like in the example yields Connection failed: hyper::Error(Shutdown, Os { code: 107, kind: NotConnected, message: "Transport endpoint is not connected" }) #2959

Closed
firstdorsal opened this issue Aug 24, 2022 · 3 comments
Labels
C-bug Category: bug. Something is wrong. This is bad!

Comments

@firstdorsal

firstdorsal commented Aug 24, 2022

Version

hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
anyhow="1"
tokio-rustls={ version = "0.23.4"}
futures-util="0.3.23"
rustls="0.20.6"
rustls-pemfile="1"
http="0.2.8"
hyper-tls="0.5.0"

Platform
Linux blackbox 5.19.1-arch2-1 #1 SMP PREEMPT_DYNAMIC Thu, 11 Aug 2022 16:06:13 +0000 x86_64 GNU/Linux

Description
I tried out the example http_proxy

async fn handle(mut req: Request<Body>) -> hyper::Result<Response<Body>> {
    let host = "127.0.0.1";
    let addr_str = format!("{}:{}", host, 8092);
    let addr = SocketAddr::from_str(&addr_str).unwrap();
    let stream = TcpStream::connect(addr)
        .await
        .expect("failed to connect tcp stream");

    req.headers_mut().remove("host");
    req.headers_mut()
        .append("host", HeaderValue::from_str(&addr_str).unwrap());
    //dbg!(&req);

    let (mut sender, conn) = Builder::new()
        .http1_preserve_header_case(true)
        .http1_title_case_headers(true)
        .handshake(stream)
        .await?;

    tokio::task::spawn(async move {
        if let Err(err) = conn.await {
            println!("Connection failed: {:?}", err);
        }
    });

    sender.send_request(req).await
}

I wanted to compare the performance of traefik and hyper as reverse proxies, so I ran a few tests:

returning hello world from hyper

705,343.03

Running 10s test @ http://localhost:3000
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   114.46us   68.42us   1.55ms   85.65%
    Req/Sec   177.67k     2.88k  191.55k    71.71%
  7123930 requests in 10.10s, 597.86MB read
Requests/sec: 705343.03
Transfer/sec:     59.19MB

direct (no proxy) to traefik/whoami:

40,989.66

Running 10s test @ http://localhost:8092
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     3.62ms    6.00ms 132.21ms   93.16%
    Req/Sec    10.35k     1.21k   17.89k    97.26%
  413999 requests in 10.10s, 107.00MB read
Requests/sec:  40989.66
Transfer/sec:     10.59MB

traefik no tls to traefik/whoami:

29,725.48

Running 10s test @ http://localhost:8090
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     4.30ms    6.78ms 167.00ms   95.00%
    Req/Sec     7.52k     1.25k   25.50k    97.76%
  300222 requests in 10.10s, 132.56MB read
Requests/sec:  29725.48
Transfer/sec:     13.13MB

hyper no tls to traefik/whoami:

This works without problems when using curl for some manual requests:

*   Trying 127.0.0.1:3000...
* Connected to localhost (127.0.0.1) port 3000 (#0)
> GET / HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/7.84.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Date: Wed, 24 Aug 2022 13:29:35 GMT
< Content-Length: 159
< Content-Type: text/plain; charset=utf-8
< 
Hostname: 5c1dfcbcd904
IP: 127.0.0.1
IP: 172.28.0.2
RemoteAddr: 172.28.0.1:35560
GET / HTTP/1.1
Host: 127.0.0.1:8092
User-Agent: curl/7.84.0
Accept: */*

* Connection #0 to host localhost left intact

but when hit with wrk, it yields the following error for every request after roughly the first 100 to 1000:

Connection failed: hyper::Error(Shutdown, Os { code: 107, kind: NotConnected, message: "Transport endpoint is not connected" })
Running 10s test @ http://localhost:3000
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     9.81ms    7.41ms  67.30ms   72.20%
    Req/Sec     2.53k     1.55k    5.88k    75.00%
  28230 requests in 10.09s, 7.30MB read
  Socket errors: connect 0, read 10363, write 0, timeout 0
Requests/sec:   2797.67
Transfer/sec:    740.40KB

More info

main.rs

use http::HeaderValue;
use hyper::client::conn::Builder;
use hyper::server::conn::AddrIncoming;
use hyper::service::{make_service_fn, service_fn};
use hyper::upgrade::Upgraded;
use hyper::Method;
use hyper::{Body, Request, Response, Server};
use lib::tls::{error, load_certs, load_private_key, TlsAcceptor};
use lib::utils::{host_addr, tunnel};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let http_addr = SocketAddr::from_str("127.0.0.1:3000")?;

    let service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

    let tls_cfg = {
        let certs = load_certs("cert.pem")?;
        let key = load_private_key("key.pem")?;
        let mut cfg = rustls::ServerConfig::builder()
            .with_safe_defaults()
            .with_no_client_auth()
            //.with_cert_resolver(cert_resolver) select which cert to use
            .with_single_cert(certs, key)
            .map_err(|e| error(format!("{}", e)))?;
        // Configure ALPN to accept HTTP/2, HTTP/1.1 in that order.
        cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
        std::sync::Arc::new(cfg)
    };

    //let server =
    //    Server::builder(TlsAcceptor::new(tls_cfg, AddrIncoming::bind(&http_addr)?)).serve(service);
    let server = Server::bind(&http_addr).serve(service);

    //.bind(&http_addr).serve(make_svc);

    let graceful = server.with_graceful_shutdown(shutdown_signal());

    if let Err(e) = graceful.await {
        eprintln!("server error: {}", e);
    }
    Ok(())
}

async fn shutdown_signal() {
    tokio::signal::ctrl_c()
        .await
        .expect("failed to install CTRL+C signal handler");
}

async fn handle(mut req: Request<Body>) -> hyper::Result<Response<Body>> {
    let host = "127.0.0.1";
    let addr_str = format!("{}:{}", host, 8092);
    let addr = SocketAddr::from_str(&addr_str).unwrap();
    let stream = TcpStream::connect(addr)
        .await
        .expect("failed to connect tcp stream");

    req.headers_mut().remove("host");
    req.headers_mut()
        .append("host", HeaderValue::from_str(&addr_str).unwrap());
    //dbg!(&req);

    let (mut sender, conn) = Builder::new()
        .http1_preserve_header_case(true)
        .http1_title_case_headers(true)
        .handshake(stream)
        .await?;

    tokio::task::spawn(async move {
        if let Err(err) = conn.await {
            println!("Connection failed: {:?}", err);
        }
    });

    sender.send_request(req).await
}

tls.rs

use core::task::{Context, Poll};
use futures_util::ready;
use hyper::server::accept::Accept;
use hyper::server::conn::{AddrIncoming, AddrStream};
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::rustls::ServerConfig;

pub fn error(err: String) -> io::Error {
    io::Error::new(io::ErrorKind::Other, err)
}

// Load public certificate from file.
pub fn load_certs(filename: &str) -> io::Result<Vec<rustls::Certificate>> {
    // Open certificate file.
    let certfile = std::fs::File::open(filename)
        .map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
    let mut reader = io::BufReader::new(certfile);

    // Load and return certificate.
    let certs = rustls_pemfile::certs(&mut reader)
        .map_err(|_| error("failed to load certificate".into()))?;
    Ok(certs.into_iter().map(rustls::Certificate).collect())
}

// Load private key from file.
pub fn load_private_key(filename: &str) -> io::Result<rustls::PrivateKey> {
    // Open keyfile.
    let keyfile = std::fs::File::open(filename)
        .map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;

    let mut reader = io::BufReader::new(keyfile);

    // Load and return a single private key.
    let keys = rustls_pemfile::pkcs8_private_keys(&mut reader)
        .map_err(|_| error("failed to load private key".into()))?;
    if keys.len() != 1 {
        return Err(error("expected a single private key".into()));
    }

    Ok(rustls::PrivateKey(keys[0].clone()))
}

enum State {
    Handshaking(tokio_rustls::Accept<AddrStream>),
    Streaming(tokio_rustls::server::TlsStream<AddrStream>),
}

// tokio_rustls::server::TlsStream doesn't expose constructor methods,
// so we have to TlsAcceptor::accept and handshake to have access to it
// TlsStream implements AsyncRead/AsyncWrite handshaking tokio_rustls::Accept first
pub struct TlsStream {
    state: State,
}

impl TlsStream {
    fn new(stream: AddrStream, config: Arc<ServerConfig>) -> TlsStream {
        let accept = tokio_rustls::TlsAcceptor::from(config).accept(stream);
        TlsStream {
            state: State::Handshaking(accept),
        }
    }
}

impl AsyncRead for TlsStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut ReadBuf,
    ) -> Poll<io::Result<()>> {
        let pin = self.get_mut();
        match pin.state {
            State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) {
                Ok(mut stream) => {
                    let result = Pin::new(&mut stream).poll_read(cx, buf);
                    pin.state = State::Streaming(stream);
                    result
                }
                Err(err) => Poll::Ready(Err(err)),
            },
            State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf),
        }
    }
}

impl AsyncWrite for TlsStream {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let pin = self.get_mut();
        match pin.state {
            State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) {
                Ok(mut stream) => {
                    let result = Pin::new(&mut stream).poll_write(cx, buf);
                    pin.state = State::Streaming(stream);
                    result
                }
                Err(err) => Poll::Ready(Err(err)),
            },
            State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf),
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.state {
            State::Handshaking(_) => Poll::Ready(Ok(())),
            State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx),
        }
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.state {
            State::Handshaking(_) => Poll::Ready(Ok(())),
            State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx),
        }
    }
}

pub struct TlsAcceptor {
    config: Arc<ServerConfig>,
    incoming: AddrIncoming,
}

impl TlsAcceptor {
    pub fn new(config: Arc<ServerConfig>, incoming: AddrIncoming) -> TlsAcceptor {
        TlsAcceptor { config, incoming }
    }
}

impl Accept for TlsAcceptor {
    type Conn = TlsStream;
    type Error = io::Error;

    fn poll_accept(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
        let pin = self.get_mut();
        match ready!(Pin::new(&mut pin.incoming).poll_accept(cx)) {
            Some(Ok(sock)) => Poll::Ready(Some(Ok(TlsStream::new(sock, pin.config.clone())))),
            Some(Err(e)) => Poll::Ready(Some(Err(e))),
            None => Poll::Ready(None),
        }
    }
}

utils.rs

use hyper::upgrade::Upgraded;
use tokio::net::TcpStream;

pub fn host_addr(uri: &http::Uri) -> Option<String> {
    uri.authority().map(|auth| auth.to_string())
}

// Create a TCP connection to host:port, build a tunnel between the connection and
// the upgraded connection
pub async fn tunnel(mut upgraded: Upgraded, addr: String) -> std::io::Result<()> {
    // Connect to remote server
    let mut server = TcpStream::connect(addr).await?;

    // Proxying data
    let (from_client, from_server) =
        tokio::io::copy_bidirectional(&mut upgraded, &mut server).await?;

    // Print message when done
    println!(
        "client wrote {} bytes and received {} bytes",
        from_client, from_server
    );

    Ok(())
}

lib.rs

pub mod tls;
pub mod utils;

traefik/whoami docker compose

docker-compose.yml

version: "3.9"
services:
    whoami:
        image: traefik/whoami
        ports:
            - 8092:80
        restart: always

@firstdorsal added the C-bug label on Aug 24, 2022

@seanmonstar
Member

The example on master does not use a connection pool (master is going through some changes). The example from 0.14.x does, and you should see better results with that.
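
To illustrate what a connection pool changes here, below is a minimal, std-only sketch of the idea. It is not hyper's implementation: `Conn`, `Pool`, `checkout`, and `checkin` are hypothetical names invented for this example, and "opening" a connection is simulated with a counter instead of a real `TcpStream::connect`. The point is only that a handler which returns connections to a pool opens far fewer upstream sockets than one that connects on every request, as the `handle` function in this issue does.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an upstream connection.
/// `id` records which simulated "connect" created it.
#[derive(Debug)]
pub struct Conn {
    pub id: usize,
}

/// Hypothetical idle-connection pool keyed by upstream address.
#[derive(Default)]
pub struct Pool {
    idle: HashMap<String, Vec<Conn>>,
    opened: usize,
}

impl Pool {
    pub fn new() -> Self {
        Self::default()
    }

    /// Reuse an idle connection for `addr` if one exists;
    /// otherwise simulate opening a new one.
    pub fn checkout(&mut self, addr: &str) -> Conn {
        if let Some(conn) = self.idle.get_mut(addr).and_then(|conns| conns.pop()) {
            return conn;
        }
        self.opened += 1;
        Conn { id: self.opened }
    }

    /// Hand a connection back so a later request can reuse it.
    pub fn checkin(&mut self, addr: &str, conn: Conn) {
        self.idle.entry(addr.to_string()).or_default().push(conn);
    }

    /// Number of connections that had to be opened so far.
    pub fn opened(&self) -> usize {
        self.opened
    }
}
```

With this sketch, 1000 sequential checkout/checkin cycles against `127.0.0.1:8092` open a single connection, whereas connecting per request would open 1000. In hyper 0.14 the higher-level `hyper::Client` keeps idle connections internally, which is why the 0.14.x example behaves differently from the `client::conn` version shown in this issue.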

@firstdorsal
Author

awesome, thank you!

@seanmonstar closed this as not planned on Aug 24, 2022
@firstdorsal
Author

Works as expected!
And it's faster than traefik!

Running 10s test @ http://localhost:3000
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     3.22ms    2.97ms  34.03ms   84.55%
    Req/Sec     8.96k   538.78    17.52k    90.00%
  356591 requests in 10.04s, 92.16MB read
Requests/sec:  35533.56
Transfer/sec:      9.18MB
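
To put a number on "faster than traefik", here is a small sketch that compares the Requests/sec figures reported by wrk in this thread. The constant and function names are made up for this example, and the figures come from single 10-second runs on one machine, so the result is a rough indication and not a benchmark.

```rust
/// Requests/sec figures copied from the wrk runs in this thread.
pub const DIRECT: f64 = 40989.66; // wrk -> traefik/whoami, no proxy
pub const TRAEFIK: f64 = 29725.48; // wrk -> traefik -> whoami
pub const HYPER_POOLED: f64 = 35533.56; // wrk -> hyper (pooled client) -> whoami

/// Relative change of `measured` over `baseline`, in percent.
/// Positive means `measured` handled more requests per second.
pub fn percent_change(baseline: f64, measured: f64) -> f64 {
    (measured - baseline) / baseline * 100.0
}
```

`percent_change(TRAEFIK, HYPER_POOLED)` comes out at about 19.5, and `percent_change(DIRECT, HYPER_POOLED)` at about -13.3. In these runs the hyper proxy served roughly a fifth more requests per second than traefik, while costing about 13% of the throughput of talking to whoami directly.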
