Skip to content

Commit

Permalink
feat(client): add method to end a chunked body for a Request
Browse files Browse the repository at this point in the history
Closes #831
  • Loading branch information
seanmonstar committed Jun 17, 2016
1 parent 1b4f857 commit c856de0
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 36 deletions.
35 changes: 20 additions & 15 deletions src/http/conn.rs
Expand Up @@ -646,21 +646,14 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> {
_ => Reading::Closed,
};
let writing = match http1.writing {
Writing::Ready(ref encoder) if encoder.is_eof() => {
if http1.keep_alive {
Writing::KeepAlive
} else {
Writing::Closed
}
},
Writing::Ready(encoder) => {
if encoder.is_eof() {
if http1.keep_alive {
Writing::KeepAlive
} else {
Writing::Closed
}
} else if let Some(buf) = encoder.end() {
} else if let Some(buf) = encoder.finish() {
Writing::Chunk(Chunk {
buf: buf.bytes,
pos: buf.pos,
Expand All @@ -680,7 +673,7 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> {
} else {
Writing::Closed
}
} else if let Some(buf) = encoder.end() {
} else if let Some(buf) = encoder.finish() {
Writing::Chunk(Chunk {
buf: buf.bytes,
pos: buf.pos,
Expand Down Expand Up @@ -719,14 +712,26 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> {
};

http1.writing = match http1.writing {
Writing::Ready(encoder) => if encoder.is_eof() {
if http1.keep_alive {
Writing::KeepAlive
Writing::Ready(encoder) => {
if encoder.is_eof() {
if http1.keep_alive {
Writing::KeepAlive
} else {
Writing::Closed
}
} else if encoder.is_closed() {
if let Some(buf) = encoder.finish() {
Writing::Chunk(Chunk {
buf: buf.bytes,
pos: buf.pos,
next: (h1::Encoder::length(0), Next::wait())
})
} else {
Writing::Closed
}
} else {
Writing::Closed
Writing::Wait(encoder)
}
} else {
Writing::Wait(encoder)
},
Writing::Chunk(chunk) => if chunk.is_written() {
Writing::Wait(chunk.next.0)
Expand Down
26 changes: 19 additions & 7 deletions src/http/h1/encode.rs
Expand Up @@ -8,7 +8,8 @@ use http::internal::{AtomicWrite, WriteBuf};
#[derive(Debug, Clone)]
pub struct Encoder {
kind: Kind,
prefix: Prefix, //Option<WriteBuf<Vec<u8>>>
prefix: Prefix,
is_closed: bool,
}

#[derive(Debug, PartialEq, Clone)]
Expand All @@ -25,14 +26,16 @@ impl Encoder {
pub fn chunked() -> Encoder {
Encoder {
kind: Kind::Chunked(Chunked::Init),
prefix: Prefix(None)
prefix: Prefix(None),
is_closed: false,
}
}

pub fn length(len: u64) -> Encoder {
Encoder {
kind: Kind::Length(len),
prefix: Prefix(None)
prefix: Prefix(None),
is_closed: false,
}
}

Expand All @@ -51,7 +54,16 @@ impl Encoder {
}
}

pub fn end(self) -> Option<WriteBuf<Cow<'static, [u8]>>> {
/// User has called `encoder.close()` in a `Handler`.
pub fn is_closed(&self) -> bool {
self.is_closed
}

pub fn close(&mut self) {
self.is_closed = true;
}

pub fn finish(self) -> Option<WriteBuf<Cow<'static, [u8]>>> {
let trailer = self.trailer();
let buf = self.prefix.0;

Expand Down Expand Up @@ -335,7 +347,7 @@ mod tests {
use mock::{Async, Buf};

#[test]
fn test_write_chunked_sync() {
fn test_chunked_encode_sync() {
let mut dst = Buf::new();
let mut encoder = Encoder::chunked();

Expand All @@ -346,7 +358,7 @@ mod tests {
}

#[test]
fn test_write_chunked_async() {
fn test_chunked_encode_async() {
let mut dst = Async::new(Buf::new(), 7);
let mut encoder = Encoder::chunked();

Expand All @@ -360,7 +372,7 @@ mod tests {
}

#[test]
fn test_write_sized() {
fn test_sized_encode() {
let mut dst = Buf::new();
let mut encoder = Encoder::length(8);
encoder.encode(&mut dst, b"foo bar").unwrap();
Expand Down
18 changes: 17 additions & 1 deletion src/http/mod.rs
Expand Up @@ -72,6 +72,7 @@ impl<'a, T: Read> Decoder<'a, T> {
Decoder(DecoderImpl::H1(decoder, transport))
}


/// Get a reference to the transport.
pub fn get_ref(&self) -> &T {
match self.0 {
Expand All @@ -85,6 +86,17 @@ impl<'a, T: Transport> Encoder<'a, T> {
Encoder(EncoderImpl::H1(encoder, transport))
}

/// Closes an encoder, signaling that no more writing will occur.
///
/// This is needed for encodings that don't know length of the content
/// beforehand. Most common instance would be usage of
/// `Transfer-Enciding: chunked`. You would call `close()` to signal
/// the `Encoder` should write the end chunk, or `0\r\n\r\n`.
pub fn close(&mut self) {
match self.0 {
EncoderImpl::H1(ref mut encoder, _) => encoder.close()
}
}

/// Get a reference to the transport.
pub fn get_ref(&self) -> &T {
Expand Down Expand Up @@ -113,7 +125,11 @@ impl<'a, T: Transport> Write for Encoder<'a, T> {
}
match self.0 {
EncoderImpl::H1(ref mut encoder, ref mut transport) => {
encoder.encode(*transport, data)
if encoder.is_closed() {
Ok(0)
} else {
encoder.encode(*transport, data)
}
}
}
}
Expand Down

0 comments on commit c856de0

Please sign in to comment.