Skip to content

Commit

Permalink
Add initial SSE support
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Apr 22, 2020
1 parent dd596a8 commit 0549ecb
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ docs = ["unstable"]
unstable = []

[dependencies]
async-sse = "2.1.0"
http-types = "1.0.1"
http-service = "0.5.0"
http-service-h1 = { version = "0.1.0", optional = true }
Expand Down
13 changes: 13 additions & 0 deletions examples/sse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use tide::sse;

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
let mut app = tide::new();
app.at("/sse").get(sse::endpoint(|_req, sender| async move {
sender.send("fruit", "banana").await;
sender.send("fruit", "apple").await;
Ok(())
}));
app.listen("localhost:8080").await?;
Ok(())
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ pub mod prelude;
pub use endpoint::Endpoint;
pub use redirect::redirect;
pub use request::Request;
pub mod sse;

#[doc(inline)]
pub use http_types::{Error, Result, Status};
Expand Down
2 changes: 1 addition & 1 deletion src/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(crate) enum CookieEvent {
/// An HTTP response
#[derive(Debug)]
pub struct Response {
res: http_service::Response,
pub(crate) res: http_service::Response,
// tracking here
pub(crate) cookie_events: Vec<CookieEvent>,
}
Expand Down
69 changes: 69 additions & 0 deletions src/sse/endpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::http::{mime, Body, StatusCode};
use crate::sse::Sender;
use crate::utils::BoxFuture;
use crate::{Endpoint, Request, Response, Result};

use async_std::future::Future;
use async_std::io::BufReader;
use async_std::task;

use std::marker::PhantomData;
use std::sync::Arc;

/// Create an endpoint that can handle SSE connections.
pub fn endpoint<F, Fut, State>(handler: F) -> SseEndpoint<F, Fut, State>
where
State: Send + Sync + 'static,
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
{
SseEndpoint {
handler: Arc::new(handler),
__state: PhantomData,
__fut: PhantomData,
}
}

/// An endpoint that can handle SSE connections.
#[derive(Debug)]
pub struct SseEndpoint<F, Fut, State>
where
State: Send + Sync + 'static,
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
{
handler: Arc<F>,
__state: PhantomData<State>,
__fut: PhantomData<Fut>,
}

impl<F, Fut, State> Endpoint<State> for SseEndpoint<F, Fut, State>
where
State: Send + Sync + 'static,
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
{
fn call<'a>(&'a self, req: Request<State>) -> BoxFuture<'a, Result<Response>> {
let handler = self.handler.clone();
Box::pin(async move {
let (sender, encoder) = async_sse::encode();
task::spawn(async move {
let sender = Sender::new(sender);
if let Err(err) = handler(req, sender).await {
log::error!("SSE handler error: {:?}", err);
}
});

// Perform the handshake as described here:
// https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model
let mut res = Response::new(StatusCode::Ok);
res.res.insert_header("Cache-Control", "no-cache").unwrap();
res.res.set_content_type(mime::SSE);

let body = Body::from_reader(BufReader::new(encoder), None);
res.set_body(body);

Ok(res)
})
}
}
53 changes: 53 additions & 0 deletions src/sse/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//! Server-Sent Events (SSE) types.
//!
//! # Errors
//!
//! Errors originating in the SSE handler will be logged. Errors originating
//! during the encoding of the SSE stream will be handled by the backend engine
//! the way any other IO error is handled.
//!
//! In the future we may introduce a better mechanism to handle errors that
//! originate outside of regular endpoints.
//!
//! # Examples
//!
//! ```no_run
//! # fn main() -> Result<(), std::io::Error> { async_std::task::block_on(async {
//! #
//! use tide::sse;
//!
//! let mut app = tide::new();
//! app.at("/sse").get(sse::endpoint(|_req, sender| async move {
//! sender.send("fruit", "banana").await;
//! sender.send("fruit", "apple").await;
//! Ok(())
//! }));
//! app.listen("localhost:8080").await?;
//! # Ok(()) }) }
//! ```

mod endpoint;
mod upgrade;

pub use endpoint::{endpoint, SseEndpoint};
pub use upgrade::upgrade;

/// An SSE message sender.
#[derive(Debug)]
pub struct Sender {
sender: async_sse::Sender,
}

impl Sender {
/// Create a new instance of `Sender`.
fn new(sender: async_sse::Sender) -> Self {
Self { sender }
}

/// Send data from the SSE channel.
///
/// Each message constists of a "name" and "data".
pub async fn send(&self, name: &str, data: impl AsRef<[u8]>) {
self.sender.send(name, data.as_ref(), None).await;
}
}
35 changes: 35 additions & 0 deletions src/sse/upgrade.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::http::{mime, Body, StatusCode};
use crate::{Request, Response, Result};

use super::Sender;

use async_std::future::Future;
use async_std::io::BufReader;
use async_std::task;

/// Upgrade an existing HTTP connection to an SSE connection.
pub fn upgrade<F, Fut, State>(req: Request<State>, handler: F) -> Response
where
State: Send + Sync + 'static,
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
{
let (sender, encoder) = async_sse::encode();
task::spawn(async move {
let sender = Sender::new(sender);
if let Err(err) = handler(req, sender).await {
log::error!("SSE handler error: {:?}", err);
}
});

// Perform the handshake as described here:
// https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model
let mut res = Response::new(StatusCode::Ok);
res.res.insert_header("Cache-Control", "no-cache").unwrap();
res.res.set_content_type(mime::SSE);

let body = Body::from_reader(BufReader::new(encoder), None);
res.set_body(body);

res
}

0 comments on commit 0549ecb

Please sign in to comment.