Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PUSH example #412

Closed
nappa85 opened this issue Sep 26, 2019 · 11 comments
Closed

PUSH example #412

nappa85 opened this issue Sep 26, 2019 · 11 comments

Comments

@nappa85
Copy link

nappa85 commented Sep 26, 2019

Hellom
I'm trying to use the new push_promise feature, I edited the server example to reply a index.html page and push a script.js file, but it ends up with a PROTOCOL_ERROR and I'm unable to debug it:


use http::{Response, Request, StatusCode};

use tokio::net::TcpListener;

#[tokio::main]
pub async fn main() {
    let mut listener = TcpListener::bind("127.0.0.1:5927").await.expect("Bind error");

    // Accept all incoming TCP connections.
    loop {
        if let Ok((socket, _peer_addr)) = listener.accept().await {
            // Spawn a new task to process each connection.
            tokio::spawn(async move {
                // Start the HTTP/2.0 connection handshake
                let mut h2 = server::handshake(socket).await.expect("Handshake error");
                // Accept all inbound HTTP/2.0 streams sent over the
                // connection.
                while let Some(request) = h2.accept().await {
                    let (request, mut respond) = request.expect("Incoming request error");
                    println!("Received request: {:?}", request);

                    // Build a response with no body
                    let response = Response::builder()
                        .status(StatusCode::OK)
                        .body(())
                        .expect("First response build error");

                    // Send the response back to the client
                    let mut stream = respond.send_response(response, false)
                        .expect("First response send error");

                    let contents = b"index.html contents".to_vec();
                    stream.send_data(contents.into(), true).expect("index.html send error");

                    // prepare the PUSH request
                    let request = Request::builder()
                        .uri("script.js")
                        .body(())
                        .expect("script.js request build error");
                    
                    // init the PUSH sequence
                    let mut push = respond.push_request(request)
                        .expect("script.js request send error");

                    // Build a response with no body
                    let response = Response::builder()
                        .status(StatusCode::OK)
                        .body(())
                        .expect("Second response build error");

                    // Send the response back to the client
                    let mut stream = push.send_response(response, false)
                        .expect("Second response send error");

                    let contents = b"script.js contents".to_vec();
                    stream.send_data(contents.into(), true).expect("script.js send error");
                }
            });
        }
    }
}

Can you please help me creating a working PUSH example?
Thank you

@RuralDependencies

This comment has been minimized.

@michaelbeaumont
Copy link
Contributor

Could it be that you're ending the initial stream (second argument to send_data):

 stream.send_data(contents.into(), true).expect("index.html** send error");

and then pushing a request on it? Try pushing before sending end of stream.

@nappa85
Copy link
Author

nappa85 commented Sep 30, 2019

The error becomes CANCEL.

Sending only the PUSH, the error is still a CANCEL.

Reverting the operations order, sending first the PUSH and then the standard response, there is no error, but the client doesn't see the PUSH, only the standard response.

@nappa85
Copy link
Author

nappa85 commented Sep 30, 2019

I used wireshark to look at the data transmitted (sorry for not thinking about it before).
With any combination (first standard response then PUSH, reverse order, only push, with or without closing the stream) I've never seen PUSH data passing by

@michaelbeaumont
Copy link
Contributor

What exactly don't you see being transmitted? Perhaps your client doesn't support push messages properly.

See examples/server.rs improved with pushing:
michaelbeaumont@d5219dc

When using nghttp as client, this works for me.

@nappa85
Copy link
Author

nappa85 commented Oct 1, 2019

The client I'm using is the client example from this crate, improved with push promises:

use std::error::Error;

use h2::client;

use http::{Request, Method};

use tokio::net::TcpStream;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    // Establish TCP connection to the server.
    let tcp = TcpStream::connect("127.0.0.1:5927").await?;
    let (h2, connection) = client::Builder::new()
        .enable_push(true)
        .handshake::<_, Vec<u8>>(tcp).await?;
    tokio::spawn(async move {
        connection.await.expect("Connection error");
    });

    let mut h2 = h2.ready().await?;
    // Prepare the HTTP request to send to the server.
    let request = Request::builder()
                    .method(Method::GET)
                    .uri("https://www.example.com/")
                    .body(())
                    .expect("Request error");

    // Send the request. The second tuple item allows the caller
    // to stream a request body.
    let (mut response, _) = h2.send_request(request, true).expect("Send request error");

    let mut pushes = response.push_promises();
    let (head, mut body) = response.await?.into_parts();

    println!("Received response: {:?}", head);

    // The `release_capacity` handle allows the caller to manage
    // flow control.
    //
    // Whenever data is received, the caller is responsible for
    // releasing capacity back to the server once it has freed
    // the data from memory.
    let mut release_capacity = body.release_capacity().clone();

    while let Some(chunk) = body.data().await {
        let push = pushes.push_promise().await;
        println!("push {:?}", push);

        println!("raw data {:?}", chunk);
        let chunk = chunk?;
        println!("RX: {:?}", chunk);

        // Let the server send more data.
        let _ = release_capacity.release_capacity(chunk.len());
    }

    Ok(())
}

