Skip to content

Commit

Permalink
feat(transport): add Encode and Decode traits
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
  • Loading branch information
rvolosatovs committed Jun 24, 2024
1 parent b7a41eb commit 948eae7
Show file tree
Hide file tree
Showing 5 changed files with 1,423 additions and 690 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 15 additions & 14 deletions crates/transport-quic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl<P: AsRef<[Option<usize>]>> FromIterator<P> for IndexTree {
}

impl IndexTree {
#[instrument(level = "trace", skip_all)]
#[instrument(level = "trace", skip(self))]
fn take_rx(&mut self, path: &[usize]) -> Option<oneshot::Receiver<RecvStream>> {
let Some((i, path)) = path.split_first() else {
return match self {
Expand Down Expand Up @@ -163,7 +163,7 @@ impl IndexTree {
}
}

#[instrument(level = "trace", skip_all)]
#[instrument(level = "trace", skip(self))]
fn take_tx(&mut self, path: &[usize]) -> Option<oneshot::Sender<RecvStream>> {
let Some((i, path)) = path.split_first() else {
return match self {
Expand Down Expand Up @@ -199,7 +199,7 @@ impl IndexTree {

/// Inserts `sender` and `receiver` under a `path` - returns `false` if it failed and `true` if it succeeded.
/// Tree state after `false` is returned is undefined
#[instrument(level = "trace", skip_all)]
#[instrument(level = "trace", skip(self, sender, receiver), ret)]
fn insert(
&mut self,
path: &[Option<usize>],
Expand Down Expand Up @@ -244,8 +244,9 @@ impl IndexTree {
true
}
(_, _, [Some(i), path @ ..]) => {
if nested.len() < *i {
nested.resize_with(i.saturating_add(1), Option::default);
let cap = i.saturating_add(1);
if nested.len() < cap {
nested.resize_with(cap, Option::default);
}
let nested = &mut nested[*i];
if let Some(nested) = nested {
Expand Down Expand Up @@ -406,7 +407,7 @@ impl wrpc_transport::Index<Self> for Incoming {
}

impl AsyncRead for Incoming {
#[instrument(level = "trace", skip_all)]
#[instrument(level = "trace", skip_all, ret)]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -486,7 +487,7 @@ impl wrpc_transport::Index<Self> for Outgoing {
}

impl AsyncWrite for Outgoing {
#[instrument(level = "trace", skip_all, fields(?buf))]
#[instrument(level = "trace", skip_all, ret, fields(buf = format!("{buf:02x?}")))]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -511,7 +512,7 @@ impl AsyncWrite for Outgoing {
..
} => {
while !header.is_empty() {
trace!(?header, "writing header");
trace!(header = format!("{header:02x?}"), "writing header");
let n = ready!(AsyncWrite::poll_write(tx.as_mut(), cx, header))?;
if n < header.len() {
header.advance(n);
Expand All @@ -525,7 +526,7 @@ impl AsyncWrite for Outgoing {
}
}

#[instrument(level = "trace", skip_all)]
#[instrument(level = "trace", skip_all, ret)]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
match self.as_mut().project() {
OutgoingProj::Opening { .. } => Poll::Ready(Ok(())),
Expand All @@ -536,7 +537,7 @@ impl AsyncWrite for Outgoing {
}
}

#[instrument(level = "trace", skip_all)]
#[instrument(level = "trace", skip_all, ret)]
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
match self.as_mut().project() {
OutgoingProj::Opening { .. } => Poll::Ready(Ok(())),
Expand Down Expand Up @@ -566,13 +567,13 @@ impl Session {
conn: conn.clone(),
incoming,
outgoing,
reader: spawn(demux_connection(index, conn)),
reader: spawn(demux_connection(index, conn).in_current_span()),
}
}
}

impl wrpc_transport::Session for Session {
#[instrument(level = "trace", skip_all)]
#[instrument(level = "trace", skip_all, ret)]
async fn finish(mut self, res: Result<(), &str>) -> anyhow::Result<Result<(), String>> {
if let Err(err) = res {
let mut buf = BytesMut::with_capacity(6 + err.len());
Expand Down Expand Up @@ -731,7 +732,7 @@ impl wrpc_transport::Invoke for Client {
type Outgoing = Outgoing;
type Incoming = Incoming;

#[instrument(level = "trace", skip(self, paths))]
#[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
async fn invoke(
&self,
cx: Self::Context,
Expand Down Expand Up @@ -786,7 +787,7 @@ impl wrpc_transport::Invoke for Client {
.set_priority(1)
.context("failed to set result stream priority")?;
let index = Arc::new(std::sync::Mutex::new(paths.iter().collect()));
trace!(?params, "writing parameters");
trace!("writing parameters");
param_tx
.write_all_chunks(&mut [Bytes::from_static(&[PROTOCOL]), params])
.await
Expand Down
Loading

0 comments on commit 948eae7

Please sign in to comment.