Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming response bodies #189

Open
kbillings opened this issue Mar 22, 2018 · 9 comments
Open

Streaming response bodies #189

kbillings opened this issue Mar 22, 2018 · 9 comments
Labels

Comments

@kbillings
Copy link

@kbillings kbillings commented Mar 22, 2018

Is it possible to stream a response?

I tried using Body::pair() but I was not able to get it working.

@smangelsdorf

This comment has been minimized.

Copy link
Contributor

@smangelsdorf smangelsdorf commented Mar 22, 2018

This is definitely something I'd like to see supported, but is not something that has received any attention yet. When you used Body::pair(), did you see a type error, or something else?

@kbillings

This comment has been minimized.

Copy link
Author

@kbillings kbillings commented Mar 22, 2018

Well I first tried this:

let mut response = Response::new();
set_headers(&state, &mut response, Some(::mime::TEXT_PLAIN), None);
let (sender, body) = Body::pair();

let stream = stream::iter_ok(vec![
    Chunk::from("1\n"),
    Chunk::from("2\n"),
    Chunk::from("3\n")
].into_iter()).map(Ok);

response.set_body(body);

Box::new(
    sender.send_all(stream)
        .map_err(|e| e.into_handler_error())
        .then(|x| match x {
            Ok(_) => Ok((state, response)),
            Err(e) => Err((state, e)),
        })
)

But it just waits forever not returning anything. Then I tried this:

let mut response = Response::new();
set_headers(&state, &mut response, Some(::mime::TEXT_PLAIN), None);
let (sender, body) = Body::pair();

thread::spawn(|| {
    let stream = stream::iter_ok(vec![
        Chunk::from("1\n"),
        Chunk::from("2\n"),
        Chunk::from("3\n")
    ].into_iter()).map(Ok);

    sender.send_all(stream).wait().unwrap()
});

response.set_body(body);

But that just returns immediately with no body.

Gotham currently forces the body of a response to be a hyper::Body, if that was lifted you could set the body to any Stream<Item = Chunk> and I think it would work.

@smangelsdorf

This comment has been minimized.

Copy link
Contributor

@smangelsdorf smangelsdorf commented Mar 23, 2018

Thanks. I'll have a look into this.

@smangelsdorf smangelsdorf self-assigned this Mar 23, 2018
@millardjn

This comment has been minimized.

Copy link

@millardjn millardjn commented Apr 21, 2018

I think the problem in @kbillings's example is due to the ContentLength header is being set to 0 by set_headers(&state, &mut response, Some(::mime::TEXT_PLAIN), None);, preventing continued streaming of the response.

I just got streaming working with:

let mut res = Response::new();
set_headers(&state, &mut res, Some(ext_to_mime(&ext)), None);
res.headers_mut().set(CacheControl(vec![
			CacheDirective::MaxAge(86400u32),
			CacheDirective::Public,
		]));
res.headers_mut().remove::<ContentLength>();

let (sender, body) = Body::pair();
res.set_body(body);

let stream = FS_POOL.read(pathbuf)
	.map(|bytes| Ok(bytes.into()));

let sender = sender
	.sink_map_err(|e| hyper::Error::from(::std::io::Error::new(::std::io::ErrorKind::Other, e)));

let streaming_future = sender.send_all(stream)
	.map(|(_sink, _stream)| ())
	.map_err(|_e| error!("Streaming error"));

// Pass streaming future to tokio
// TODO handle at_capacity error
DefaultExecutor::current().spawn(Box::new(streaming_future)).unwrap();

Box::new(future::ok((state, res)))

where FS_POOL is a futures_fs::FsPool returning stream from a file.

@smangelsdorf

This comment has been minimized.

Copy link
Contributor

@smangelsdorf smangelsdorf commented Apr 22, 2018

Thanks @millardjn … That would explain what I saw when I looked at this briefly. Does this fix it for you @kbillings?

@ChristophWurst

This comment has been minimized.

Copy link
Contributor

@ChristophWurst ChristophWurst commented Apr 25, 2018

Thanks for this example code, @millardjn! Did you test it with larger files? I just wrote an integration of the futures-fs crate for gotham and while streaming to responses works for small text files, it gets stuck with larger files after the FsReadStream delivers two chunks.

I've also tried a different approach where I used a hyper::Response<Box<Stream<Item = Chunk, Error = hyper::Error>>> which works from a Hyper perspective, but Gotham assumes handler futures to resolve to a Response<Body> it seems and therefore I got compilation errors.

You can find my PoC here: https://github.com/ChristophWurst/gotham-middleware-fs/pull/1/files

@millardjn

This comment has been minimized.

Copy link

@millardjn millardjn commented Apr 25, 2018

Mostly image files up to 10-20 MB, nothing in the GB/TB range, but enough that I should be way past 2 chunks. All I can think of is that the future from the stream-sink interaction has to be handed to a tokio executor or a new thread so it keeps getting polled independently of the response future.

Good idea with the middleware. If you want to share the stalled code in a gist I'll take a look.

@ChristophWurst

This comment has been minimized.

Copy link
Contributor

@ChristophWurst ChristophWurst commented Apr 25, 2018

Mostly image files up to 10-20 MB, nothing in the GB/TB range, but enough that I should be way past 2 chunks.

My test file was a ~80 MB one. According to the printf statement I added for debugging purposes two chunks of 8192 bytes are read before it stalls.

All I can think of is that the future from the stream-sink interaction has to be handed to a tokio executor or a new thread so it keeps getting polled.

I'm returning the future which resolves when the read stream has been forwarded to the body stream. So I assumed that using this future to build the request handler future will eventually be passed to the event loop by Gotham and therefore will be polled.

If you want to share the stalled code in a gist I'll take a look.

See https://github.com/ChristophWurst/gotham-middleware-fs/pull/1/files and the newly added response_stream example. Replace Cargo.toml with a large file on your disk and start it with cargo run --example response_stream.

@kpcyrd

This comment has been minimized.

Copy link

@kpcyrd kpcyrd commented May 28, 2019

I ran into this issue as well and it it's difficult to get the snippets to work without a Cargo.toml and imports, can somebody add this to the examples/ folder? :)

@smangelsdorf smangelsdorf removed their assignment May 28, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants
You can’t perform that action at this time.