Skip to content

Commit

Permalink
Add support for immediate chunk flushing, SSE.
Browse files Browse the repository at this point in the history
Problem:

To support Server-Side Events (SSE, aka JS EventSource) it is
necessary for the server to keep open an HTTP request and dribble out
data (the event stream) as it is generated.

Currently, Rocket handles this poorly.  It likes to fill in complete
chunks.  Also there is no way to force a flush of the underlying
stream: in particular, there is a BufWriter underneath hyper.  hyper
would honour a flush request, but there is no way to send one.

Options:

Ideally the code which is producing the data would be able to
explicitly designate when a flush should occur.  Certainly it would
not be acceptable to flush all the time for all readers.

1. Invent a new kind of Body (UnbufferedChunked) which translates the
data from each Read::read() call into a single call to the stream
write, and which always flushes.  This would be a seriously invasive
change.  And it would mean that SSE systems with fast event streams
might work poorly.

2. Invent a new kind of Body which doesn't use Read at all, and
instead has a more sophisticated API.  This would be super-invasive
and heavyweight.

3. Find a way to encode the necessary information in the Read trait
protocol.

Chosen solution:

It turns out that option 3 is quite easy.  The read() call can return
an io::Error.  There are at least some errors that clearly ought not
to occur here.  An obvious one is ErrorKind::WouldBlock.

Rocket expects the reader to block.  WouldBlock is only applicable to
nonblocking objects.  And indeed the reader will generally want to
return it (once) when it is about to block.

We have the Stream treat io::Error with ErrorKind::WouldBlock, from
its reader, as a request to flush.  There are two effects: we stop
trying to accumulate a full chunk, and we issue a flush call to the
underlying writer (which, eventually, makes it all the way down into
hyper and BufWriter).

Implementation:

We provide a method ReadExt::read_max_wfs which is like read_max but
which handles the WouldBlock case specially.  It tells its caller
whether a flush was wanted.

This is implemented by adding a new code to read_max_internal.  with a
boolean to control it.  This seemed better than inventing a trait or
something.  (The other read_max call site is reading http headers in
data.rs, and I think it wants to tread WouldBlock as an error.)

Risks and downsides:

Obviously this ad-hoc extension to the Read protocol is not
particularly pretty.  At least, people who aren't doing SSE (or
similar) won't need it and can ignore it.

If for some reason the data source is actually nonblocking, this new
arrangement would spin, rather than calling the situation a fatal
error.  This possibility seems fairly remote, in production settings
at least.  To migitate this it might be possible for the loop in
Rocket::issue_response to bomb out if it notices it is sending lots of
consecutive empty chunks.

It is possible that async Rocket will want to take a different
approach entirely.  But it will definitely need to solve this problem
somehow, and naively it seems like the obvious transformation to eg
the Tokio read trait would have the same API limitation and admit the
same solution.  (Having a flush occur every time the body stream
future returns Pending would not be good for performance, I think.)
  • Loading branch information
ijackson authored and SergioBenitez committed Oct 30, 2020
1 parent 7b1995c commit c24a963
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"examples/testing",
"examples/request_local_state",
"examples/request_guard",
"examples/sse",
"examples/stream",
"examples/json",
"examples/msgpack",
Expand Down
1 change: 1 addition & 0 deletions core/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ all-features = true
[features]
default = ["private-cookies"]
tls = ["rocket_http/tls"]
sse = []
private-cookies = ["rocket_http/private-cookies"]

[dependencies]
Expand Down
39 changes: 28 additions & 11 deletions core/lib/src/ext.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
use std::io;

