The design of the Futures-based Body Stream #953

Closed
2 tasks done
seanmonstar opened this issue Nov 8, 2016 · 48 comments

@seanmonstar
Member

seanmonstar commented Nov 8, 2016

Currently exists as a futures::Stream of http::Chunk. The idea of Chunk is to keep the internals private, to allow for optimizations to happen without breaking changes, and then add From implementations that users can take advantage of.

Besides Vec<u8>, it will also likely have a form of AppendBuf, which will allow sending slices from the same buffer when reading in a message body, instead of allocating an arbitrarily sized Vec, reading, and then shrinking.

It should also, as much as possible, offer some way to prevent double copies when a user is creating Chunks.
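
Sketching that shape (the variants and conversions here are illustrative, not hyper's actual internals):

// Illustrative sketch of an opaque Chunk: private internals,
// public From conversions. These variants are hypothetical.
pub struct Chunk(Inner);

enum Inner {
    Vec(Vec<u8>),
    Static(&'static [u8]),
}

impl From<Vec<u8>> for Chunk {
    fn from(v: Vec<u8>) -> Chunk {
        Chunk(Inner::Vec(v))
    }
}

impl From<&'static [u8]> for Chunk {
    fn from(s: &'static [u8]) -> Chunk {
        Chunk(Inner::Static(s))
    }
}

impl AsRef<[u8]> for Chunk {
    fn as_ref(&self) -> &[u8] {
        match self.0 {
            Inner::Vec(ref v) => v,
            Inner::Static(s) => s,
        }
    }
}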

/cc #934


Thanks to the discussion below, here are the action items to complete this:

  • Allow the user-supplied body to be generic over any Stream
  • Replace MemBuf with the bytes crate in Chunk
seanmonstar added this to the 0.10 milestone Nov 8, 2016
@pimeys

pimeys commented Jan 18, 2017

Could you explain a bit how this is supposed to work currently? I'm trying to switch to hyper HEAD and having a problem reading the body with this code:

let body_f = response.body().fold(body_vec, |mut acc, chunk| {
    acc.extend_from_slice(chunk.as_ref());
    Ok(acc)
}).map_err(|_| { response::FcmError::ServerError(None) });

The compiler gives me:

error[E0283]: type annotations required: cannot resolve `hyper::Error: std::convert::From<_>`
   |
82 |             let body_f = response.body().fold(body_vec, |mut acc, chunk| {
   |                                          ^^^^

Now, is there a good practice for reading the body into a vector without blocking?

@seanmonstar
Member Author

I have hit this exact same error when I was writing tests. I'll check in with the tokio team about the error.

I also hope to write some guides explaining how to stream a body in hyper.

@alexcrichton
Contributor

@pimeys I think the error you're seeing there is just due to type inference in the compiler and the Stream trait. Right now fold is generic over the error returned, working with From to switch error types if necessary.

The error in your example I believe is that Ok(acc) doesn't have a known error type and the compiler is unable to infer what it should be. You can fix this with Ok::<_, hyper::Error>(acc) (eew) for now and we may want to consider making fold less generic in the future.
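
Concretely, against the snippet above:

let body_f = response.body().fold(body_vec, |mut acc, chunk| {
    acc.extend_from_slice(chunk.as_ref());
    // Annotating the error type lets inference pick the From impl:
    Ok::<_, hyper::Error>(acc)
}).map_err(|_| response::FcmError::ServerError(None));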

@pimeys

pimeys commented Jan 18, 2017

Pretty much got it working, and got my first proper use case with the hyper tokio client to compile:

https://github.com/pimeys/fcm-rust/blob/tokio/src/client/mod.rs#L82

I'm trying to write an asynchronous Google push notification client with Tokio. I needed to modify hyper a bit to be able to modify headers and have access to some internal things, which I think I should be able to refactor at some point.

But god damn, it works. Now I need to get this working with HEAD, get it fast and get rid of that Handle passing in the constructor, as noted here #1002.

But first, I want to have this working on our use case, and we will have a handle to use :)

@seanmonstar
Member Author

@pimeys great! I'm curious what modifications you needed to make; could you open separate issues about them, so we can keep the focus here on the design of the Body stream (and Chunk)?

@seanmonstar
Member Author

So, the Chunk has had 2 variants added to it internally.

  • All reads use an internal MemBuf, which is a shared append buffer. When slices are needed, such as when sending a Chunk on the Body, a MemSlice is made, which just increments a refcount and updates the MemBuf to not touch that range of memory (rough sketch at the end of this comment).
  • It can accept an Arc<Vec<u8>>, motivated by a reported use case in which a server had built up a cache of responses and didn't want to copy the response body for every request, instead just sending an Arc clone. This one seems more arbitrary, and maybe could be adjusted.

Does it make sense to also integrate with tokio's EasyBuf? Maybe that would replace the Arc<Vec<u8>> variant of point 2? And it might make it easier to integrate with other tokio components?

Note that tokio's EasyBuf currently isn't suitable for internal use in hyper, but maybe future updates will let it be so.
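
To make the first point concrete, here's a conceptual sketch of the MemBuf/MemSlice relationship (illustrative only, not hyper's actual code):

use std::sync::Arc;

// A shared append buffer that hands out refcounted slices
// instead of copying.
struct MemBuf {
    buf: Arc<Vec<u8>>,
    read_pos: usize, // start of the range MemBuf may still touch
}

struct MemSlice {
    buf: Arc<Vec<u8>>, // cloning the Arc is the only "copy"
    start: usize,
    end: usize,
}

impl MemBuf {
    // Carve off [read_pos, end): MemBuf promises not to touch
    // that range of memory again.
    fn slice_to(&mut self, end: usize) -> MemSlice {
        let start = self.read_pos;
        self.read_pos = end;
        MemSlice { buf: self.buf.clone(), start, end }
    }
}

impl AsRef<[u8]> for MemSlice {
    fn as_ref(&self) -> &[u8] {
        &self.buf[self.start..self.end]
    }
}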

@scottlamb

scottlamb commented Jan 23, 2017

This API will already make it possible to avoid copying in several cases, but I'd prefer it use the owning_ref crate. That would let callers supply anything that can give them a &[u8], owned in any of the usual ways. That is, replace this:

enum Inner {
    Owned(Vec<u8>),
    Referenced(Arc<Vec<u8>>),
    Mem(MemSlice),
    Static(&'static [u8]),
    // ...possibly EasyBuf as mentioned in the previous comment...
}

with this:

enum Inner {
    Static(&'static [u8]),
    Vec(owning_ref::VecRef<[u8]>),
    Box(owning_ref::BoxRef<[u8]>),
    Arc(owning_ref::ArcRef<[u8]>),
    // ...I also mentioned owning_ref::RcRef in #934 but now think it wouldn't actually work...
}

and add From implementations that accept the respective owning_refs. All of the existing Froms could be matched with the new Inner.

Here are a couple of examples of how I'd use the extra flexibility in a real program (which serves HTTP range requests for .mp4 files, with the actual frames of video taken from files on disk and everything else from a SQLite database, by dividing the file's byte positions into a bunch of "slices" generated in various ways):

  • running arbitrary code on Drop. I have a little struct that wraps memmap::Mmap; it does a munmap and close when dropped. I'd like to create a Chunk that references that rather than copying. (Although there's a caveat: when doing mmap, it'd also be good to control which thread reads the memory in question; if pages have to be faulted in from disk, that thread can get stuck for a long time. I haven't looked into how that control would work with tokio and don't know if you'll ever be interested in supporting it.)
  • subslices of a vector. I actually have a few cases where this happens: a shared per-request buffer from which the boring parts of the .mp4 file are generated (it becomes a bunch of slices/Chunks interspersed with ones generated in other ways), slices that are most efficiently generated all-or-nothing and then trimmed to become Chunks if the HTTP byte range request doesn't want all of them, and a few slices that are always generated together and can be placed in a single allocation.

(I think my use case stresses this API more than most; I'd be happy to tell you more about it if you're interested.)

The only other improvement I can think of would be if the stuff that needs to reference a per-request object could avoid reference counting by the chunks having some per-request lifetime bound. But I haven't done any benchmarking to see if an atomic reference count on each chunk is a noticeable performance impact in my program (or any realistic scenario, for that matter), and I'm not even sure what I'm vaguely describing is possible in safe Rust code.

@seanmonstar
Member Author

@scottlamb I tried looking through the owning_ref crate, and had a hard time understanding its purpose.

As for mmapped slices, that sounds like the possibility of a bad time, if the memory is on disk and not in RAM when hyper tries to deref the slice to write into the socket. That blocking file IO could really hurt the performance of the event loop. I'm also not sure how to ask for an async deref, such that the loading could be done on another thread. It sounds like it would complicate a lot of things.

@scottlamb

As for mmapped slices, that sounds like the possibility of a bad time, if the memory is on disk and not in RAM when hyper tries to deref the slice to write into the socket. That blocking file IO could really hurt the performance of the event loop. I'm also not sure how to ask for an async deref, such that the loading could be done on another thread. It sounds like it would complicate a lot of things.

Okay. I'm not too surprised to hear you say that. I was thinking about it; instead of controlling which thread accesses the chunk, maybe I can get the same effect by calling mlock from a thread I choose before giving you the chunk. I probably want to use smaller chunks then (so I can send the first byte of the file before the last byte is read), and I have to be more careful about how much I have in flight to avoid running out of memory, so it's more complexity at the application layer, but I think it's possible and maybe even simpler overall.

Anyway, if the mlock approach doesn't work out and I end up wanting help from hyper with controlling the thread that accesses the memory, I'll open a separate issue as it's clearly a whole other thing to consider.

I tried looking through the owning_ref crate, and had a hard time understanding it's purpose.

I just worked through an example which I think will make this clearer to both of us. (It showed me that I had the wrong Inner definition above which probably wasn't helping.)

Here I'm using it for the mmap case. I'm starting with a Box<memmap::Mmap>, turning that into a BoxRef<memmap::Mmap, memmap::Mmap>, using the map member function to turn that into a BoxRef<memmap::Mmap, [u8]>, and then using the erase_owner member function to turn that into an ErasedBoxRef<[u8]>. The Chunk can keep around an ErasedBoxRef<[u8]> and know that it has a [u8] it can keep as long as it wants. When it drops that reference, the Box is dropped, and the munmap call happens along the way. (I verified with the strace utility that the munmap call actually happens as hoped.)

extern crate owning_ref;
extern crate libc;
extern crate memmap;

use std::io;
use std::fs::File;

pub struct Chunk(Inner);

enum Inner {
    Static(&'static [u8]),
    Vec(owning_ref::VecRef<u8, [u8]>),
    Box(owning_ref::ErasedBoxRef<[u8]>),
    Arc(owning_ref::ErasedArcRef<[u8]>),
}

impl<T: 'static> From<owning_ref::BoxRef<T, [u8]>> for Chunk {
    #[inline]
    fn from(r: owning_ref::BoxRef<T, [u8]>) -> Chunk {
        Chunk(Inner::Box(r.erase_owner()))
    }
}

fn main() {
    let f = File::open("f").unwrap();
    let mmap = Box::new(
        memmap::Mmap::open(&f, memmap::Protection::Read).unwrap());
    if unsafe { libc::mlock(mmap.ptr() as *const libc::c_void, mmap.len()) } < 0 {
        panic!("{}", io::Error::last_os_error());
    }
    let mmap = owning_ref::BoxRef::new(mmap);
    let mmap = mmap.map(|m| unsafe { m.as_slice() });
    let chunk: Chunk = mmap.into();
}

Likewise, when I want to trim a vector before sending it out on the wire, I can stuff it into a VecRef<u8, [u8]> and then use map to get a subslice to put into the chunk. The chunk owns the whole vector but only sees part of its contents.

@scottlamb

I have a dumb question orthogonal to our owning_ref discussion: how do I send more than one Chunk? Is there an example somewhere?

In particular, a Service returns a Future<Item=hyper::server::Response, Error=hyper::Error>. If I have just one Chunk, I can call set_body(chunk) on the Response before returning it. But if I have more than one Chunk, that deadlocks. (The TokioBody has an mpsc::channel(0), which only allows the one Sender to have one value in flight.) So I need to send the chunks after the future is resolved, which means I need to call ::tokio_core::Handle::spawn myself with the future, I think? But how do I get access to the Handle from my Service? My best guess so far is to stash away server.handle().remote() in a lazy_static from main, then use Remote::spawn from my Service to get a handle to launch futures. That doesn't seem ideal...

@seanmonstar
Member Author

I think I understand now how owning_ref works. Essentially, having b"Hello World".to_vec(), but wanting to send a chunk that only references the bytes of b"World", right?

I get the appeal of that, and actually find it pretty neat. At the same time, I'm wary of linking hyper's public API to owning_ref, if for no other reason than I feel like owning_ref is an obscure crate. Maybe that feeling is baseless, or maybe there's another way to get a similar effect without relying on that crate?


Regarding sending multiple chunks: that design assumes you don't have all the chunks ready to send yet (I would expect that someone who already has all the bytes would normally bundle them into a single chunk). So, you must spawn some other task that will try to send more chunks into the body as they become available and the body has room. This could be a CPU task from futures-cpupool, or some other task in tokio (such as another network request).

If doing that feels bad, we should definitely try to find how to make that better. That'd partly be up to the design of tokio itself, and so possibly people would have input on the tokio-proto repo or the tokio gitter room, but it's also something that we can explore (and push the feedback in to tokio). We can ping Alex or Carl if we have specific feedback, too.
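
As a rough sketch, assuming hyper's Body::pair() channel constructor (which comes up again later in this thread) and a tokio_core Handle in scope:

use futures::{Future, Sink};

// Feed a multi-chunk body from a separately spawned task; each
// send resolves to the sender again once the chunk is accepted.
let (tx, body) = hyper::Body::pair();
handle.spawn(
    tx.send(Ok(hyper::Chunk::from("first ")))
        .and_then(|tx| tx.send(Ok(hyper::Chunk::from("second"))))
        .map(|_tx| ())
        .map_err(|_| ()),
);
let response = hyper::server::Response::new().with_body(body);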

@scottlamb

I think I understand now how owning_ref works. Essentially, having b"Hello World".to_vec(), but wanting to send a chunk that only references the bytes of b"World", right?

Yeah, that's among the things you can do with it, via something like this:

let v = owning_ref::VecRef::new(b"Hello World".to_vec()).map(|v| &v[6..]);
let chunk: Chunk = v.into();

At the same time, I'm wary of linking hyper's public API to owning_ref, if for no other reason than I feel like owning_ref is an obscure crate. Maybe that feeling is baseless, or maybe there's another way to get a similar affect without relying on that crate?

I dunno what to say about that feeling—on the one hand, it has an order of magnitude fewer downloads than hyper, on the other it's a relatively small amount of code and has been maintained for the past 18 months.

For the b"Hello World" -> b"World" case, it looks like there's a ::tokio_core::io::Window that does something similar. But I don't think it does the type erasure thing, so I don't know how I'd use it for the mmap stuff. And I think you'd need basically a non-window + window version of each of the current enum variants.

So, you must spawn some sort of other task that will try to send more chunks into the body as they become available, and the body has room.

How do I do that? I think I need a ::tokio_core::Handle and don't know where to get one easily. Maybe I'm just asking for it to be given as a parameter to NewService::new_service. Or maybe what I want is already easy and I'm missing it; I'm totally new to tokio...

@theduke

theduke commented Jan 31, 2017

Body should really have a convenience method, maybe fold_to_vec or collect_to_vec that yields a Future<Item=Vec<u8>, Error=hyper::Error>.

@scottlamb

fwiw, I think that's a one-liner already (untested): body.fold(Vec::new(), |mut a, b| { a.extend_from_slice(&b[..]); Ok::<_, hyper::Error>(a) })

@pimeys

pimeys commented Jan 31, 2017

I've tested the folding and it works, although it was not very obvious; good documentation would help here.

@abonander
Contributor

@seanmonstar

I'm working out a plan on how to build a fork of multipart on top of the async API once the dust settles. For the server-side API, I'm thinking of having a Multipart master struct which then becomes a stream of MultipartField, then each MultipartField is a stream of Chunk.

Of course, the main problem that multipart solves is delimiting on the multipart boundary. What I want to do is yield Chunks for each field until the boundary is hit, and then yield the data in that Chunk up to the boundary. If a byte subsequence turns out not to be the multipart boundary (such as if it was split between chunks), it needs to be yielded as well. I would prefer to do this without copying the data in the chunks and creating new ones, if possible.

I see two potential solutions:

  • Add an API for creating an owned Chunk which is a subslice into an existing one. MemSlice, which is used internally for server streams (I think), already supports this and so is essentially zero-cost, same with the &'static [u8] variant. The other two variants would require copying the data, but they appear to be used when the user is providing bodies and so would not be a major cost here.

  • Implement Clone for Chunk so that I can clone chunks and yield a wrapper type which subslices them on deref. The costs are comparable, but the Arc<Vec<u8>> variant would have a trivial cost as well, making Vec<u8> the only costly one, though I don't believe it's relevant to my usage anyway.

@seanmonstar
Member Author

So, something that I think solves both @abonander and @scottlamb's cases, plus various others, is if the Chunk were basically Bytes. It has several benefits:

  • It can be used internally, completely replacing the MemBuf/MemSlice setup. It has the same semantics of being a shared appendable buffer.
  • The plan for tokio 0.2 is to replace EasyBuf with Bytes: Implement EasyBuf with bytes crate tokio-rs/tokio-core#68
  • The Arc<Vec> case is easily adapted to being just a Bytes that you clone.
  • For @scottlamb's case, a BytesMut can be used similar to a Vec, building up the bytes however you want. You can then slice that into a subset and send just those Bytes.
  • For @abonander's case, a Bytes can be sliced to a subset, and that can be yielded however you want (see the sketch at the end of this comment).

A few questions remain regarding using Bytes in hyper:

  • Whether bytes 0.4 will be released soon enough, or whether this shouldn't block hyper 0.11.
  • Chunk currently has a variant for static slices, which feels nice. It's fun for the hello world example, but I also really appreciated it when I made a server that replied with contents from an include_bytes!("../some_file.txt"), knowing that it wasn't copying that full file multiple times (besides the necessary copy into the socket buffer). Brief discussions with @carllerche suggested that it could very well be feasible to add static slice support into Bytes.
  • Should hyper expose the type as Bytes directly, or continue to have it be Chunk(Bytes), with From<Bytes> and Into<Bytes> implementations? Keeping the internals opaque allows for small alterations that hyper might want.
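
To make the slicing concrete, a small sketch against the bytes 0.4 API:

extern crate bytes;

use bytes::{Bytes, BytesMut};

fn main() {
    let mut buf = BytesMut::with_capacity(64);
    buf.extend_from_slice(b"Hello World");
    let bytes: Bytes = buf.freeze(); // shared, immutable handle
    let world = bytes.slice(6, 11);  // zero-copy: just a refcount bump
    assert_eq!(&world[..], b"World");
}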

@scottlamb

Hmm, I think that would give me the ability to subset a Chunk, but not to have it backed by something more complex than a Vec<u8> as in the mmap example I gave above.

@seanmonstar
Member Author

@scottlamb you're right, I forgot about mmap. Asking in the tokio room, the suggested advice was basically "don't do that", because of the pauses in the event loop that can occur from loading the memory from disk. With those warnings, do you still find it compelling to try to do it anyways?

@scottlamb

mlock from a worker thread seems like it would address the pause concern. It requires a thread handoff for each chunk; but given that no one actually uses aio_read, so does every other approach to filesystem I/O, aside from doing writes from non-reactor threads or having enough reactor threads (with a shared event loop and/or work stealing) to not care if one blocks on disk, and I gather you and the tokio folks are looking to avoid those.

I should answer your question about if this is worthwhile with a benchmark. I'll try to work one up, but I apologize in advance for slowness: I have a tiny amount of time for my Rust projects every week and have been spreading that kinda thin, so nothing I do goes fast. I did look at it for a moment tonight and realized a problem with using owning_ref::ErasedBoxRef<[u8]>: that type isn't Send. D'oh.

Intuitively, it seems a little silly IMHO to go to the effort of the switch from sync to async (with all the pain of a non-intuitive programming model, the danger of accidentally doing slow/blocking stuff on the wrong thread, etc), in the name of performance (I think that was the main goal?) and then have to do context switches between every read and write and do extra copies. But maybe it comes down to a choice about what's important to optimize: small requests or large requests, serving from the filesystem or from elsewhere, etc. When I mention using mmap and avoiding copies, I'm thinking about filesystem-based responses much larger than the CPU cache.

@abonander
Contributor

What about posix_madvise and PrefetchVirtualMemory to advise the OS to page in the mmap'ed region? You could issue that call and then schedule an event for the next idle to actually attempt the access, which will hopefully give the OS enough time to perform the operation, or cause the least disruption in case a page fault still happens. The latter call is restricted to Windows 8 or later; I don't know if you'd want to do the threading thing for previous versions or just dereference at idle (or immediately) and eat the cost.

@abonander
Contributor

abonander commented Feb 7, 2017

I just realized we're talking about serving files. You can also use posix_fadvise() or set the FILE_FLAG_SEQUENTIAL_SCAN flag when opening the file on Windows, which will advise the OS to pre-cache the file. You could again try to read only on idle so as to limit disruption if the OS can't read ahead fast enough. Reading in chunks the size of a block in the filesystem would also help.

Addendum: Linux also has readahead(), which immediately initiates a read-ahead into cache (though it's probably equivalent to posix_fadvise(POSIX_FADV_WILLNEED) for the same arguments); however, the docs say that it may block to read file metadata. If the file descriptor was recently opened, though, the metadata should still be in cache anyway.
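
For illustration, here's roughly how the hint could be issued through the libc crate (Linux-only sketch; purely advisory, as the next comment points out):

extern crate libc;

use std::fs::File;
use std::os::unix::io::AsRawFd;

// Ask the kernel to start paging the file into cache. It may
// ignore the hint, and there is no completion signal.
fn hint_willneed(f: &File, len: libc::off_t) -> bool {
    unsafe {
        libc::posix_fadvise(f.as_raw_fd(), 0, len, libc::POSIX_FADV_WILLNEED) == 0
    }
}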

@scottlamb

I brought up mlock because it makes a guarantee: on successful return, the memory is paged in and will stay that way until munlock or munmap is called. So if you call it from a worker thread and then pass the reference to the reactor thread, you can be absolutely confident accessing that memory won't cause the reactor to pause.

posix_fadvise, posix_madvise, and readahead don't do that. They might do nothing. Even if they read the file into RAM, they don't tell you when it's done, and they don't guarantee it won't be immediately undone. Calls to read (for traditional IO) or memory accesses (for mmap) can still pause.

scottlamb added a commit to scottlamb/owning-ref-rs that referenced this issue Feb 9, 2017
See issues:
hyperium/hyper#953
Kimundi#24

I'm not sure what the right way to do this is (maybe a second erased type,
SyncErased?) but this is a quick hack that will let me throw together a
benchmark of zero-copy mmap in hyper.
scottlamb added a commit to scottlamb/hyper that referenced this issue Feb 9, 2017
@ghost

ghost commented Feb 13, 2017

I'd love an option to get the raw socket behind it, as there are times when you want to avoid userland completely and just use splice() or sendfile().

Right now, this doesn't seem doable, or did I miss something during my experiments? (Granted, this is not really asynchronous, but it is still a very real use case.)

@seanmonstar
Member Author

@Michael-Zapata There is no option to do that for now. The ability to borrow the socket would probably require some coordination with tokio, since there are tokio-proto types in between the user and the socket.

It may be slightly easier to instead allow passing some struct Sendfile { path: PathBuf } on the Body stream and having hyper figure out how to do that internally, but I'm also quite wary of that. sendfile can block the reactor thread when reading the file, it is incompatible with HTTPS, and it is more of a pain when trying to work with HTTP2 streams. I'm not absolutely against it, but it'd require a bit more exploration and a prototype, so it feels out of scope for the next release.

@seanmonstar
Member Author

I've worked on making the user-supplied body stream generic, and it exists for both the server and client in this branch: https://github.com/hyperium/hyper/compare/outgoing-generic

A few things I either need to fiddle with, or would welcome feedback on, are:

  • This is using server::Response<B = Body>, so that it is generic over any stream, but people who don't care to customize don't need to bother declaring it. Doing this kept all the examples working perfectly, while allowing me to alter an example to use something like futures::stream::Once<&'static [u8], hyper::Error> (see the sketch after this list). However, with default type parameters, it can be easy to accidentally force a non-generic type in a library.

    For example, if someone did fn decorate(res: &mut Response) { }, no one could pass Response<MyStream> to it. However, the same situation exists with HashMap, where if someone accepts HashMap<K, V>, they've locked their function to only hashmaps that use the default hasher.

    The proper decision should favor the default case. I've figured the default is to just use hyper::Body, since it will be backed by Bytes, making it performant and easy to use.

  • Unfortunately, though, I cannot yet decide how to default to hyper::Body in the Client. With the server it was easier, since a user can declare their choice when implementing Service, by saying type Response = Response. They don't do that with a Client, instead just calling client.request(req). So far, inference hasn't been able to detect the body type that should be used, at least when making a GET request, where no body is specified.

    It might be that a decent way forward is to adjust Client::new to be bound to hyper::Body, and if you want to change that, you need to use Client::configure.

  • I haven't explored it yet, but it might be nice if the supplied body stream didn't need to actually emit hyper::Error, but just E: Into<hyper::Error>. That'd mean that io::Error would just work...
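
As a sketch of what the branch allows (details may differ as the branch evolves):

use futures::stream::{self, Once};
use hyper::server::Response;

// A one-shot body stream: yields a single static slice, then ends.
type StaticBody = Once<&'static [u8], hyper::Error>;

fn hello() -> Response<StaticBody> {
    Response::new().with_body(stream::once(Ok(&b"Hello, World!"[..])))
}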

@abonander
Contributor

I don't think it's worth worrying about library writers shooting themselves in the foot by falling back on the default type param.

It's pretty clear to me that Client::get() should be stuck in its own impl block with the B param as a concrete type:

impl<C> Client<C, hyper::Body> where C: Connect {
    /// Send a GET Request using this Client.
    #[inline]
    pub fn get(&self, url: Url) -> FutureResponse {
        self.request(Request::new(Method::Get, url))
    }
}

(Or alternatively, futures::stream::Empty<[u8; 0], ::Error> as a true no-op stream.)

I doubt you'll get many users who are too lazy to write client.request(Request::new(Method::Get, url)) but still want to use their own body stream for the same Client.

However, this leads into a separate concern I have, but one that I don't think will be easy to resolve because it also digs into Tokio a good bit: I don't think Client should be parametric over the body stream, because that prohibits using different stream types with the same Client. If possible, the B parameter should be lowered to the request() method. I do understand that the parts of Tokio that Client interacts with are dependent on the specific stream type, which does make for a bit of a conundrum. I can't help the feeling that this requirement could be lifted somewhat with some more tinkering in Tokio.

@scottlamb

@seanmonstar wrote:

I haven't explored it yet, but it might be nice if the supplied body stream didn't need to actually emit hyper::Error, but just E: Into<hyper::Error>. That'd mean that io::Error would just work...

Maybe a dumb question, but what is this error used for? I think it's basically just for hyper to know that it should drop the connection to indicate error to the HTTP peer? I think even Error = () would suffice for that. Given that applications/libraries are the ones producing the error, they can do logging and such themselves in whatever way they desire.

hyper::Error is kind of weird for this in that its documentation states that it's "A set of errors that can occur parsing HTTP streams.", which is way more specific than "any sort of error an application could encounter that prevents it from fully generating a HTTP stream".

(By the way, there probably should be some way for hyper::server to tell the application if the whole response including the last chunk was fully written or not, but I think that's a separate concern from the body stream.)

@abonander wrote:

I don't think Client should be parametric over the body stream, because that prohibits using different stream types with the same Client.

I think the server side is actually similar: there's only one Service, and so only one definition of B. If frameworks (such as Iron, Rocket, etc) want to define any higher-level stuff (such as having an option to build simple synchronous handlers that use io::Write as in hyper 0.10.x), they'll end up having to state an opinion about what B should be. (Hopefully they'll leave an escape hatch such as an enum variant for custom stuff like what I want to do with mmap.) It'd be nice if it were possible to avoid forcing them to make this choice, but I think it's still significantly better than hyper being too rigid. It seems like the Rust web ecosystem has almost everything based on hyper, and then various frameworks on top of that. This hyper interface gives them the flexibility to experiment, and low-level performance-oriented code can avoid using a framework entirely and still not have to reinvent hyper.

@abonander
Contributor

This kinda goes for Service as well: I think the trait shouldn't be tied to the specific type of the byte stream, but I don't really have a good idea of how to fix that. I just know that different endpoints are likely going to want to return responses in different ways, and the current trait design is going to require some kind of dynamic workaround at the user level, either boxing and type erasure or the use of an enum, both of which have their associated costs.

@seanmonstar
Member Author

Maybe a dumb question, but what is this error used for?

Huh. I traced through the code in tokio and played with some examples, and found that a user will never receive the hyper::Error. If there is an error parsing the head, the error passed back to tokio will make tokio just kill the stream. An error can occur streaming the body, but realistically it'd only ever be an io::Error, until HTTP2 is added, in which case HTTP2 frame errors are also possible.

The purpose of requiring it on the Response side is currently a limitation in tokio: you can only define one error type to be used throughout.

I know from conversations that the tokio team wants to improve error handling in newer versions, so maybe some of these points go away. For the time being, though, I wonder if it makes sense for it to just be an io::Error. Or really, some opaque error type that can From::from(io::Error), so someone can get the description of the error message. There isn't anything else you can do once the Request stream has sent an error; it will be shut down regardless.


The points about the Service trait are very interesting! I'd recommend either opening an issue on the tokio-service repo, or discussing them out in the tokio gitter room.

@seanmonstar
Member Author

Actually, thanks to another conversation in the tokio room, I realized that it is possible for users to make use of the hyper::Error, such as knowing when there was a parse error before receiving a Request.

It is possible to wrap the Http protocol with another ServerProto impl. It could probably even be a generic wrapper. It would instead find Frame::Error and translate it into a Frame::Message with a Result<Request, Error>. You could then use a new Service to handle that, like so:

struct Example;

impl Service for Example {
    type Request = Result<hyper::server::Request, hyper::Error>;
    // other types are the same
    fn call(&self, incoming: Self::Request) -> Self::Future {
        match incoming {
            Ok(req) => {
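                // build and return the normal response future here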

            },
            Err(parse_error) => {
                // you could check for some error types
                // like Error::TooLarge, you could respond with a 414
            },
        }
    }
}

It's possibly worth wondering if hyper::server::Http should do this itself, and the wrapper could choose to ignore parse errors, or the other way around. Probably also worth being a separate issue.

@abonander
Copy link
Contributor

@seanmonstar I mean, I guess if there's no type erasure going on in a deeper abstraction layer that this could be rolled into somehow, then the point is moot.

scottlamb added a commit to scottlamb/moonfire-nvr that referenced this issue Feb 17, 2017
See hyper issue:
hyperium/hyper#953

I wanted this for the ability to define a mmap-friendly chunk, but I haven't
used that yet. I need to put some thought into what chunk enum to use;
owning_ref and reffers are possibilities. In the meantime, I'm keeping it as
generic as possible via silly-verbose where clauses.

Another nice benefit is that I no longer have to use .send_all() and a
tokio_core::reactor::Remote to push it through a mpsc; I can directly return a
flatten stream.
@alexcrichton
Contributor

cc rust-lang/futures-rs#390, a possible extension that came up during integration in mozilla/sccache#70

@mjc-gh

mjc-gh commented Feb 17, 2017

I'm in the process of blogging about futures and the changes to hyper for my company's blog. I have some questions about streaming responses with hyper. I put together a small example that just creates an infinite stream and sends out a Chunk periodically via an Interval from tokio-timer. The source for this is available here: https://github.com/Sigient/hyper_examples/blob/future-streaming/src/main.rs

Firstly, is this the proper way to handle this sort of streaming? I eventually want to rewrite esper, and will be keeping a connection open for an Event Source stream so I can send data as events are received from other clients. Is there a better way to send a stream of Chunks or, more generally, tie an arbitrary Stream to a response body? I'm thinking of implementing the Stream trait for some message queue type in esper and defining a poll function which knows when there are new messages available. I'm still trying to wrap my head around futures before I attempt implementing something this complex, though.

Secondly, when I run this example and connect a client to the stream, I see the CPU usage for the process spike to 100%. I think this is likely because I am doing something wrong with the Interval type, but I'm not really sure...

@seanmonstar
Member Author

@mikeycgto The link to the example doesn't resolve, so I'm unable to comment on most of your questions...

@mjc-gh

mjc-gh commented Feb 18, 2017

Sorry about that @seanmonstar! The repo was private and I have now made it public. Thanks!

@seanmonstar
Member Author

@mikeycgto:

Firstly, is this the proper way to handle this sort of streaming?

That's certainly a way to do it.

Is there a better way to send a stream of Chunks or, more generally, tie an arbitrary Stream to a response body?

I don't know about a 'better' way, but the outgoing (user-supplied) body stream is generic, so you can certainly create your own Stream type. It looks like this example is already making use of that, since previously you had to send a hyper::Body body, whereas this is sending a Box<Stream>.

I see the CPU usage for the process spikes to 100%. I think this is likely because I am doing something wrong with the Interval type but I'm not really sure...

I'm not familiar at all with how the tokio-timer crate works. To try to locate where the CPU usage is coming from, you could compare with a different kind of infinite stream. A naive implementation could spawn a thread, use let (tx, body) = hyper::Body::pair(), and then in the thread loop { tx.send(chunk); sleep_thread_for_2_seconds(); }.
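
In code, that naive comparison stream would look something like this (a sketch; the helper thread blocks itself, not the reactor):

use std::thread;
use std::time::Duration;
use futures::{Future, Sink};

let (tx, body) = hyper::Body::pair();
thread::spawn(move || {
    let mut tx = tx;
    loop {
        // send(..).wait() blocks this helper thread until the body
        // has room for the next chunk.
        tx = tx.send(Ok(hyper::Chunk::from("tick\n"))).wait().unwrap();
        thread::sleep(Duration::from_secs(2));
    }
});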

@scottlamb

@mikeycgto, I wonder if your busy-looping is a bug in tokio or mio. I'm also seeing terrible performance with somewhat different code (a CpuPool sending chunks over a futures::sync::mpsc::channel), and just filed a bug about it in tokio:

tokio-rs/tokio-core#177

I don't understand the problem well enough to know if it's related to what you're seeing or not.

@seanmonstar
Member Author

@scottlamb @mikeycgto the busy-looping may have been in part due to code in hyper, which was adjusted in #1074. Still seems to me that tokio shouldn't try to flush if it didn't write anything, but oh well.

@mjc-gh

mjc-gh commented Feb 23, 2017

No longer seeing the busy looping. Thanks Sean!

@seanmonstar
Member Author

hyper now uses the bytes crate, and so there is an impl From<Bytes> for Chunk, and related shortcuts for set_body. That might be the end of this issue...

I do wonder if the Chunk type has any value anymore, or if the default body stream should just use Bytes directly.

@seanmonstar
Member Author

I'm leaning towards closing this as fixed, unless there's an issue that hasn't been addressed yet.

@abonander
Contributor

Kind of a naive idea, but I was wondering if there could be some way to have a return stream of Bytes that have been completely read from, so the same handles could be reused. This could be done entirely in client code by cloning each Bytes handle yielded and trying to convert them to BytesMut later, but that seems more complex and less efficient.

@seanmonstar
Member Author

@abonander If you keep hold of a BytesMut, you can try to reclaim it later by calling reserve on it. If there are no more existing references (Bytes) alive, it will just reset on the same buffer. It might be a nice feature for the bytes crate to allow some way of using a memory pool, or something.

In that case though, it's probably better for this stuff to make use of Drop and such (so implemented outside of hyper).
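
A small sketch of that reclaim pattern against bytes 0.4:

extern crate bytes;

use bytes::BytesMut;

fn main() {
    let mut buf = BytesMut::with_capacity(1024);
    buf.extend_from_slice(b"chunk one");
    let chunk = buf.take().freeze(); // hand this Bytes off as a Chunk
    drop(chunk);                     // ...last shared handle goes away
    // With no Bytes handles left, reserve can reclaim the original
    // allocation instead of allocating a new one.
    buf.reserve(1024);
    assert!(buf.capacity() >= 1024);
}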

@dekellum

dekellum commented Jul 25, 2018

For anyone arriving here by searching for hyper and mmap, the body_image master branch (candidate for the body_image 0.4.0 release) now has zero-copy/async support for memory-mapped HTTP bodies, using (glibc) madvise(SEQUENTIAL) (for aggressive OS read-ahead) and the tokio-threadpool blocking annotation.

Thanks for publishing your bench results, @scottlamb. I have some non-hyper-specific tokio benchmarks included in body_image as well (see CHANGELOG.md for those results). An mlock strategy could also be added there in the future. Another item currently lacking is MAP_HUGE(TLB|_2MB|_1GB), for lack of support in the memmap-rs crate, which is ripe for forking since it's passively (or is it passive-aggressively?) maintained.
