Skip to content

Commit

Permalink
Rework streams
Browse files Browse the repository at this point in the history
  • Loading branch information
durch committed Sep 25, 2022
1 parent f9769f0 commit 8992587
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 140 deletions.
46 changes: 24 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,67 +48,69 @@ All runtimes support either `native-tls` or `rustls-tls`, there are features for
#### Buckets

| | |
|----------|------------------------------------------------------------------------------------|
| -------- | ---------------------------------------------------------------------------------- |
| `create` | [async](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.create) |
| `delete` | [async](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.delete) |

#### Presign

| | |
|-------|-----------------------------------------------------------------------------------------------|
| `PUT` | [presign_put](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.presign_put) |
| `GET` | [presign_get](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.presign_get) |
| `DELETE` | [presign_delete](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.presign_delete)|
| | |
| -------- | --------------------------------------------------------------------------------------------------- |
| `POST` | [presign_put](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.presign_post) |
| `PUT` | [presign_put](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.presign_put) |
| `GET` | [presign_get](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.presign_get) |
| `DELETE` | [presign_delete](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.presign_delete) |

#### GET

There are a few different options for getting an object. `sync` and `async` methods are generic over `std::io::Write`,
while `tokio` methods are generic over `tokio::io::AsyncWriteExt`.

| | |
|---------|-----------------------------------------------------------------------------------------------------------|
| `async/sync/async-blocking` | [get_object](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.get_object) |
| `async/sync/async-blocking` | [get_object_stream](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.get_object_stream) |
| | |
| --------------------------- | --------------------------------------------------------------------------------------------------------------- |
| `async/sync/async-blocking` | [get_object](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.get_object) |
| `async/sync/async-blocking` | [get_object_stream](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.get_object_stream) |
| `async/sync/async-blocking` | [get_object_to_writer](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.get_object_to_writer) |

#### PUT

Each `GET` method has a `PUT` companion `sync` and `async` methods are generic over `std::io::Read`. `async` `stream` methods are generic over `futures_io::AsyncReadExt`, while `tokio` methods are generic over `tokio::io::AsyncReadExt`.

| | |
|---------|---------------------------------------------------------------------------------------------------------------------------------|
| | |
| --------------------------- | ------------------------------------------------------------------------------------------------------------------------------- |
| `async/sync/async-blocking` | [put_object](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.put_object) |
| `async/sync/async-blocking` | [put_object_with_content_type](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.put_object_with_content_type) |
| `async/sync/async-blocking` | [put_object_stream](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.put_object_stream) |

#### List

| | |
|---------|---------------------------------------------------------------------------------|
| | |
| --------------------------- | ------------------------------------------------------------------------------- |
| `async/sync/async-blocking` | [list](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.list) |

#### DELETE

| | |
|---------|---------------------------------------------------------------------------------------------------|
| | |
| --------------------------- | ------------------------------------------------------------------------------------------------- |
| `async/sync/async-blocking` | [delete_object](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.delete_object) |

#### Location

| | |
|---------|-----------------------------------------------------------------------------------------|
| | |
| --------------------------- | --------------------------------------------------------------------------------------- |
| `async/sync/async-blocking` | [location](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.location) |

#### Tagging

| | |
|---------|-------------------------------------------------------------------------------------------------------------|
| | |
| --------------------------- | ----------------------------------------------------------------------------------------------------------- |
| `async/sync/async-blocking` | [put_object_tagging](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.put_object_tagging) |
| `async/sync/async-blocking` | [get_object_tagging](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.get_object_tagging) |

#### Head

