Skip to content

Commit

Permalink
fix(lib): properly handle HTTP/1.0 remotes
Browse files Browse the repository at this point in the history
- Downgrades internal semantics to HTTP/1.0 if peer sends a message with
  1.0 version.
- If downgraded, chunked writers become EOF writers, with the connection
  closing once the writing is complete.
- When downgraded, if keep-alive was wanted, the `Connection: keep-alive`
  header is added.

Closes #1304
  • Loading branch information
seanmonstar committed Jan 22, 2018
1 parent 7d493aa commit 36e66a5
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 44 deletions.
121 changes: 85 additions & 36 deletions src/proto/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ where I: AsyncRead + AsyncWrite,
read_task: None,
reading: Reading::Init,
writing: Writing::Init,
// We assume a modern world where the remote speaks HTTP/1.1.
// If they tell us otherwise, we'll downgrade in `read_head`.
version: Version::Http11,
},
_marker: PhantomData,
}
Expand Down Expand Up @@ -189,43 +192,44 @@ where I: AsyncRead + AsyncWrite,
}
};

match version {
HttpVersion::Http10 | HttpVersion::Http11 => {
let decoder = match T::decoder(&head, &mut self.state.method) {
Ok(d) => d,
Err(e) => {
debug!("decoder error = {:?}", e);
self.state.close_read();
return Err(e);
}
};

debug!("incoming body is {}", decoder);

self.state.busy();
if head.expecting_continue() {
let msg = b"HTTP/1.1 100 Continue\r\n\r\n";
self.state.writing = Writing::Continue(Cursor::new(msg));
}
let wants_keep_alive = head.should_keep_alive();
self.state.keep_alive &= wants_keep_alive;
let (body, reading) = if decoder.is_eof() {
(false, Reading::KeepAlive)
} else {
(true, Reading::Body(decoder))
};
self.state.reading = reading;
if !body {
self.try_keep_alive();
}
Ok(Async::Ready(Some((head, body))))
},
self.state.version = match version {
HttpVersion::Http10 => Version::Http10,
HttpVersion::Http11 => Version::Http11,
_ => {
error!("unimplemented HTTP Version = {:?}", version);
self.state.close_read();
Err(::Error::Version)
return Err(::Error::Version);
}
};

let decoder = match T::decoder(&head, &mut self.state.method) {
Ok(d) => d,
Err(e) => {
debug!("decoder error = {:?}", e);
self.state.close_read();
return Err(e);
}
};

debug!("incoming body is {}", decoder);

self.state.busy();
if head.expecting_continue() {
let msg = b"HTTP/1.1 100 Continue\r\n\r\n";
self.state.writing = Writing::Continue(Cursor::new(msg));
}
let wants_keep_alive = head.should_keep_alive();
self.state.keep_alive &= wants_keep_alive;
let (body, reading) = if decoder.is_eof() {
(false, Reading::KeepAlive)
} else {
(true, Reading::Body(decoder))
};
self.state.reading = reading;
if !body {
self.try_keep_alive();
}
Ok(Async::Ready(Some((head, body))))
}

pub fn read_body(&mut self) -> Poll<Option<super::Chunk>, io::Error> {
Expand Down Expand Up @@ -414,11 +418,11 @@ where I: AsyncRead + AsyncWrite,
}
}

pub fn write_head(&mut self, head: super::MessageHead<T::Outgoing>, body: bool) {
pub fn write_head(&mut self, mut head: super::MessageHead<T::Outgoing>, body: bool) {
debug_assert!(self.can_write_head());

let wants_keep_alive = head.should_keep_alive();
self.state.keep_alive &= wants_keep_alive;
self.enforce_version(&mut head);

let buf = self.io.write_buf_mut();
// if a 100-continue has started but not finished sending, tack the
// remainder on to the start of the buffer.
Expand All @@ -435,6 +439,36 @@ where I: AsyncRead + AsyncWrite,
};
}

// If we know the remote speaks an older version, we try to fix up any messages
// to work with our older peer.
fn enforce_version(&mut self, head: &mut super::MessageHead<T::Outgoing>) {
use header::Connection;

let wants_keep_alive = if self.state.wants_keep_alive() {
let ka = head.should_keep_alive();
self.state.keep_alive &= ka;
ka
} else {
false
};

match self.state.version {
Version::Http10 => {
// If the remote only knows HTTP/1.0, we should force ourselves
// to do only speak HTTP/1.0 as well.
head.version = HttpVersion::Http10;
if wants_keep_alive {
head.headers.set(Connection::keep_alive());
}
},
Version::Http11 => {
// If the remote speaks HTTP/1.1, then it *should* be fine with
// both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
// the user's headers be.
}
}
}

