Skip to content

Commit

Permalink
Merge branch 'master' into testing-api
Browse files Browse the repository at this point in the history
  • Loading branch information
smangelsdorf committed Nov 29, 2017
2 parents c14b1ae + 8d41328 commit 867dd78
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 9 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ ci = []

[dependencies]
log = "0.3"
hyper = "~0.11.2"
hyper = "~0.11.7"
serde = "~1.0"
serde_derive = "~1.0"
bincode = "0.8"
mime = "0.3"
futures = "~0.1.11"
tokio-core = "0.1"
mio = "0.6"
borrow-bag = "0.3"
borrow-bag = "0.4"
url = "1.4.0"
uuid = { version = "0.5", features = ["v4"] }
chrono = "0.4"
Expand Down
3 changes: 2 additions & 1 deletion examples/kitchen-sink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ authors = ["Shaun Mangelsdorf <s.mangelsdorf@gmail.com>",
gotham = { path = "../.." }
gotham_derive = { path = "../../gotham_derive" }
futures = "*"
hyper="~0.11.2"
hyper="~0.11.7"
borrow-bag = "*"
log = "0.3"
fern = "0.4"
chrono = "0.4"
mime = "0.3"
tokio-timer = "0.1"
40 changes: 37 additions & 3 deletions examples/kitchen-sink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ extern crate chrono;
extern crate log;
extern crate fern;
extern crate mime;
extern crate tokio_timer;

mod middleware;

use std::panic::RefUnwindSafe;

use tokio_timer::*;
use std::time::Duration;
use futures::{future, Future, Stream};

use hyper::{Body, Response, Method, StatusCode};

use log::LogLevelFilter;

use chrono::prelude::*;

use gotham::router::request::path::NoopPathExtractor;
Expand Down Expand Up @@ -161,6 +161,15 @@ fn build_router() -> Router {
));
tree_builder.add_child(async);

let mut wait = NodeBuilder::new("wait", SegmentType::Static);
wait.add_route(static_route(
vec![Method::Get],
|| Ok(Echo::wait),
(global, ()),
pipeline_set.clone(),
));
tree_builder.add_child(wait);

let mut header_value = NodeBuilder::new("header_value", SegmentType::Static);
header_value.add_route(static_route(
vec![Method::Get],
Expand Down Expand Up @@ -243,6 +252,31 @@ impl Echo {
Box::new(future::lazy(move || future::ok((state, res))))
}

pub fn wait(state: State) -> Box<HandlerFuture> {
let timeout = Timer::default();
let sleep = timeout.sleep(Duration::from_secs(2));

let result = sleep.then(|res| match res {
Ok(_) => {
let res = create_response(
&state,
StatusCode::Ok,
Some((
String::from("delayed hello").into_bytes(),
mime::TEXT_PLAIN,
)),
);
future::ok((state, res))
}
Err(e) => {
let err = e.into_handler_error();
future::err((state, err))
}
});

Box::new(result)
}

fn header_value(mut state: State) -> (State, Response) {
state.borrow_mut::<KitchenSinkData>().header_value = "different value!".to_owned();

Expand Down
136 changes: 135 additions & 1 deletion src/handler/trap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

use std::panic::{AssertUnwindSafe, catch_unwind};
use std::error::Error;
use std::any::Any;
use std::{io, mem};

use hyper::{self, Response, StatusCode};
use futures::Async;
use futures::future::{self, Future, FutureResult};

use handler::{NewHandler, Handler, HandlerError, IntoResponse};
Expand Down Expand Up @@ -42,7 +45,9 @@ where
});

match res {
Ok(f) => f,
Ok(f) => Box::new(UnwindSafeFuture::new(f).catch_unwind().then(
finalize_catch_unwind_response,
)),
Err(_) => Box::new(finalize_panic_response(timer)),
}
}
Expand Down Expand Up @@ -101,6 +106,69 @@ fn finalize_panic_response(timer: Timer) -> FutureResult<Response, hyper::Error>
future::ok(Response::new().with_status(StatusCode::InternalServerError))
}

fn finalize_catch_unwind_response(
result: Result<Result<Response, hyper::Error>, Box<Any + Send>>,
) -> FutureResult<Response, hyper::Error> {
let response = result
.unwrap_or_else(|_| {
let e = io::Error::new(
io::ErrorKind::Other,
"Attempting to poll the future caused a panic",
);

Err(hyper::Error::Io(e))
})
.unwrap_or_else(|_| {
error!("[PANIC][A panic occurred while polling the future]");
Response::new().with_status(StatusCode::InternalServerError)
});

future::ok(response)
}

