Skip to content

Commit

Permalink
[git-packetline] Frame for async sideband
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed May 17, 2021
1 parent 88b8bc3 commit adc365e
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 14 deletions.
3 changes: 1 addition & 2 deletions git-packetline/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
//! Read and write the git packet line wire format without copying it.
//!
//! For reading the packet line format use the [`StreamingPeekableIter`], and for writing the `Writer`.
#![forbid(unsafe_code)]
#![deny(rust_2018_idioms, missing_docs)]
#![deny(unsafe_code, rust_2018_idioms, missing_docs)]

pub(crate) const U16_HEX_BYTES: usize = 4;
pub(crate) const MAX_DATA_LEN: usize = 65516;
Expand Down
9 changes: 8 additions & 1 deletion git-packetline/src/read/async_io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{decode, PacketLine, StreamingPeekableIter, MAX_LINE_LEN, U16_HEX_BYTES};
use crate::{decode, read::WithSidebands, PacketLine, StreamingPeekableIter, MAX_LINE_LEN, U16_HEX_BYTES};
use bstr::ByteSlice;
use futures_io::AsyncRead;
use futures_lite::AsyncReadExt;
Expand Down Expand Up @@ -138,4 +138,11 @@ where
Some(Ok(Ok(crate::decode(&self.peek_buf).expect("only valid data here"))))
}
}

/// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
///
/// Due to the preconfigured function type this method can be called without 'turbofish'.
pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8])> {
WithSidebands::new(self)
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use crate::{decode, PacketLine, StreamingPeekableIter, MAX_LINE_LEN, U16_HEX_BYTES};
use crate::{decode, read::WithSidebands, PacketLine, StreamingPeekableIter, MAX_LINE_LEN, U16_HEX_BYTES};
use bstr::ByteSlice;
use std::io;

mod sidebands;
pub use sidebands::WithSidebands;

type ExhaustiveOutcome<'a> = (
bool, // is_done
Option<PacketLine<'static>>, // stopped_at
Expand Down
5 changes: 3 additions & 2 deletions git-packetline/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ impl<T> StreamingPeekableIter<T> {

#[cfg(feature = "blocking-io")]
mod blocking_io;
#[cfg(feature = "blocking-io")]
pub use blocking_io::WithSidebands;

#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
mod async_io;

mod sidebands;
pub use sidebands::WithSidebands;
123 changes: 123 additions & 0 deletions git-packetline/src/read/sidebands/async_io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use crate::{PacketLine, StreamingPeekableIter};
use futures_io::{AsyncBufRead, AsyncRead};
use std::pin::Pin;
use std::task::{Context, Poll};

/// An implementor of [`AsyncBufRead`] yielding packet lines on each call to [`read_line()`][AsyncBufRead::read_line()].
/// It's also possible to hide the underlying packet lines using the [`Read`][AsyncRead] implementation which is useful
/// if they represent binary data, like the one of a pack file.
pub struct WithSidebands<'a, T, F>
where
T: AsyncRead,
{
parent: &'a mut StreamingPeekableIter<T>,
handle_progress: Option<F>,
pos: usize,
cap: usize,
}

impl<'a, T, F> Drop for WithSidebands<'a, T, F>
where
T: AsyncRead,
{
fn drop(&mut self) {
self.parent.reset();
}
}

impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8])>
where
T: AsyncRead,
{
/// Create a new instance with the given provider as `parent`.
pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self {
WithSidebands {
parent,
handle_progress: None,
pos: 0,
cap: 0,
}
}
}

impl<'a, T, F> WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]),
{
/// Create a new instance with the given `parent` provider and the `handle_progress` function.
///
/// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool`
/// being true in case the `text` is to be interpreted as error.
pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self {
WithSidebands {
parent,
handle_progress: Some(handle_progress),
pos: 0,
cap: 0,
}
}

/// Create a new instance without a progress handler.
pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self {
WithSidebands {
parent,
handle_progress: None,
pos: 0,
cap: 0,
}
}

/// Forwards to the parent [StreamingPeekableIter::reset_with()]
pub fn reset_with(&mut self, delimiters: &'static [PacketLine<'static>]) {
self.parent.reset_with(delimiters)
}

