-
-
Notifications
You must be signed in to change notification settings - Fork 269
/
util.rs
73 lines (61 loc) · 1.87 KB
/
util.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use h2;
use bytes::{BufMut, Bytes};
use futures::ready;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub fn byte_str(s: &str) -> h2::frame::BytesStr {
h2::frame::BytesStr::try_from(Bytes::copy_from_slice(s.as_bytes())).unwrap()
}
pub async fn concat(mut body: h2::RecvStream) -> Result<Bytes, h2::Error> {
let mut vec = Vec::new();
while let Some(chunk) = body.data().await {
vec.put(chunk?);
}
Ok(vec.into())
}
pub async fn yield_once() {
let mut yielded = false;
futures::future::poll_fn(move |cx| {
if yielded {
Poll::Ready(())
} else {
yielded = true;
cx.waker().clone().wake();
Poll::Pending
}
})
.await;
}
/// Should only be called after a non-0 capacity was requested for the stream.
pub fn wait_for_capacity(stream: h2::SendStream<Bytes>, target: usize) -> WaitForCapacity {
WaitForCapacity {
stream: Some(stream),
target: target,
}
}
pub struct WaitForCapacity {
stream: Option<h2::SendStream<Bytes>>,
target: usize,
}
impl WaitForCapacity {
fn stream(&mut self) -> &mut h2::SendStream<Bytes> {
self.stream.as_mut().unwrap()
}
}
impl Future for WaitForCapacity {
type Output = h2::SendStream<Bytes>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let _ = ready!(self.stream().poll_capacity(cx)).unwrap();
let act = self.stream().capacity();
// If a non-0 capacity was requested for the stream before calling
// wait_for_capacity, then poll_capacity should return Pending
// until there is a non-0 capacity.
assert_ne!(act, 0);
if act >= self.target {
return Poll::Ready(self.stream.take().unwrap().into());
}
}
}
}