| | |
|---------|-----------------------------------------------------------------------------------------------|
| | |
| --------------------------- | --------------------------------------------------------------------------------------------- |
| `async/sync/async-blocking` | [head_object](https://docs.rs/rust-s3/latest/s3/bucket/struct.Bucket.html#method.head_object) |

### Usage (in `Cargo.toml`)
Expand Down
2 changes: 1 addition & 1 deletion s3/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rust-s3"
version = "0.33.0"
version = "0.33.0-beta1"
authors = ["Drazen Urch"]
description = "Rust library for working with Amazon S3 and compatible object storage APIs"
repository = "https://github.com/durch/rust-s3"
Expand Down
51 changes: 30 additions & 21 deletions s3/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use crate::command::{Command, Multipart};
use crate::creds::Credentials;
use crate::region::Region;
use crate::request_trait::ResponseData;
#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
use crate::request_trait::ResponseDataStream;
use std::str::FromStr;

pub type Query = HashMap<String, String>;
Expand All @@ -30,15 +32,6 @@ use tokio::io::AsyncWrite;
use crate::blocking::AttoRequest as RequestImpl;
use std::io::Read;

#[cfg(any(feature = "with-async-std", feature = "with-tokio"))]
use bytes::Bytes;

#[cfg(feature = "with-tokio")]
use tokio_stream::Stream;

#[cfg(feature = "with-async-std")]
use futures_util::Stream;

use crate::error::S3Error;
use crate::request_trait::Request;
use crate::serde_types::{
Expand Down Expand Up @@ -786,22 +779,22 @@ impl Bucket {
/// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
///
/// // Async variant with `tokio` or `async-std` features
/// let status_code = bucket.get_object_stream("/test.file", &mut async_output_file).await?;
/// let status_code = bucket.get_object_to_writer("/test.file", &mut async_output_file).await?;
///
/// // `sync` feature will produce an identical method
/// #[cfg(feature = "sync")]
/// let status_code = bucket.get_object_stream("/test.file", &mut output_file)?;
/// let status_code = bucket.get_object_to_writer("/test.file", &mut output_file)?;
///
/// // Blocking variant, generated with `blocking` feature in combination
/// // with `tokio` or `async-std` features. Based of the async branch
/// #[cfg(feature = "blocking")]
/// let status_code = bucket.get_object_stream_blocking("/test.file", &mut async_output_file)?;
/// let status_code = bucket.get_object_to_writer_blocking("/test.file", &mut async_output_file)?;
/// #
/// # Ok(())
/// # }
/// ```
#[maybe_async::async_impl]
pub async fn get_object_stream<T: AsyncWrite + Send + Unpin, S: AsRef<str>>(
pub async fn get_object_to_writer<T: AsyncWrite + Send + Unpin, S: AsRef<str>>(
&self,
path: S,
writer: &mut T,
Expand All @@ -812,7 +805,7 @@ impl Bucket {
}

#[maybe_async::sync_impl]
pub fn get_object_stream<T: std::io::Write + Send, S: AsRef<str>>(
pub fn get_object_to_writer<T: std::io::Write + Send, S: AsRef<str>>(
&self,
path: S,
writer: &mut T,
Expand All @@ -835,7 +828,7 @@ impl Bucket {
/// #[cfg(feature = "with-tokio")]
/// use tokio::io::AsyncWriteExt;
/// #[cfg(feature = "with-async-std")]
/// use futures_util::TryStreamExt;
/// use futures_util::StreamExt;
/// #[cfg(feature = "with-async-std")]
/// use futures_util::AsyncWriteExt;
///
Expand All @@ -848,14 +841,14 @@ impl Bucket {
/// let bucket = Bucket::new(bucket_name, region, credentials)?;
/// let path = "path";
///
/// let (mut stream, status_code) = bucket.get_object_async_stream(path).await?;
/// let mut response_data_stream = bucket.get_object_stream(path).await?;
///
/// #[cfg(feature = "with-tokio")]
/// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
/// #[cfg(feature = "with-async-std")]
/// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
///
/// while let Some(chunk) = stream.try_next().await? {
/// while let Some(chunk) = response_data_stream.bytes().next().await {
/// async_output_file.write_all(&chunk).await?;
/// }
///
Expand All @@ -864,10 +857,10 @@ impl Bucket {
/// # }
/// ```
#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
pub async fn get_object_async_stream<S: AsRef<str>>(
pub async fn get_object_stream<S: AsRef<str>>(
&self,
path: S,
) -> Result<(impl Stream<Item = Result<Bytes, S3Error>>, u16), S3Error> {
) -> Result<ResponseDataStream, S3Error> {
let command = Command::GetObject;
let request = RequestImpl::new(self, path.as_ref(), command);
request.response_data_to_stream().await
Expand Down Expand Up @@ -2322,6 +2315,10 @@ mod test {
)
)]
async fn streaming_test_put_get_delete_big_object() {
#[cfg(feature = "with-async-std")]
use async_std::stream::StreamExt;
#[cfg(feature = "with-tokio")]
use futures::StreamExt;
use std::fs::File;
use std::io::Write;

Expand All @@ -2343,14 +2340,26 @@ mod test {
assert_eq!(code, 200);
let mut writer = Vec::new();
let code = bucket
.get_object_stream(remote_path, &mut writer)
.get_object_to_writer(remote_path, &mut writer)
.await
.unwrap();
assert_eq!(code, 200);
assert_eq!(content, writer);
assert_eq!(content.len(), writer.len());
assert_eq!(content.len(), 20_000_000);

#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
{
let mut response_data_stream = bucket.get_object_stream(remote_path).await.unwrap();

let mut bytes = vec![];

while let Some(chunk) = response_data_stream.bytes().next().await {
bytes.push(chunk)
}
assert_ne!(bytes.len(), 0);
}

let response_data = bucket.delete_object(remote_path).await.unwrap();
assert_eq!(response_data.status_code(), 204);
std::fs::remove_file(local_path).unwrap_or_else(|_| {});
Expand Down Expand Up @@ -2388,7 +2397,7 @@ mod test {
assert_eq!(code, 200);
let mut writer = Vec::new();
let code = bucket
.get_object_stream(remote_path, &mut writer)
.get_object_to_writer(remote_path, &mut writer)
.await
.unwrap();
assert_eq!(code, 200);
Expand Down
45 changes: 7 additions & 38 deletions s3/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@ use bytes::Bytes;
use maybe_async::maybe_async;
use reqwest::{Client, Response};
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use time::OffsetDateTime;
use tokio_stream::Stream;

use crate::bucket::Bucket;
use crate::command::Command;
use crate::command::HttpMethod;
use crate::error::S3Error;
use crate::request_trait::{Request, ResponseData};
use crate::request_trait::{Request, ResponseData, ResponseDataStream};

use tokio_stream::StreamExt;

Expand All @@ -31,7 +28,6 @@ pub struct Reqwest<'a> {
impl<'a> Request for Reqwest<'a> {
type Response = reqwest::Response;
type HeaderMap = reqwest::header::HeaderMap;
type ResponseStream = GetObjectStream;

fn command(&self) -> Command {
self.command.clone()
Expand Down Expand Up @@ -157,12 +153,15 @@ impl<'a> Request for Reqwest<'a> {
Ok((headers, status_code))
}

async fn response_data_to_stream(&self) -> Result<(Self::ResponseStream, u16), S3Error> {
async fn response_data_to_stream(&self) -> Result<ResponseDataStream, S3Error> {
let response = self.response().await?;
let status_code = response.status();
let stream = response.bytes_stream();
let stream = response.bytes_stream().filter_map(|b| b.ok());

Ok((GetObjectStream::new(stream), status_code.as_u16()))
Ok(ResponseDataStream {
bytes: Box::pin(stream),
status_code: status_code.as_u16(),
})
}
}

Expand All @@ -178,36 +177,6 @@ impl<'a> Reqwest<'a> {
}
}

pub struct GetObjectStream {
inner: Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>>>>,
}

impl GetObjectStream {
pub(crate) fn new<S: 'static>(stream: S) -> Self
where
S: Stream<Item = Result<Bytes, reqwest::Error>>,
{
Self {
inner: Box::pin(stream),
}
}
}

impl Stream for GetObjectStream {
type Item = Result<Bytes, S3Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.inner.as_mut().poll_next(cx) {
Poll::Ready(v) => Poll::Ready(v.map(|v| v.map_err(S3Error::from))),
Poll::Pending => Poll::Pending,
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

#[cfg(test)]
mod tests {
use crate::bucket::Bucket;
Expand Down
20 changes: 16 additions & 4 deletions s3/src/request_trait.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use hmac::Mac;
use std::collections::HashMap;
#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
use std::pin::Pin;
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
use url::Url;
Expand Down Expand Up @@ -30,6 +32,19 @@ pub struct ResponseData {
headers: HashMap<String, String>,
}

#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
pub struct ResponseDataStream {
pub bytes: Pin<Box<dyn Stream<Item = Bytes>>>,
pub status_code: u16,
}

#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
impl ResponseDataStream {
pub fn bytes(&mut self) -> &mut Pin<Box<dyn Stream<Item = Bytes>>> {
&mut self.bytes
}
}

impl From<ResponseData> for Vec<u8> {
fn from(data: ResponseData) -> Vec<u8> {
data.to_vec()
Expand Down Expand Up @@ -93,9 +108,6 @@ pub trait Request {
type Response;
type HeaderMap;

#[cfg(any(feature = "with-async-std", feature = "with-tokio"))]
type ResponseStream: Stream<Item = Result<Bytes, S3Error>>;

async fn response(&self) -> Result<Self::Response, S3Error>;
async fn response_data(&self, etag: bool) -> Result<ResponseData, S3Error>;
#[cfg(feature = "with-tokio")]
Expand All @@ -114,7 +126,7 @@ pub trait Request {
writer: &mut T,
) -> Result<u16, S3Error>;
#[cfg(any(feature = "with-async-std", feature = "with-tokio"))]
async fn response_data_to_stream(&self) -> Result<(Self::ResponseStream, u16), S3Error>;
async fn response_data_to_stream(&self) -> Result<ResponseDataStream, S3Error>;
async fn response_header(&self) -> Result<(Self::HeaderMap, u16), S3Error>;
fn datetime(&self) -> OffsetDateTime;
fn bucket(&self) -> Bucket;
Expand Down
Loading

0 comments on commit 8992587

Please sign in to comment.