Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip empty chucks for BodyStream and SizedStream #1308

Merged
merged 6 commits into from
Jan 28, 2020
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 67 additions & 9 deletions actix-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{fmt, mem};

use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_util::ready;
use pin_project::{pin_project, project};

use crate::error::Error;
Expand Down Expand Up @@ -389,12 +390,19 @@ where
BodySize::Stream
}

/// Attempts to pull out the next value of the underlying [`Stream`].
///
/// Empty values are skipped to prevent [`BodyStream`]'s transmission being
/// ended on a zero-length chunk, but rather proceed until the underlying
/// [`Stream`] ends.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
unsafe { Pin::new_unchecked(self) }
.project()
.stream
.poll_next(cx)
.map(|res| res.map(|res| res.map_err(std::convert::Into::into)))
let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream;
loop {
return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) {
Some(Ok(ref bytes)) if bytes.is_empty() => continue,
opt => opt.map(|res| res.map_err(Into::into)),
});
}
}
}

Expand Down Expand Up @@ -424,17 +432,26 @@ where
BodySize::Sized64(self.size)
}

/// Attempts to pull out the next value of the underlying [`Stream`].
///
/// Empty values are skipped to prevent [`SizedStream`]'s transmission being
/// ended on a zero-length chunk, but rather proceed until the underlying
/// [`Stream`] ends.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
unsafe { Pin::new_unchecked(self) }
.project()
.stream
.poll_next(cx)
let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream;
loop {
return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) {
Some(Ok(ref bytes)) if bytes.is_empty() => continue,
val => val,
});
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::stream;
use futures_util::future::poll_fn;

impl Body {
Expand Down Expand Up @@ -589,4 +606,45 @@ mod tests {
BodySize::Sized(25)
);
}

mod body_stream {
use super::*;

#[actix_rt::test]
async fn skips_empty_chunks() {
let mut body = BodyStream::new(stream::iter(
["1", "", "2"]
.iter()
.map(|&v| Ok(Bytes::from(v)) as Result<Bytes, ()>),
));
assert_eq!(
poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("1")),
);
assert_eq!(
poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("2")),
);
}
}

mod sized_stream {
use super::*;

#[actix_rt::test]
async fn skips_empty_chunks() {
let mut body = SizedStream::new(
2,
stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
);
assert_eq!(
poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("1")),
);
assert_eq!(
poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("2")),
);
}
}
}