@nappa85
Copy link
Author

nappa85 commented Oct 1, 2019

Ok, so, having your working example, turns out the only thing to change in my server was the building of the PUSH Request, using the incoming URI as base:

        let mut pushed_uri_parts: uri::Parts  = request.into_parts().0.uri.into();
        pushed_uri_parts.path_and_query = uri::PathAndQuery::from_static("/script.js").into();

        let pushed_req = Request::builder()
            .uri(uri::Uri::from_parts(pushed_uri_parts).unwrap())
            .body(())
            .expect("script.js request build error");

And the client was working correctly.

@ivanceras
Copy link

The example works now, and I have a new problem: Adding another push request will result an error.

@ivanceras
Copy link

use h2::server;

use bytes::*;
use http::uri;
use http::{Request, Response, StatusCode};

use std::error::Error;
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let _ = env_logger::try_init();

    let mut listener = TcpListener::bind("127.0.0.1:5928").await?;

    println!("listening on {:?}", listener.local_addr());

    loop {
        if let Ok((socket, _peer_addr)) = listener.accept().await {
            tokio::spawn(async move {
                if let Err(e) = handle(socket).await {
                    println!("  -> err={:?}", e);
                }
            });
        }
    }
}

async fn handle(socket: TcpStream) -> Result<(), Box<dyn Error>> {
    let mut connection = server::handshake(socket).await?;
    println!("H2 connection bound");

    while let Some(result) = connection.accept().await {
        let (request, mut respond) = result?;
        println!("GOT request: {:?}", request);
        let response = Response::builder().status(StatusCode::OK).body(()).unwrap();

        let mut pushed_uri_parts: uri::Parts  = request.into_parts().0.uri.into();
        pushed_uri_parts.path_and_query = uri::PathAndQuery::from_static("/pushed").into();
        let uri1 = uri::Uri::from_parts(pushed_uri_parts).unwrap();
        println!("uri1 {}", uri1);

        let uri2 = uri::Uri::from_static("http://127.0.0.1:5928/pushed2");
        //let uri2 = uri::Uri::from_static("https://http2.akamai.com/pushed2");

        println!("uri2 {}", uri2);

        let pushed_req = Request::builder()
            .uri(uri1)
            .body(())
            .unwrap();

        let pushed_req2 = Request::builder()
            .uri(uri2)
            .body(())
            .unwrap();

        let pushed_rsp = http::Response::builder().status(200).body(()).unwrap();
        let pushed_rsp2 = http::Response::builder().status(200).body(()).unwrap();
        let mut send_pushed = respond
            .push_request(pushed_req)
            .unwrap()
            .send_response(pushed_rsp, false)
            .unwrap();

        let mut send_pushed2 = respond
            .push_request(pushed_req2)
            .unwrap()
            .send_response(pushed_rsp2, false)
            .unwrap();

        let mut send = respond.send_response(response, false)?;

        println!(">>>> pushing data");
        send_pushed.send_data(Bytes::from_static(b"Pushed data!\n"), false)?;
        send_pushed2.send_data(Bytes::from_static(b"Another Pushed data!\n"), false)?;
        println!(">>>> sending data");
        send.send_data(Bytes::from_static(b"hello world"), true)?;
    }

    println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~");

    Ok(())
}

Using nghttp to get a response

>nghttp http://127.0.0.1:5928
  Some requests were not processed. total=1, processed=0

The error from the server says:

H2 connection bound
GOT request: Request { method: GET, uri: http://127.0.0.1:5928/, version: HTTP/2.0, headers: {"accept": "*/*", "accept-encoding": "gzip, deflate", "user-agent": "nghttp2/1.36.0"}, body: RecvStream { inner: ReleaseCapacity { inner: OpaqueStreamRef { stream_id: StreamId(13), ref_count: 2 } } } }
uri1 http://127.0.0.1:5928/pushed
uri2 http://127.0.0.1:5928/pushed2
>>>> pushing data
>>>> sending data
thread 'tokio-runtime-worker-0' panicked at 'attempt to subtract with overflow', src/proto/streams/streams.rs:1269:5

@nox
Copy link
Contributor

nox commented May 4, 2021

FWIW, this was where the overflow happened back in 2019:

me.refs -= 1;

@michaelbeaumont
Copy link
Contributor

That issue was fixed with #479, this issue can probably be closed.

@nox nox closed this as completed May 4, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants