Skip to content

Commit 4e2483a

Browse files
dicejApu Islam
authored andcommitted
prototype support for 1xx informational responses
This is a prototype intended to spur discussion about what support for 1xx informational responses should look like in a Hyper server. The good news is that it works great (for HTTP/1 only, so far). The bad news is it's kind of ugly. Here's what I did: - Add `ext::InformationalSender`, a type which wraps a `futures_channel::mspc::Sender<Response<()>>`. This may be added as an extension to an inbound `Request` by the Hyper server, and the application and/or middleware may use it to send one or more informational responses before sending the real one. - Add code to `proto::h1::dispatch` and friends to add such an extension to each inbound request and then poll the `Receiver` end along with the future representing the final response. If the app never sends any informational responses, then everything proceeds as normal. Otherwise, we send those responses as they become available until the final response is ready. TODO items: - [ ] Also support informational responses in the HTTP/2 server. - [ ] Determine best way to handle when the app sends an informational response with a non-1xx status code. Currently we just silently ignore it. - [ ] Come up with a less hacky API? - [ ] Add test coverage. Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 1c70fab commit 4e2483a

File tree

6 files changed

+96
-51
lines changed

6 files changed

+96
-51
lines changed

src/error.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,6 @@ pub(super) enum User {
146146
#[cfg(any(feature = "http1", feature = "http2"))]
147147
#[cfg(feature = "server")]
148148
UnexpectedHeader,
149-
/// User tried to respond with a 1xx (not 101) response code.
150-
#[cfg(feature = "http1")]
151-
#[cfg(feature = "server")]
152-
UnsupportedStatusCode,
153149

154150
/// User tried polling for an upgrade that doesn't exist.
155151
NoUpgrade,
@@ -392,12 +388,6 @@ impl Error {
392388
Error::new(Kind::HeaderTimeout)
393389
}
394390

395-
#[cfg(feature = "http1")]
396-
#[cfg(feature = "server")]
397-
pub(super) fn new_user_unsupported_status_code() -> Error {
398-
Error::new_user(User::UnsupportedStatusCode)
399-
}
400-
401391
pub(super) fn new_user_no_upgrade() -> Error {
402392
Error::new_user(User::NoUpgrade)
403393
}
@@ -537,11 +527,6 @@ impl Error {
537527
#[cfg(any(feature = "http1", feature = "http2"))]
538528
#[cfg(feature = "server")]
539529
Kind::User(User::UnexpectedHeader) => "user sent unexpected header",
540-
#[cfg(feature = "http1")]
541-
#[cfg(feature = "server")]
542-
Kind::User(User::UnsupportedStatusCode) => {
543-
"response has 1xx status code, not supported by server"
544-
}
545530
Kind::User(User::NoUpgrade) => "no upgrade available",
546531
#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
547532
Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use",

src/ext/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,3 +293,13 @@ impl OriginalHeaderOrder {
293293
self.entry_order.iter()
294294
}
295295
}
296+
297+
/// Request extension type for sending one or more 1xx informational responses
298+
/// prior to the final response.
299+
///
300+
/// This extension is meant to be attached to inbound `Request`s, allowing a
301+
/// server to send informational responses immediately (i.e. without delaying
302+
/// them until it has constructed a final, non-informational response).
303+
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
304+
#[derive(Clone, Debug)]
305+
pub struct InformationalSender(pub futures_channel::mpsc::Sender<http::Response<()>>);

src/proto/h1/conn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ where
641641
head.extensions.remove::<crate::ext::OnInformational>();
642642
}
643643

644-
Some(encoder)
644+
encoder
645645
}
646646
Err(err) => {
647647
self.state.error = Some(err);

src/proto/h1/dispatch.rs

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,22 @@ use std::{
88

99
use crate::rt::{Read, Write};
1010
use bytes::{Buf, Bytes};
11-
use futures_core::ready;
11+
#[cfg(feature = "server")]
12+
use futures_channel::mpsc::{self, Receiver};
13+
use futures_util::ready;
14+
#[cfg(feature = "server")]
15+
use futures_util::StreamExt;
1216
use http::Request;
17+
#[cfg(feature = "server")]
18+
use http::Response;
1319

1420
use super::{Http1Transaction, Wants};
1521
use crate::body::{Body, DecodedLength, Incoming as IncomingBody};
1622
#[cfg(feature = "client")]
1723
use crate::client::dispatch::TrySendError;
1824
use crate::common::task;
25+
#[cfg(feature = "server")]
26+
use crate::ext::InformationalSender;
1927
use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
2028
use crate::upgrade::OnUpgrade;
2129

@@ -35,7 +43,7 @@ pub(crate) trait Dispatch {
3543
fn poll_msg(
3644
self: Pin<&mut Self>,
3745
cx: &mut Context<'_>,
38-
) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
46+
) -> Poll<Option<Result<(Self::PollItem, Option<Self::PollBody>), Self::PollError>>>;
3947
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>)
4048
-> crate::Result<()>;
4149
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>>;
@@ -46,6 +54,7 @@ cfg_server! {
4654
use crate::service::HttpService;
4755

4856
pub(crate) struct Server<S: HttpService<B>, B> {
57+
informational_rx: Option<Receiver<Response<()>>>,
4958
in_flight: Pin<Box<Option<S::Future>>>,
5059
pub(crate) service: S,
5160
}
@@ -336,17 +345,22 @@ where
336345
if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
337346
let (head, body) = msg.map_err(crate::Error::new_user_service)?;
338347

339-
let body_type = if body.is_end_stream() {
348+
let body_type = if let Some(body) = body {
349+
if body.is_end_stream() {
350+
self.body_rx.set(None);
351+
None
352+
} else {
353+
let btype = body
354+
.size_hint()
355+
.exact()
356+
.map(BodyLength::Known)
357+
.or(Some(BodyLength::Unknown));
358+
self.body_rx.set(Some(body));
359+
btype
360+
}
361+
} else {
340362
self.body_rx.set(None);
341363
None
342-
} else {
343-
let btype = body
344-
.size_hint()
345-
.exact()
346-
.map(BodyLength::Known)
347-
.or(Some(BodyLength::Unknown));
348-
self.body_rx.set(Some(body));
349-
btype
350364
};
351365
self.conn.write_head(head, body_type);
352366
} else {
@@ -505,6 +519,7 @@ cfg_server! {
505519
{
506520
pub(crate) fn new(service: S) -> Server<S, B> {
507521
Server {
522+
informational_rx: None,
508523
in_flight: Box::pin(None),
509524
service,
510525
}
@@ -532,8 +547,33 @@ cfg_server! {
532547
fn poll_msg(
533548
mut self: Pin<&mut Self>,
534549
cx: &mut Context<'_>,
535-
) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
550+
) -> Poll<Option<Result<(Self::PollItem, Option<Self::PollBody>), Self::PollError>>> {
536551
let mut this = self.as_mut();
552+
553+
if let Some(informational_rx) = this.informational_rx.as_mut() {
554+
if let Poll::Ready(informational) = informational_rx.poll_next_unpin(cx) {
555+
if let Some(informational) = informational {
556+
let (parts, _) = informational.into_parts();
557+
if parts.status.is_informational() {
558+
let head = MessageHead {
559+
version: parts.version,
560+
subject: parts.status,
561+
headers: parts.headers,
562+
extensions: parts.extensions,
563+
};
564+
return Poll::Ready(Some(Ok((head, None))));
565+
} else {
566+
// TODO: We should return an error here, but we have
567+
// no way of creating a `Self::PollError`; might
568+
// need to change the signature of
569+
// `Dispatch::poll_msg`.
570+
}
571+
} else {
572+
this.informational_rx = None;
573+
}
574+
}
575+
}
576+
537577
let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
538578
let resp = ready!(fut.as_mut().poll(cx)?);
539579
let (parts, body) = resp.into_parts();
@@ -543,13 +583,14 @@ cfg_server! {
543583
headers: parts.headers,
544584
extensions: parts.extensions,
545585
};
546-
Poll::Ready(Some(Ok((head, body))))
586+
Poll::Ready(Some(Ok((head, Some(body)))))
547587
} else {
548588
unreachable!("poll_msg shouldn't be called if no inflight");
549589
};
550590

551591
// Since in_flight finished, remove it
552592
this.in_flight.set(None);
593+
this.informational_rx = None;
553594
ret
554595
}
555596

@@ -561,7 +602,10 @@ cfg_server! {
561602
*req.headers_mut() = msg.headers;
562603
*req.version_mut() = msg.version;
563604
*req.extensions_mut() = msg.extensions;
605+
let (informational_tx, informational_rx) = mpsc::channel(1);
606+
assert!(req.extensions_mut().insert(InformationalSender(informational_tx)).is_none());
564607
let fut = self.service.call(req);
608+
self.informational_rx = Some(informational_rx);
565609
self.in_flight.set(Some(fut));
566610
Ok(())
567611
}
@@ -607,7 +651,7 @@ cfg_client! {
607651
fn poll_msg(
608652
mut self: Pin<&mut Self>,
609653
cx: &mut Context<'_>,
610-
) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Infallible>>> {
654+
) -> Poll<Option<Result<(Self::PollItem, Option<Self::PollBody>), Infallible>>> {
611655
let mut this = self.as_mut();
612656
debug_assert!(!this.rx_closed);
613657
match this.rx.poll_recv(cx) {
@@ -627,7 +671,7 @@ cfg_client! {
627671
extensions: parts.extensions,
628672
};
629673
this.callback = Some(cb);
630-
Poll::Ready(Some(Ok((head, body))))
674+
Poll::Ready(Some(Ok((head, Some(body)))))
631675
}
632676
}
633677
}

src/proto/h1/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ pub(crate) trait Http1Transaction {
3333
#[cfg(feature = "tracing")]
3434
const LOG: &'static str;
3535
fn parse(bytes: &mut BytesMut, ctx: ParseContext<'_>) -> ParseResult<Self::Incoming>;
36-
fn encode(enc: Encode<'_, Self::Outgoing>, dst: &mut Vec<u8>) -> crate::Result<Encoder>;
36+
fn encode(enc: Encode<'_, Self::Outgoing>, dst: &mut Vec<u8>)
37+
-> crate::Result<Option<Encoder>>;
3738

3839
fn on_error(err: &crate::Error) -> Option<MessageHead<Self::Outgoing>>;
3940

src/proto/h1/role.rs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ fn is_complete_fast(bytes: &[u8], prev_len: usize) -> bool {
111111
pub(super) fn encode_headers<T>(
112112
enc: Encode<'_, T::Outgoing>,
113113
dst: &mut Vec<u8>,
114-
) -> crate::Result<Encoder>
114+
) -> crate::Result<Option<Encoder>>
115115
where
116116
T: Http1Transaction,
117117
{
@@ -356,7 +356,10 @@ impl Http1Transaction for Server {
356356
}))
357357
}
358358

359-
fn encode(mut msg: Encode<'_, Self::Outgoing>, dst: &mut Vec<u8>) -> crate::Result<Encoder> {
359+
fn encode(
360+
msg: Encode<'_, Self::Outgoing>,
361+
dst: &mut Vec<u8>,
362+
) -> crate::Result<Option<Encoder>> {
360363
trace!(
361364
"Server::encode status={:?}, body={:?}, req_method={:?}",
362365
msg.head.subject,
@@ -366,25 +369,19 @@ impl Http1Transaction for Server {
366369

367370
let mut wrote_len = false;
368371

369-
// hyper currently doesn't support returning 1xx status codes as a Response
370-
// This is because Service only allows returning a single Response, and
371-
// so if you try to reply with a e.g. 100 Continue, you have no way of
372-
// replying with the latter status code response.
373-
let (ret, is_last) = if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS {
374-
(Ok(()), true)
372+
let informational = msg.head.subject.is_informational();
373+
374+
let is_last = if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS {
375+
true
375376
} else if msg.req_method == &Some(Method::CONNECT) && msg.head.subject.is_success() {
376377
// Sending content-length or transfer-encoding header on 2xx response
377378
// to CONNECT is forbidden in RFC 7231.
378379
wrote_len = true;
379-
(Ok(()), true)
380-
} else if msg.head.subject.is_informational() {
381-
warn!("response with 1xx status code not supported");
382-
*msg.head = MessageHead::default();
383-
msg.head.subject = StatusCode::INTERNAL_SERVER_ERROR;
384-
msg.body = None;
385-
(Err(crate::Error::new_user_unsupported_status_code()), true)
380+
true
381+
} else if informational {
382+
false
386383
} else {
387-
(Ok(()), !msg.keep_alive)
384+
!msg.keep_alive
388385
};
389386

390387
// In some error cases, we don't know about the invalid message until already
@@ -442,6 +439,7 @@ impl Http1Transaction for Server {
442439
}
443440
orig_headers => orig_headers,
444441
};
442+
445443
let encoder = if let Some(orig_headers) = orig_headers {
446444
Self::encode_headers_with_original_case(
447445
msg,
@@ -455,7 +453,11 @@ impl Http1Transaction for Server {
455453
Self::encode_headers_with_lower_case(msg, dst, is_last, orig_len, wrote_len)?
456454
};
457455

458-
ret.map(|()| encoder)
456+
// If we're sending a 1xx informational response, it won't have a body,
457+
// so we'll return `None` here. Additionally, that will tell
458+
// `Conn::write_head` to stay in the `Writing::Init` state since we
459+
// haven't yet sent the final response.
460+
Ok(if informational { None } else { Some(encoder) })
459461
}
460462

461463
fn on_error(err: &crate::Error) -> Option<MessageHead<Self::Outgoing>> {
@@ -1165,7 +1167,10 @@ impl Http1Transaction for Client {
11651167
}
11661168
}
11671169

1168-
fn encode(msg: Encode<'_, Self::Outgoing>, dst: &mut Vec<u8>) -> crate::Result<Encoder> {
1170+
fn encode(
1171+
msg: Encode<'_, Self::Outgoing>,
1172+
dst: &mut Vec<u8>,
1173+
) -> crate::Result<Option<Encoder>> {
11691174
trace!(
11701175
"Client::encode method={:?}, body={:?}",
11711176
msg.head.subject.0,
@@ -1211,7 +1216,7 @@ impl Http1Transaction for Client {
12111216
extend(dst, b"\r\n");
12121217
msg.head.headers.clear(); //TODO: remove when switching to drain()
12131218

1214-
Ok(body)
1219+
Ok(Some(body))
12151220
}
12161221

12171222
fn on_error(_err: &crate::Error) -> Option<MessageHead<Self::Outgoing>> {

0 commit comments

Comments
 (0)