From 709b6027c3d4406e895cc5c733d73a1e9b361c7b Mon Sep 17 00:00:00 2001 From: itowlson Date: Mon, 22 Nov 2021 16:13:35 +1300 Subject: [PATCH] Added a test for streaming --- src/stream_writer.rs | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/stream_writer.rs b/src/stream_writer.rs index 7b6e23a..36f3e5e 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -33,7 +33,7 @@ impl StreamWriter { Ok(()) }, Err(e) => - Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e)) + Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e)) }; // This was meant to wake up listener threads when there was new data but it ended up // just stalling until input was complete. TODO: investigate so we can get rid of the @@ -181,6 +181,8 @@ fn split_at_two_newlines(source: &[u8]) -> Option<(Vec, Vec)> { #[cfg(test)] mod test { + use futures::StreamExt; + use super::*; #[test] @@ -205,4 +207,31 @@ mod test { assert_eq!(vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a], result.0); assert!(result.1.is_empty()); } + + #[tokio::test] + async fn streaming_splits_out_headers() { + let mut sw = StreamWriter::new(); + let mut sw2 = sw.clone(); + tokio::spawn(async move { + write!(sw2, "Header 1\n").unwrap(); + write!(sw2, "Header 2\n").unwrap(); + write!(sw2, "\n").unwrap(); + write!(sw2, "Body 1\n").unwrap(); + write!(sw2, "Body 2\n").unwrap(); + sw2.done().unwrap(); + }); + let header = sw.header_block().await.unwrap(); + let header_text = String::from_utf8(header).unwrap(); + assert!(header_text.contains("Header 1\n")); + assert!(header_text.contains("Header 2\n")); + + let mut stm = Box::pin(sw.as_stream()); + let mut body = vec![]; + while let Some(Ok(v)) = stm.next().await { + body.extend_from_slice(&v); + } + let body_text = String::from_utf8(body).unwrap(); + assert!(body_text.contains("Body 1\n")); + assert!(body_text.contains("Body 2\n")); + } }