pub fn write_body(&mut self, chunk: Option<B>) -> StartSend<Option<B>, io::Error> {
debug_assert!(self.can_write_body());

Expand Down Expand Up @@ -486,7 +520,7 @@ where I: AsyncRead + AsyncWrite,
}
} else {
// end of stream, that means we should try to eof
match encoder.eof() {
match encoder.end() {
Ok(Some(end)) => Writing::Ending(Cursor::new(end)),
Ok(None) => Writing::KeepAlive,
Err(_not_eof) => Writing::Closed,
Expand Down Expand Up @@ -701,6 +735,7 @@ struct State<B, K> {
read_task: Option<Task>,
reading: Reading,
writing: Writing<B>,
version: Version,
}

#[derive(Debug)]
Expand Down Expand Up @@ -819,6 +854,14 @@ impl<B, K: KeepAlive> State<B, K> {
self.keep_alive.disable();
}

fn wants_keep_alive(&self) -> bool {
if let KA::Disabled = self.keep_alive.status() {
false
} else {
true
}
}

fn try_keep_alive(&mut self) {
match (&self.reading, &self.writing) {
(&Reading::KeepAlive, &Writing::KeepAlive) => {
Expand Down Expand Up @@ -881,6 +924,12 @@ impl<B, K: KeepAlive> State<B, K> {
}
}

#[derive(Debug, Clone, Copy)]
enum Version {
Http10,
Http11,
}

// The DebugFrame and DebugChunk are simple Debug implementations that allow
// us to dump the frame into logs, without logging the entirety of the bytes.
#[cfg(feature = "tokio-proto")]
Expand Down
19 changes: 18 additions & 1 deletion src/proto/h1/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ enum Kind {
///
/// Enforces that the body is not longer than the Content-Length header.
Length(u64),
/// An Encoder for when neither Content-Length nore Chunked encoding is set.
///
/// This is mostly only used with HTTP/1.0 with a length. This kind requires
/// the connection to be closed when the body is finished.
Eof
}

impl Encoder {
Expand All @@ -32,6 +37,12 @@ impl Encoder {
}
}

pub fn eof() -> Encoder {
Encoder {
kind: Kind::Eof,
}
}

pub fn is_eof(&self) -> bool {
match self.kind {
Kind::Length(0) |
Expand All @@ -40,7 +51,7 @@ impl Encoder {
}
}

pub fn eof(&self) -> Result<Option<&'static [u8]>, NotEof> {
pub fn end(&self) -> Result<Option<&'static [u8]>, NotEof> {
match self.kind {
Kind::Length(0) => Ok(None),
Kind::Chunked(Chunked::Init) => Ok(Some(b"0\r\n\r\n")),
Expand Down Expand Up @@ -73,6 +84,12 @@ impl Encoder {
trace!("encoded {} bytes, remaining = {}", n, remaining);
Ok(n)
},
Kind::Eof => {
if msg.is_empty() {
return Ok(0);
}
w.write_atomic(&[msg])
}
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions src/proto/h1/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use proto::{MessageHead, RawStatus, Http1Transaction, ParseResult,
use proto::h1::{Encoder, Decoder, date};
use method::Method;
use status::StatusCode;
use version::HttpVersion::{Http10, Http11};
use version::HttpVersion::{self, Http10, Http11};

const MAX_HEADERS: usize = 100;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
Expand Down Expand Up @@ -166,7 +166,7 @@ impl ServerTransaction {
};

if has_body && can_have_body {
set_length(&mut head.headers)
set_length(head.version, &mut head.headers)
} else {
head.headers.remove::<TransferEncoding>();
if can_have_body {
Expand Down Expand Up @@ -302,7 +302,7 @@ impl Http1Transaction for ClientTransaction {
impl ClientTransaction {
fn set_length(head: &mut RequestHead, has_body: bool) -> Encoder {
if has_body {
set_length(&mut head.headers)
set_length(head.version, &mut head.headers)
} else {
head.headers.remove::<ContentLength>();
head.headers.remove::<TransferEncoding>();
Expand All @@ -311,12 +311,12 @@ impl ClientTransaction {
}
}

fn set_length(headers: &mut Headers) -> Encoder {
fn set_length(version: HttpVersion, headers: &mut Headers) -> Encoder {
let len = headers.get::<header::ContentLength>().map(|n| **n);

if let Some(len) = len {
Encoder::length(len)
} else {
} else if version == Http11 {
let encodings = match headers.get_mut::<header::TransferEncoding>() {
Some(&mut header::TransferEncoding(ref mut encodings)) => {
if encodings.last() != Some(&header::Encoding::Chunked) {
Expand All @@ -331,6 +331,9 @@ fn set_length(headers: &mut Headers) -> Encoder {
headers.set(header::TransferEncoding(vec![header::Encoding::Chunked]));
}
Encoder::chunked()
} else {
headers.remove::<TransferEncoding>();
Encoder::eof()
}
}

Expand Down
Loading

0 comments on commit 36e66a5

Please sign in to comment.