Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ tokio = { version = "1", features = [
] }
tokio-test = "0.4"
tokio-util = "0.7.10"
tracing-subscriber = "0.3"

[features]
# Nothing by default
Expand Down Expand Up @@ -243,7 +242,7 @@ required-features = ["full"]
[[test]]
name = "ready_stream"
path = "tests/ready_stream.rs"
required-features = ["full", "tracing"]
required-features = ["full"]

[[test]]
name = "server"
Expand Down
29 changes: 7 additions & 22 deletions tests/ready_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::sync::mpsc;
use tracing::{error, info};

pin_project! {
#[derive(Debug)]
Expand Down Expand Up @@ -139,7 +138,7 @@ impl Write for TxReadyStream {
Poll::Ready(Ok(len))
}
Err(_) => {
error!("ReadyStream::poll_write failed - channel closed");
println!("ReadyStream::poll_write failed - channel closed");
Poll::Ready(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"Write channel closed",
Expand All @@ -165,7 +164,7 @@ impl Write for TxReadyStream {

// Abort the panic task if it exists
if let Some(task) = self.panic_task.take() {
info!("Task polled to completion. Aborting panic (aka waker stand-in task).");
println!("Task polled to completion. Aborting panic (aka waker stand-in task).");
task.abort();
}

Expand All @@ -178,32 +177,18 @@ impl Write for TxReadyStream {
}
}

fn init_tracing() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(true)
.with_thread_ids(true)
.with_thread_names(true)
.init();
});
}

const TOTAL_CHUNKS: usize = 16;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn body_test() {
init_tracing();
// Create a pair of connected streams
let (server_stream, mut client_stream) = TxReadyStream::new_pair();

let mut http_builder = http1::Builder::new();
http_builder.max_buf_size(CHUNK_SIZE);
const CHUNK_SIZE: usize = 64 * 1024;
let service = service_fn(|_| async move {
info!(
println!(
"Creating payload of {} chunks of {} KiB each ({} MiB total)...",
TOTAL_CHUNKS,
CHUNK_SIZE / 1024,
Expand All @@ -216,7 +201,7 @@ async fn body_test() {
.map(|b| Ok::<_, Infallible>(Frame::data(b))),
);
let body = StreamBody::new(stream);
info!("Server: Sending data response...");
println!("Server: Sending data response...");
Ok::<_, hyper::Error>(
Response::builder()
.status(StatusCode::OK)
Expand All @@ -230,20 +215,20 @@ async fn body_test() {
let server_task = tokio::spawn(async move {
let conn = http_builder.serve_connection(server_stream, service);
if let Err(e) = conn.await {
error!("Server connection error: {}", e);
println!("Server connection error: {}", e);
}
});

let get_request = "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n";
client_stream.send(get_request.as_bytes()).unwrap();

info!("Client is reading response...");
println!("Client is reading response...");
let mut bytes_received = 0;
while let Some(chunk) = client_stream.recv().await {
bytes_received += chunk.len();
}
// Clean up
server_task.abort();

info!(bytes_received, "Client done receiving bytes");
println!("Client done receiving bytes: {}", bytes_received);
}