Skip to content

Commit

Permalink
Merge pull request #507 from hatoo/refactor-workqueue
Browse files Browse the repository at this point in the history
refactor work queue
  • Loading branch information
hatoo committed May 29, 2024
2 parents fc2021d + da69cf7 commit 5171ee2
Showing 1 changed file with 39 additions and 36 deletions.
75 changes: 39 additions & 36 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,42 +942,41 @@ pub async fn work_with_qps(
) {
let (tx, rx) = flume::unbounded();

match query_limit {
QueryLimit::Qps(qps) => {
tokio::spawn(async move {
let work_queue = async move {
match query_limit {
QueryLimit::Qps(qps) => {
let start = std::time::Instant::now();
for i in 0..n_tasks {
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
tx.send_async(()).await.unwrap();
tx.send(())?;
}
// tx gone
});
}
QueryLimit::Burst(duration, rate) => {
tokio::spawn(async move {
}
QueryLimit::Burst(duration, rate) => {
let mut n = 0;
// Handle via rate till n_tasks out of bound
while n + rate < n_tasks {
tokio::time::sleep(duration).await;
for _ in 0..rate {
tx.send_async(()).await.unwrap();
tx.send(())?;
}
n += rate;
}
// Handle the remaining tasks
if n_tasks > n {
tokio::time::sleep(duration).await;
for _ in 0..n_tasks - n {
tx.send_async(()).await.unwrap();
tx.send(())?;
}
}
// tx gone
});
}
}
}
// tx gone
drop(tx);
Ok::<(), flume::SendError<_>>(())
};

let client = Arc::new(client);

Expand Down Expand Up @@ -1047,6 +1046,8 @@ pub async fn work_with_qps(
})
})
.collect::<Vec<_>>();

work_queue.await.unwrap();
for f in futures {
let _ = f.await;
}
Expand All @@ -1069,6 +1070,8 @@ pub async fn work_with_qps(
})
})
.collect::<Vec<_>>();

work_queue.await.unwrap();
for f in futures {
let _ = f.await;
}
Expand All @@ -1086,29 +1089,26 @@ pub async fn work_with_qps_latency_correction(
) {
let (tx, rx) = flume::unbounded();

match query_limit {
QueryLimit::Qps(qps) => {
tokio::spawn(async move {
let work_queue = async move {
match query_limit {
QueryLimit::Qps(qps) => {
let start = std::time::Instant::now();
for i in 0..n_tasks {
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
tx.send_async(std::time::Instant::now()).await.unwrap();
tx.send(std::time::Instant::now())?;
}
// tx gone
});
}
QueryLimit::Burst(duration, rate) => {
tokio::spawn(async move {
}
QueryLimit::Burst(duration, rate) => {
let mut n = 0;
// Handle via rate till n_tasks out of bound
while n + rate < n_tasks {
tokio::time::sleep(duration).await;
let now = std::time::Instant::now();
for _ in 0..rate {
tx.send_async(now).await.unwrap();
tx.send(now)?;
}
n += rate;
}
Expand All @@ -1117,13 +1117,16 @@ pub async fn work_with_qps_latency_correction(
tokio::time::sleep(duration).await;
let now = std::time::Instant::now();
for _ in 0..n_tasks - n {
tx.send_async(now).await.unwrap();
tx.send(now)?;
}
}
// tx gone
});
}
}
}

// tx gone
drop(tx);
Ok::<(), flume::SendError<_>>(())
};

let client = Arc::new(client);

Expand Down Expand Up @@ -1193,6 +1196,8 @@ pub async fn work_with_qps_latency_correction(
})
})
.collect::<Vec<_>>();

work_queue.await.unwrap();
for f in futures {
let _ = f.await;
}
Expand All @@ -1216,6 +1221,8 @@ pub async fn work_with_qps_latency_correction(
})
})
.collect::<Vec<_>>();

work_queue.await.unwrap();
for f in futures {
let _ = f.await;
}
Expand Down Expand Up @@ -1383,9 +1390,7 @@ pub async fn work_until_with_qps(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
if tx.send_async(()).await.is_err() {
break;
}
let _ = tx.send(());
}
// tx gone
});
Expand All @@ -1402,7 +1407,7 @@ pub async fn work_until_with_qps(

tokio::time::sleep(duration).await;
for _ in 0..rate {
tx.send_async(()).await.unwrap();
let _ = tx.send(());
}
}
// tx gone
Expand Down Expand Up @@ -1563,9 +1568,7 @@ pub async fn work_until_with_qps_latency_correction(
if now > dead_line {
break;
}
if tx.send_async(now).await.is_err() {
break;
}
let _ = tx.send(now);
}
// tx gone
});
Expand All @@ -1581,7 +1584,7 @@ pub async fn work_until_with_qps_latency_correction(
}

for _ in 0..rate {
tx.send_async(now).await.unwrap();
let _ = tx.send(now);
}
}
// tx gone
Expand Down

0 comments on commit 5171ee2

Please sign in to comment.