enum UnwindSafeFuture<F>
where
F: Future<Error = hyper::Error>,
{
Available(AssertUnwindSafe<F>),
Poisoned,
}

impl<F> Future for UnwindSafeFuture<F>
where
F: Future<Error = hyper::Error>,
{
type Item = F::Item;
type Error = hyper::Error;

fn poll(&mut self) -> Result<Async<Self::Item>, hyper::Error> {
match mem::replace(self, UnwindSafeFuture::Poisoned) {
UnwindSafeFuture::Available(mut f) => {
let r = f.poll();
*self = UnwindSafeFuture::Available(f);
r
}
UnwindSafeFuture::Poisoned => {
let e = io::Error::new(
io::ErrorKind::Other,
"Poisoned future due to previous panic",
);

Err(hyper::Error::Io(e))
}
}
}
}

impl<F> UnwindSafeFuture<F>
where
F: Future<Error = hyper::Error>,
{
fn new(f: F) -> UnwindSafeFuture<F> {
UnwindSafeFuture::Available(AssertUnwindSafe(f))
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -131,6 +199,32 @@ mod tests {
assert_eq!(response.status(), StatusCode::Accepted);
}

#[test]
fn async_success_repeat_poll() {
let new_handler = || {
Ok(|state| {
let f = future::lazy(move || {
let res = create_response(&state, StatusCode::Accepted, None);
future::ok((state, res))
});

let f = future::lazy(move || f);
let f = future::lazy(move || f);
let f = future::lazy(move || f);

Box::new(f) as Box<HandlerFuture>
})
};

let mut state = State::new();
state.put(Headers::new());
set_request_id(&mut state);

let r = call_handler(&new_handler, AssertUnwindSafe(state));
let response = r.wait().unwrap();
assert_eq!(response.status(), StatusCode::Accepted);
}

#[test]
fn error() {
let new_handler = || {
Expand Down Expand Up @@ -167,4 +261,44 @@ mod tests {
let response = r.wait().unwrap();
assert_eq!(response.status(), StatusCode::InternalServerError);
}

#[test]
fn async_panic() {
let new_handler = || {
Ok(|_| {
let val: Option<Box<HandlerFuture>> = None;
Box::new(future::lazy(move || val.expect("test panic"))) as Box<HandlerFuture>
})
};

let mut state = State::new();
state.put(Headers::new());
set_request_id(&mut state);

let r = call_handler(&new_handler, AssertUnwindSafe(state));
let response = r.wait().unwrap();
assert_eq!(response.status(), StatusCode::InternalServerError);
}

#[test]
fn async_panic_repeat_poll() {
let new_handler = || {
Ok(|_| {
let val: Option<Box<HandlerFuture>> = None;
let f = future::lazy(move || val.expect("test panic"));
let f = future::lazy(move || f);
let f = future::lazy(move || f);
let f = future::lazy(move || f);
Box::new(f) as Box<HandlerFuture>
})
};

let mut state = State::new();
state.put(Headers::new());
set_request_id(&mut state);

let r = call_handler(&new_handler, AssertUnwindSafe(state));
let response = r.wait().unwrap();
assert_eq!(response.status(), StatusCode::InternalServerError);
}
}
4 changes: 2 additions & 2 deletions src/router/route/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::sync::Arc;
use std::panic::RefUnwindSafe;
use borrow_bag::{new_borrow_bag, BorrowBag, Handle, Lookup};
use borrow_bag::{BorrowBag, Handle, Lookup};
use futures::future;

use handler::{Handler, NewHandler, HandlerFuture, IntoHandlerError};
Expand All @@ -19,7 +19,7 @@ pub type EditablePipelineSet<P> = BorrowBag<P>;
///
/// See BorrowBag#add to insert new `Pipeline` instances.
pub fn new_pipeline_set() -> EditablePipelineSet<()> {
new_borrow_bag()
BorrowBag::new()
}

/// Wraps the current set of `Pipeline` instances into a thread-safe reference counting pointer for
Expand Down

0 comments on commit 867dd78

Please sign in to comment.