Skip to content

Commit

Permalink
[git-packetline] first step towards state based impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed May 18, 2021
1 parent 88511f7 commit 22740c5
Showing 1 changed file with 83 additions and 62 deletions.
145 changes: 83 additions & 62 deletions git-packetline/src/read/sidebands/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ where
T: AsyncRead,
{
#[pin]
parent: Option<&'a mut StreamingPeekableIter<T>>,
state: State<'a, T>,
handle_progress: Option<F>,
read_line: Option<Pin<Box<dyn Future<Output = ReadLineResult<'a>> + 'a>>>,
pos: usize,
Expand All @@ -34,8 +34,10 @@ where
T: AsyncRead,
{
fn drop(mut self: Pin<&mut Self>) {
let mut this = self.project();
this.parent.take().map(|p| p.reset());
let this = self.project();
if let State::Idle { parent } = this.state.get_mut() {
parent.reset();
}
}
}

Expand All @@ -46,7 +48,7 @@ where
/// Create a new instance with the given provider as `parent`.
pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self {
WithSidebands {
parent: Some(parent),
state: State::Idle { parent },
handle_progress: None,
read_line: None,
pos: 0,
Expand All @@ -55,6 +57,16 @@ where
}
}

enum State<'a, T> {
Idle {
parent: &'a mut StreamingPeekableIter<T>,
},
ReadLine {
read_line: Pin<Box<dyn Future<Output = ReadLineResult<'a>> + 'a>>,
parent_inactive: *mut StreamingPeekableIter<T>,
},
}

impl<'a, T, F> WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
Expand All @@ -66,7 +78,7 @@ where
/// being true in case the `text` is to be interpreted as error.
pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self {
WithSidebands {
parent: Some(parent),
state: State::Idle { parent },
handle_progress: Some(handle_progress),
read_line: None,
pos: 0,
Expand All @@ -77,7 +89,7 @@ where
/// Create a new instance without a progress handler.
pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self {
WithSidebands {
parent: Some(parent),
state: State::Idle { parent },
handle_progress: None,
read_line: None,
pos: 0,
Expand All @@ -87,12 +99,17 @@ where

/// Forwards to the parent [StreamingPeekableIter::reset_with()]
pub fn reset_with(&mut self, delimiters: &'static [PacketLine<'static>]) {
self.parent.as_mut().unwrap().reset_with(delimiters)
if let State::Idle { ref mut parent } = self.state {
parent.reset_with(delimiters)
}
}

/// Forwards to the parent [StreamingPeekableIter::stopped_at()]
pub fn stopped_at(&self) -> Option<PacketLine<'static>> {
self.parent.as_ref().unwrap().stopped_at
match self.state {
State::Idle { ref parent } => parent.stopped_at,
_ => None,
}
}

/// Set or unset the progress handler.
Expand All @@ -103,10 +120,13 @@ where
/// Effectively forwards to the parent [StreamingPeekableIter::peek_line()], allowing to see what would be returned
/// next on a call to [`read_line()`][io::BufRead::read_line()].
pub async fn peek_data_line(&mut self) -> Option<std::io::Result<Result<&[u8], crate::decode::Error>>> {
match self.parent.as_mut().unwrap().peek_line().await {
Some(Ok(Ok(crate::PacketLine::Data(line)))) => Some(Ok(Ok(line))),
Some(Ok(Err(err))) => Some(Ok(Err(err))),
Some(Err(err)) => Some(Err(err)),
match self.state {
State::Idle { ref mut parent } => match parent.peek_line().await {
Some(Ok(Ok(crate::PacketLine::Data(line)))) => Some(Ok(Ok(line))),
Some(Ok(Err(err))) => Some(Ok(Err(err))),
Some(Err(err)) => Some(Err(err)),
_ => None,
},
_ => None,
}
}
Expand All @@ -117,56 +137,57 @@ where
T: AsyncRead + Unpin + Send,
F: FnMut(bool, &[u8]),
{
fn poll_fill_buf(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
use futures_lite::FutureExt;
use std::io;
{
let this = self.as_mut().get_mut();
if this.pos >= this.cap {
let (ofs, cap) = loop {
// todo!("poll a future based on a field of ourselves - self-ref once again");
this.read_line = Some(this.parent.take().unwrap().read_line().boxed());
let line = match ready!(this.read_line.as_mut().expect("set above").poll(_cx)) {
Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
None => break (0, 0),
};
match this.handle_progress.as_mut() {
Some(handle_progress) => {
let band = line
.decode_band()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
const ENCODED_BAND: usize = 1;
match band {
Band::Data(d) => break (U16_HEX_BYTES + ENCODED_BAND, d.len()),
Band::Progress(d) => {
let text = Text::from(d).0;
handle_progress(false, text);
}
Band::Error(d) => {
let text = Text::from(d).0;
handle_progress(true, text);
}
};
}
None => {
break match line.as_slice() {
Some(d) => (U16_HEX_BYTES, d.len()),
None => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"encountered non-data line in a data-line only context",
)))
}
}
}
}
};
this.cap = cap + ofs;
this.pos = ofs;
}
}
let range = self.pos..self.cap;
Poll::Ready(Ok(&self.get_mut().parent.as_ref().unwrap().buf[range]))
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
// use futures_lite::FutureExt;
// use std::io;
// {
// let this = self.as_mut().get_mut();
// if this.pos >= this.cap {
// let (ofs, cap) = loop {
// // todo!("poll a future based on a field of ourselves - self-ref once again");
// this.read_line = Some(this.parent.take().unwrap().read_line().boxed());
// let line = match ready!(this.read_line.as_mut().expect("set above").poll(cx)) {
// Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
// None => break (0, 0),
// };
// match this.handle_progress.as_mut() {
// Some(handle_progress) => {
// let band = line
// .decode_band()
// .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
// const ENCODED_BAND: usize = 1;
// match band {
// Band::Data(d) => break (U16_HEX_BYTES + ENCODED_BAND, d.len()),
// Band::Progress(d) => {
// let text = Text::from(d).0;
// handle_progress(false, text);
// }
// Band::Error(d) => {
// let text = Text::from(d).0;
// handle_progress(true, text);
// }
// };
// }
// None => {
// break match line.as_slice() {
// Some(d) => (U16_HEX_BYTES, d.len()),
// None => {
// return Poll::Ready(Err(io::Error::new(
// io::ErrorKind::UnexpectedEof,
// "encountered non-data line in a data-line only context",
// )))
// }
// }
// }
// }
// };
// this.cap = cap + ofs;
// this.pos = ofs;
// }
// }
// let range = self.pos..self.cap;
// Poll::Ready(Ok(&self.get_mut().parent.as_ref().unwrap().buf[range]))
todo!("all")
}

fn consume(self: Pin<&mut Self>, amt: usize) {
Expand Down

0 comments on commit 22740c5

Please sign in to comment.