/// Forwards to the parent [StreamingPeekableIter::stopped_at()]
pub fn stopped_at(&self) -> Option<PacketLine<'static>> {
self.parent.stopped_at
}

/// Set or unset the progress handler.
pub fn set_progress_handler(&mut self, handle_progress: Option<F>) {
self.handle_progress = handle_progress;
}

/// Effectively forwards to the parent [StreamingPeekableIter::peek_line()], allowing to see what would be returned
/// next on a call to [`read_line()`][io::BufRead::read_line()].
pub async fn peek_data_line(&mut self) -> Option<std::io::Result<Result<&[u8], crate::decode::Error>>> {
match self.parent.peek_line().await {
Some(Ok(Ok(crate::PacketLine::Data(line)))) => Some(Ok(Ok(line))),
Some(Ok(Err(err))) => Some(Ok(Err(err))),
Some(Err(err)) => Some(Err(err)),
_ => None,
}
}
}

impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F>
where
T: AsyncRead,
F: FnMut(bool, &[u8]),
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
todo!("poll fill buf")
}

fn consume(self: Pin<&mut Self>, amt: usize) {
// SAFETY: self isn't moved
#[allow(unsafe_code)]
let this = unsafe { self.get_unchecked_mut() };
this.pos = std::cmp::min(this.pos + amt, this.cap);
}
}

impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F>
where
T: AsyncRead,
F: FnMut(bool, &[u8]),
{
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
todo!("poll read")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ use std::io;
/// An implementor of [`BufRead`][io::BufRead] yielding packet lines on each call to [`read_line()`][io::BufRead::read_line()].
/// It's also possible to hide the underlying packet lines using the [`Read`][io::Read] implementation which is useful
/// if they represent binary data, like the one of a pack file.
///
/// # Performance Notice
/// Reading from this intermediary copies bytes 3 times:
/// OS -> (parent) line provider buffer -> our buffer -> caller's output buffer
/// which won't make this very efficient for huge bandwidths.
pub struct WithSidebands<'a, T, F>
where
T: io::Read,
Expand Down
9 changes: 9 additions & 0 deletions git-packetline/src/read/sidebands/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#[cfg(feature = "blocking-io")]
mod blocking_io;
#[cfg(feature = "blocking-io")]
pub use blocking_io::WithSidebands;

#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
mod async_io;
#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
pub use async_io::WithSidebands;
1 change: 1 addition & 0 deletions git-packetline/tests/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod sideband;
pub mod streaming_peek_iter {
use bstr::ByteSlice;
use git_packetline::PacketLine;
Expand Down
38 changes: 38 additions & 0 deletions git-packetline/tests/read/sideband.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
use futures_lite::io::AsyncReadExt;
use git_packetline::PacketLine;
#[cfg(feature = "blocking-io")]
use std::io::Read;

#[maybe_async::test(feature = "blocking-io", async(feature = "async-io", async_std::test))]
async fn handling_of_err_lines() {
let input = b"0009ERR e0009ERR x0000";
let mut rd = git_packetline::StreamingPeekableIter::new(&input[..], &[]);
rd.fail_on_err_lines(true);
let mut buf = [0u8; 2];
let mut reader = rd.as_read();
let res = reader.read(buf.as_mut()).await;
assert_eq!(
res.unwrap_err().to_string(),
"e",
"it respects errors and passes them on"
);
let res = reader.read(buf.as_mut()).await;
assert_eq!(
res.expect("read to succeed - EOF"),
0,
"it stops reading after an error despite there being more to read"
);
reader.reset_with(&[PacketLine::Flush]);
let res = reader.read(buf.as_mut()).await;
assert_eq!(
res.unwrap_err().to_string(),
"x",
"after a reset it continues reading, but retains the 'fail_on_err_lines' setting"
);
assert_eq!(
reader.stopped_at(),
None,
"An error can also be the reason, which is not distinguishable from an EOF"
);
}

0 comments on commit adc365e

Please sign in to comment.