Skip to content

Commit

Permalink
fix!: packet-line progress loop is now interruptible.
Browse files Browse the repository at this point in the history
WithSidebands may loop internally to consume remote progress, but that
doesn't play well if this reader is supposed to be interruptible, as
interrupt checks are implemented on the outer layer, not on the innermost one.

Now the progress handler can return a flag indicating whether to continue or
abort, which allows the caller to check for interrupts, and thus abort the inner
loop.
  • Loading branch information
Byron committed Mar 12, 2023
1 parent 531dd19 commit 92f6d32
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 23 deletions.
10 changes: 7 additions & 3 deletions gix-packetline/src/read/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bstr::ByteSlice;
use futures_io::AsyncRead;
use futures_lite::AsyncReadExt;

use crate::read::ProgressAction;
use crate::{
decode,
read::{ExhaustiveOutcome, WithSidebands},
Expand Down Expand Up @@ -150,7 +151,8 @@ where
/// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
///
/// Due to the preconfigured function type this method can be called without 'turbofish'.
pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8])> {
#[allow(clippy::type_complexity)]
pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> {
WithSidebands::new(self)
}

Expand All @@ -161,7 +163,7 @@ where
/// being true in case the `text` is to be interpreted as error.
///
/// _Please note_ that side bands need to be negotiated with the server.
pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) + Unpin>(
pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction + Unpin>(
&mut self,
handle_progress: F,
) -> WithSidebands<'_, T, F> {
Expand All @@ -172,7 +174,9 @@ where
///
/// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator.
/// Use [`as_read()`][StreamingPeekableIter::as_read()].
pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) + Unpin>(&mut self) -> WithSidebands<'_, T, F> {
pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction + Unpin>(
&mut self,
) -> WithSidebands<'_, T, F> {
WithSidebands::without_progress_handler(self)
}
}
11 changes: 8 additions & 3 deletions gix-packetline/src/read/blocking_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io;

use bstr::ByteSlice;

use crate::read::ProgressAction;
use crate::{
decode,
read::{ExhaustiveOutcome, WithSidebands},
Expand Down Expand Up @@ -146,22 +147,26 @@ where
/// being true in case the `text` is to be interpreted as error.
///
/// _Please note_ that side bands need to be negotiated with the server.
pub fn as_read_with_sidebands<F: FnMut(bool, &[u8])>(&mut self, handle_progress: F) -> WithSidebands<'_, T, F> {
pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>(
&mut self,
handle_progress: F,
) -> WithSidebands<'_, T, F> {
WithSidebands::with_progress_handler(self, handle_progress)
}

/// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
///
/// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator.
/// Use [`as_read()`][StreamingPeekableIter::as_read()].
pub fn as_read_without_sidebands<F: FnMut(bool, &[u8])>(&mut self) -> WithSidebands<'_, T, F> {
pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>(&mut self) -> WithSidebands<'_, T, F> {
WithSidebands::without_progress_handler(self)
}

/// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
///
/// Due to the preconfigured function type this method can be called without 'turbofish'.
pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8])> {
#[allow(clippy::type_complexity)]
pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> {
WithSidebands::new(self)
}
}
9 changes: 9 additions & 0 deletions gix-packetline/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
use crate::MAX_LINE_LEN;
use crate::{PacketLineRef, StreamingPeekableIter, U16_HEX_BYTES};

/// Allow the read-progress handler to determine how to continue.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ProgressAction {
/// Continue reading the next progress if available.
Continue,
/// Abort all IO even if more would be available, claiming the operation was interrupted.
Interrupt,
}

#[cfg(any(feature = "blocking-io", feature = "async-io"))]
type ExhaustiveOutcome<'a> = (
bool, // is_done
Expand Down
33 changes: 25 additions & 8 deletions gix-packetline/src/read/sidebands/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
use futures_io::{AsyncBufRead, AsyncRead};
use futures_lite::ready;

use crate::read::ProgressAction;
use crate::{decode, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES};

type ReadLineResult<'a> = Option<std::io::Result<Result<PacketLineRef<'a>, decode::Error>>>;
Expand Down Expand Up @@ -37,7 +38,7 @@ where
}
}

impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8])>
impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction>
where
T: AsyncRead,
{
Expand Down Expand Up @@ -93,7 +94,7 @@ mod tests {
impl<'a, T, F> WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
/// Create a new instance with the given `parent` provider and the `handle_progress` function.
///
Expand Down Expand Up @@ -201,7 +202,7 @@ pub struct ReadDataLineFuture<'a, 'b, T: AsyncRead, F> {
impl<'a, 'b, T, F> Future for ReadDataLineFuture<'a, 'b, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
type Output = std::io::Result<usize>;

Expand All @@ -228,7 +229,7 @@ pub struct ReadLineFuture<'a, 'b, T: AsyncRead, F> {
impl<'a, 'b, T, F> Future for ReadLineFuture<'a, 'b, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
type Output = std::io::Result<usize>;

Expand All @@ -251,7 +252,7 @@ where
impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
use std::io;
Expand Down Expand Up @@ -310,11 +311,27 @@ where
}
BandRef::Progress(d) => {
let text = TextRef::from(d).0;
handle_progress(false, text);
match handle_progress(false, text) {
ProgressAction::Continue => {}
ProgressAction::Interrupt => {
return Poll::Ready(Err(io::Error::new(
std::io::ErrorKind::Other,
"interrupted by user",
)))
}
};
}
BandRef::Error(d) => {
let text = TextRef::from(d).0;
handle_progress(true, text);
match handle_progress(true, text) {
ProgressAction::Continue => {}
ProgressAction::Interrupt => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"interrupted by user",
)))
}
};
}
};
}
Expand Down Expand Up @@ -353,7 +370,7 @@ where
impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
let nread = {
Expand Down
29 changes: 23 additions & 6 deletions gix-packetline/src/read/sidebands/blocking_io.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{io, io::BufRead};

use crate::read::ProgressAction;
use crate::{BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES};

/// An implementor of [`BufRead`][io::BufRead] yielding packet lines on each call to [`read_line()`][io::BufRead::read_line()].
Expand All @@ -24,7 +25,7 @@ where
}
}

impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8])>
impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction>
where
T: io::Read,
{
Expand All @@ -42,7 +43,7 @@ where
impl<'a, T, F> WithSidebands<'a, T, F>
where
T: io::Read,
F: FnMut(bool, &[u8]),
F: FnMut(bool, &[u8]) -> ProgressAction,
{
/// Create a new instance with the given `parent` provider and the `handle_progress` function.
///
Expand Down Expand Up @@ -130,7 +131,7 @@ where
impl<'a, T, F> BufRead for WithSidebands<'a, T, F>
where
T: io::Read,
F: FnMut(bool, &[u8]),
F: FnMut(bool, &[u8]) -> ProgressAction,
{
fn fill_buf(&mut self) -> io::Result<&[u8]> {
if self.pos >= self.cap {
Expand All @@ -154,11 +155,27 @@ where
}
BandRef::Progress(d) => {
let text = TextRef::from(d).0;
handle_progress(false, text);
match handle_progress(false, text) {
ProgressAction::Continue => {}
ProgressAction::Interrupt => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"interrupted by user",
))
}
};
}
BandRef::Error(d) => {
let text = TextRef::from(d).0;
handle_progress(true, text);
match handle_progress(true, text) {
ProgressAction::Continue => {}
ProgressAction::Interrupt => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"interrupted by user",
))
}
};
}
};
}
Expand Down Expand Up @@ -189,7 +206,7 @@ where
impl<'a, T, F> io::Read for WithSidebands<'a, T, F>
where
T: io::Read,
F: FnMut(bool, &[u8]),
F: FnMut(bool, &[u8]) -> ProgressAction,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let nread = {
Expand Down
8 changes: 5 additions & 3 deletions gix-packetline/tests/read/sideband.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bstr::{BString, ByteSlice};
#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
use futures_lite::io::AsyncReadExt;
use gix_odb::pack;
use gix_packetline::read::ProgressAction;
use gix_packetline::PacketLineRef;

use crate::read::streaming_peek_iter::fixture_bytes;
Expand Down Expand Up @@ -51,9 +52,10 @@ async fn read_pack_with_progress_extraction() -> crate::Result {
b"NAK".as_bstr()
);
let mut seen_texts = Vec::<BString>::new();
let mut do_nothing = |is_err: bool, data: &[u8]| {
let mut do_nothing = |is_err: bool, data: &[u8]| -> ProgressAction {
assert!(!is_err);
seen_texts.push(data.as_bstr().into());
ProgressAction::Continue
};
let pack_read = rd.as_read_with_sidebands(&mut do_nothing);
#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
Expand Down Expand Up @@ -132,7 +134,7 @@ async fn read_line_trait_method_reads_one_packet_line_at_a_time() -> crate::Resu

drop(r);

let mut r = rd.as_read_with_sidebands(|_, _| ());
let mut r = rd.as_read_with_sidebands(|_, _| ProgressAction::Continue);
out.clear();
r.read_line_to_string(&mut out).await?;
assert_eq!(out, "&");
Expand Down Expand Up @@ -174,7 +176,7 @@ async fn readline_reads_one_packet_line_at_a_time() -> crate::Result {

drop(r);

let mut r = rd.as_read_with_sidebands(|_, _| ());
let mut r = rd.as_read_with_sidebands(|_, _| ProgressAction::Continue);
let line = r.read_data_line().await.unwrap()??.as_bstr().unwrap();
assert_eq!(
line.as_bstr(),
Expand Down

0 comments on commit 92f6d32

Please sign in to comment.