diff --git a/src/client.rs b/src/client.rs index 2e1601a3..5508911c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -780,6 +780,52 @@ async fn setup_http2(client: &Client) -> Result<(ConnectionTime, ClientStateHttp Ok((connection_time, client_state)) } +async fn work_http2_once( + client: &Client, + client_state: &mut ClientStateHttp2, + report_tx: &flume::Sender>, + connection_time: ConnectionTime, + start_latency_correction: Option, +) -> (bool, bool) { + let mut res = client.work_http2(client_state).await; + let is_cancel = is_cancel_error(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + if let Some(start_latency_correction) = start_latency_correction { + set_start_latency_correction(&mut res, start_latency_correction); + } + report_tx.send_async(res).await.unwrap(); + (is_cancel, is_reconnect) +} + +async fn work_http2_or_acquire( + client: &Client, + client_state: &mut ClientStateHttp2, + report_tx: &flume::Sender>, + connection_time: ConnectionTime, + start_latency_correction: Option, + semaphore: &tokio::sync::Semaphore, +) -> (bool, bool) { + tokio::select! { + mut res = + client.work_http2(client_state) => { + let is_cancel = is_cancel_error(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + if let Some(start_latency_correction) = start_latency_correction { + set_start_latency_correction(&mut res, start_latency_correction); + } + report_tx.send_async(res).await.unwrap(); + (is_cancel ,is_reconnect ) + + } + _ = semaphore.acquire() => { + report_tx.send_async(Err(ClientError::Deadline)).await.unwrap(); + (true, false) + } + } +} + fn set_connection_time(res: &mut Result, connection_time: ConnectionTime) { if let Ok(res) = res { res.connection_time = Some(connection_time); @@ -828,12 +874,15 @@ pub async fn work( tokio::spawn(async move { while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_cancel_error(&res); - let is_reconnect = is_hyper_error(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); + let (is_cancel, is_reconnect) = work_http2_once( + &client, + &mut client_state, + &report_tx, + connection_time, + None, + ) + .await; + if is_cancel || is_reconnect { return is_cancel; } @@ -971,12 +1020,15 @@ pub async fn work_with_qps( let mut client_state = client_state.clone(); tokio::spawn(async move { while let Ok(()) = rx.recv_async().await { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_cancel_error(&res); - let is_reconnect = is_hyper_error(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); + let (is_cancel, is_reconnect) = work_http2_once( + &client, + &mut client_state, + &report_tx, + connection_time, + None, + ) + .await; + if is_cancel || is_reconnect { return is_cancel; } @@ -1114,13 +1166,15 @@ pub async fn work_with_qps_latency_correction( let mut client_state = client_state.clone(); tokio::spawn(async move { while let Ok(start) = rx.recv_async().await { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_cancel_error(&res); - let is_reconnect = is_hyper_error(&res); - set_connection_time(&mut res, connection_time); - set_start_latency_correction(&mut res, start); - report_tx.send_async(res).await.unwrap(); + let (is_cancel, is_reconnect) = work_http2_once( + &client, + &mut client_state, + &report_tx, + connection_time, + Some(start), + ) + .await; + if is_cancel || is_reconnect { return is_cancel; } @@ -1230,21 +1284,19 @@ pub async fn work_until( tokio::spawn(async move { // This is where HTTP2 loops to make all the requests for a given client and worker loop { - tokio::select! { - mut res = - client.work_http2(&mut client_state) => { - let is_cancel = is_cancel_error(&res); - let is_reconnect = is_hyper_error(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); - if is_cancel || is_reconnect { - break is_cancel; - } - } - _ = s.acquire() => { - report_tx.send_async(Err(ClientError::Deadline)).await.unwrap(); - break true; - } + let (is_cancel, is_reconnect) = + work_http2_or_acquire( + &client, + &mut client_state, + &report_tx, + connection_time, + None, + &s, + ) + .await; + + if is_cancel || is_reconnect { + break is_cancel; } } }) @@ -1397,20 +1449,18 @@ pub async fn work_until_with_qps( let s = s.clone(); tokio::spawn(async move { while let Ok(()) = rx.recv_async().await { - tokio::select! { - mut res = - client.work_http2(&mut client_state) => { - let is_cancel = is_cancel_error(&res); - let is_reconnect = is_hyper_error(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); - if is_cancel || is_reconnect { - return is_cancel; - } - } - _ = s.acquire() => { - return true; - } + let (is_cancel, is_reconnect) = + work_http2_or_acquire( + &client, + &mut client_state, + &report_tx, + connection_time, + None, + &s, + ) + .await; + if is_cancel || is_reconnect { + return is_cancel; } } true @@ -1574,22 +1624,18 @@ pub async fn work_until_with_qps_latency_correction( let s = s.clone(); tokio::spawn(async move { while let Ok(start) = rx.recv_async().await { - tokio::select! { - mut res = - client.work_http2(&mut client_state) => { - set_start_latency_correction(&mut res, start); - set_connection_time(&mut res, connection_time); - let is_cancel = is_cancel_error(&res); - let is_reconnect = is_hyper_error(&res); - report_tx.send_async(res).await.unwrap(); - if is_cancel || is_reconnect { - return is_cancel; - } - } - _ = s.acquire() => { - return true; - } - + let (is_cancel, is_reconnect) = + work_http2_or_acquire( + &client, + &mut client_state, + &report_tx, + connection_time, + Some(start), + &s, + ) + .await; + if is_cancel || is_reconnect { + return is_cancel; } } true