pub trait ReadExt: io::Read {
fn read_max(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
let start_len = buf.len();
while !buf.is_empty() {
match self.read(buf) {
Ok(0) => break,
Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; }
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
fn read_max_internal<T: io::Read>(reader: &mut T, mut buf: &mut [u8],
wouldblock_flush_signalling: bool)
-> io::Result<(usize, bool)> {
let start_len = buf.len();
let need_flush = loop {
if buf.is_empty() { break false }
match reader.read(buf) {
Ok(0) => { break true }
Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; }
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(ref e) if (e.kind() == io::ErrorKind::WouldBlock &&
wouldblock_flush_signalling) => { break true }
Err(e) => return Err(e),
}
};

Ok(start_len - buf.len())
Ok((start_len - buf.len(), need_flush))
}

pub trait ReadExt: io::Read + Sized {
fn read_max(&mut self, buf: &mut [u8]) -> io::Result<usize> {
Ok(read_max_internal(self, buf, false)?.0)
}

/// Tries to fill buf with data. Short reads can occur for EOF or
/// flush requests. With SSE enabled, a flush request occurs if
/// the underlying reader returns ErrorKind::Wouldblock
fn read_max_wfs(&mut self, buf: &mut [u8])
-> io::Result<(usize, bool)> {
read_max_internal(self, buf, cfg!(feature="sse"))
}
}

Expand Down
6 changes: 6 additions & 0 deletions core/lib/src/response/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,12 @@ impl<'r> Response<'r> {
/// [DEFAULT_CHUNK_SIZE](::response::DEFAULT_CHUNK_SIZE). Use
/// [set_chunked_body](#method.set_chunked_body) for custom chunk sizes.
///
/// Normally, data will be buffered and sent only in complete
/// chunks. If you need timely transmission of available data,
/// rather than buffering, enable the `sse` feature and use the
/// `WouldBlock` technique described in
/// [Stream](::response::Stream).
///
/// # Example
///
/// ```rust
Expand Down
24 changes: 24 additions & 0 deletions core/lib/src/response/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,30 @@ impl<T: Read> Stream<T> {
/// # #[allow(unused_variables)]
/// let response = Stream::chunked(io::stdin(), 10);
/// ```
///
/// # Buffering and blocking
///
/// Normally, data will be buffered and sent only in complete
/// `chunk_size` chunks.
///
/// With the feature `sse` enabled, the `Read`er may signal that
/// data sent so far should be transmitted in a timely fashion
/// (e.g. it is responding to a Server-Side Events (JavaScript
/// `EventSource`) request. To do this it should return an
/// [io::Error](std::io::Error) of kind `WouldBlock` (which should
/// not normally occur), after returning a collection of data.
/// This will cause a flush of data seen so far, rather than being
/// treated as an error.
///
/// Note that long-running responses may easily exhaust Rocket's
/// thread pool, so consider increasing the number of threads.
/// If doing SSE, also note the 'maximum open connections' browser
/// limitation which is described in the
/// [EventSource documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource)
/// on the Mozilla Developer Network.
///
/// Without the `sse` feature, a `WouldBlock` error is treated
/// as an actual error.
pub fn chunked(reader: T, chunk_size: u64) -> Stream<T> {
Stream(reader, chunk_size)
}
Expand Down
9 changes: 6 additions & 3 deletions core/lib/src/rocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,12 @@ impl Rocket {
let mut buffer = vec![0; chunk_size as usize];
let mut stream = hyp_res.start()?;
loop {
match body.read_max(&mut buffer)? {
0 => break,
n => stream.write_all(&buffer[..n])?,
match body.read_max_wfs(&mut buffer)? {
(0, _) => break,
(n, f) => {
stream.write_all(&buffer[..n])?;
if f { stream.flush()? }
},
}
}

Expand Down
8 changes: 8 additions & 0 deletions examples/sse/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "sse"
version = "0.0.0"
workspace = "../../"
publish = false

[dependencies]
rocket = { path = "../../core/lib", features = ["sse"] }
79 changes: 79 additions & 0 deletions examples/sse/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#![feature(proc_macro_hygiene, decl_macro)]
#[macro_use]
extern crate rocket;

use rocket::http::ContentType;
use rocket::response::Content;
use rocket::response::Responder;
use std::io::BufReader;
use std::io::Read;
use std::thread::sleep;
use std::time::Duration;

#[get("/")]
fn index<'r>() -> impl Responder<'r> {
Content(
ContentType::HTML,
r##"
<body>
<h1>Hi!</h1>
<div id="spong">nothing yet</div>
</body>
<script src="script.js"></script>
"##,
)
}

#[get("/script.js")]
fn script<'r>() -> impl Responder<'r> {
Content(
ContentType::JavaScript,
r##"
status_node = document.getElementById('spong');
status_node.innerHTML = 'js-done'
es = new EventSource("updates");
es.onmessage = function(event) {
status_node.innerHTML = event.data;
}
"##,
)
}

const BUF_SIZE : usize = 4096;

type TestCounter = BufReader<TestCounterInner>;
#[derive(Debug)]
struct TestCounterInner {
next: usize,
}
impl Read for TestCounterInner {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
sleep(Duration::from_millis(500));
let data = format!("data: {}\n\n", self.next);
self.next += 1;
// `BufReader` won't call us unless its buffer is empty, and
// then buf will be the whole of the buffer, ie of size
// BUF_SIZE (due to the `with_capacity` call). So `data` is
// definitely going to fit.
buf[0..data.len()].copy_from_slice(data.as_bytes());
Ok(buf.len())
}
}

#[get("/updates")]
fn updates<'x>() -> impl Responder<'x> {
let tc = TestCounterInner { next: 0 };
let tc = BufReader::with_capacity(BUF_SIZE, tc);
let ch = rocket::response::Stream::from(tc);
let ct = ContentType::parse_flexible("text/event-stream; charset=utf-8").unwrap();
Content(ct, ch)
}

fn main() {
rocket::ignite()
.mount("/", routes![index, script, updates,])
.launch();
}

0 comments on commit c24a963

Please sign in to comment.