Skip to content

Commit

Permalink
Merge #92
Browse files Browse the repository at this point in the history
92: Test and fix flushing r=Nemo157 a=Nemo157

Tests and finishes fixing issues from #91 

Co-authored-by: Wim Looman <git@nemo157.com>
  • Loading branch information
bors[bot] and Nemo157 committed May 15, 2020
2 parents 0852f42 + 84f83a5 commit e8938d9
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/futures/write/generic/decoder.rs
Expand Up @@ -151,7 +151,7 @@ impl<W: AsyncWrite, D: Decode> AsyncWrite for Decoder<W, D> {

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
ready!(self.as_mut().do_poll_flush(cx))?;
ready!(self.project().writer.as_mut().poll_close(cx))?;
ready!(self.project().writer.as_mut().poll_flush(cx))?;
Poll::Ready(Ok(()))
}

Expand Down
19 changes: 13 additions & 6 deletions tests/utils/mod.rs
@@ -1,5 +1,7 @@
#![allow(dead_code, unused_macros)] // Different tests use a different subset of functions

mod track_closed;

use bytes::Bytes;
use futures::{
io::AsyncBufRead,
Expand Down Expand Up @@ -61,6 +63,7 @@ impl From<Vec<Vec<u8>>> for InputStream {
}

pub mod prelude {
use super::track_closed::TrackClosedExt as _;
pub use async_compression::Level;
pub use bytes::Bytes;
pub use futures::{
Expand Down Expand Up @@ -109,13 +112,17 @@ pub mod prelude {
{
let mut test_writer = (&mut output)
.limited_write(limit)
.interleave_pending_write();
let mut writer = create_writer(&mut test_writer);
for chunk in input {
block_on(writer.write_all(chunk)).unwrap();
block_on(writer.flush()).unwrap();
.interleave_pending_write()
.track_closed();
{
let mut writer = create_writer(&mut test_writer);
for chunk in input {
block_on(writer.write_all(chunk)).unwrap();
block_on(writer.flush()).unwrap();
}
block_on(writer.close()).unwrap();
}
block_on(writer.close()).unwrap();
assert!(test_writer.is_closed());
}
output
}
Expand Down
63 changes: 63 additions & 0 deletions tests/utils/track_closed.rs
@@ -0,0 +1,63 @@
use core::{
pin::Pin,
task::{Context, Poll},
};
use futures::io::AsyncWrite;
use std::io::{IoSlice, Result};

pub trait TrackClosedExt: AsyncWrite {
fn track_closed(self) -> TrackClosed<Self>
where
Self: Sized + Unpin,
{
TrackClosed {
inner: self,
closed: false,
}
}
}

impl<W: AsyncWrite> TrackClosedExt for W {}

pub struct TrackClosed<W: AsyncWrite + Unpin> {
inner: W,
closed: bool,
}

impl<W: AsyncWrite + Unpin> TrackClosed<W> {
pub fn is_closed(&self) -> bool {
self.closed
}
}

impl<W: AsyncWrite + Unpin> AsyncWrite for TrackClosed<W> {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
assert!(!self.closed);
Pin::new(&mut self.inner).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
assert!(!self.closed);
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
assert!(!self.closed);
match Pin::new(&mut self.inner).poll_close(cx) {
Poll::Ready(Ok(())) => {
self.closed = true;
Poll::Ready(Ok(()))
}
other => other,
}
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context,
bufs: &[IoSlice],
) -> Poll<Result<usize>> {
assert!(!self.closed);
Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
}
}

0 comments on commit e8938d9

Please sign in to comment.