From 60c8ffa14e9ce156c4db2b765464dc467f579afd Mon Sep 17 00:00:00 2001 From: Samir Aguiar Date: Tue, 24 May 2022 23:00:28 -0300 Subject: [PATCH] Allow setting the content type for multipart uploads --- s3/src/bucket.rs | 103 ++++++++++++++++++++++++++++++++++++---- s3/src/command.rs | 7 ++- s3/src/request_trait.rs | 2 +- 3 files changed, 100 insertions(+), 12 deletions(-) diff --git a/s3/src/bucket.rs b/s3/src/bucket.rs index 7fcb9c150e..904ca5fd0e 100644 --- a/s3/src/bucket.rs +++ b/s3/src/bucket.rs @@ -834,7 +834,12 @@ impl Bucket { reader: &mut R, s3_path: impl AsRef, ) -> Result { - self._put_object_stream(reader, s3_path.as_ref()).await + self._put_object_stream_with_content_type( + reader, + s3_path.as_ref(), + "application/octet-stream", + ) + .await } #[maybe_async::sync_impl] @@ -843,14 +848,89 @@ impl Bucket { reader: &mut R, s3_path: impl AsRef, ) -> Result { - self._put_object_stream(reader, s3_path.as_ref()) + self._put_object_stream_with_content_type( + reader, + s3_path.as_ref(), + "application/octet-stream", + ) + } + + /// Stream file from local path to s3, generic over T: Write with explicit content type. + /// + /// # Example: + /// + /// ```rust,no_run + /// use s3::bucket::Bucket; + /// use s3::creds::Credentials; + /// use anyhow::Result; + /// use std::fs::File; + /// use std::io::Write; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// + /// let bucket_name = "rust-s3-test"; + /// let region = "us-east-1".parse()?; + /// let credentials = Credentials::default()?; + /// let bucket = Bucket::new(bucket_name, region, credentials)?; + /// let path = "path"; + /// let test: Vec = (0..1000).map(|_| 42).collect(); + /// let mut file = File::create(path)?; + /// file.write_all(&test)?; + /// + /// #[cfg(feature = "with-tokio")] + /// let mut path = tokio::fs::File::open(path).await?; + /// + /// #[cfg(feature = "with-async-std")] + /// let mut path = async_std::fs::File::open(path).await?; + /// // Async variant with `tokio` or `async-std` features + /// // Generic over futures_io::AsyncRead|tokio::io::AsyncRead + Unpin + /// let status_code = bucket + /// .put_object_stream_with_content_type(&mut path, "/path", "application/octet-stream") + /// .await?; + /// + /// // `sync` feature will produce an identical method + /// #[cfg(feature = "sync")] + /// // Generic over std::io::Read + /// let status_code = bucket + /// .put_object_stream_with_content_type(&mut path, "/path", "application/octet-stream")?; + /// + /// // Blocking variant, generated with `blocking` feature in combination + /// // with `tokio` or `async-std` features. + /// #[cfg(feature = "blocking")] + /// let status_code = bucket + /// .put_object_stream_with_content_type_blocking(&mut path, "/path", "application/octet-stream")?; + /// # + /// # Ok(()) + /// # } + /// ``` + #[maybe_async::async_impl] + pub async fn put_object_stream_with_content_type( + &self, + reader: &mut R, + s3_path: impl AsRef, + content_type: impl AsRef, + ) -> Result { + self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref()) + .await + } + + #[maybe_async::sync_impl] + pub fn put_object_stream_with_content_type( + &self, + reader: &mut R, + s3_path: impl AsRef, + content_type: impl AsRef, + ) -> Result { + self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref()) } #[maybe_async::async_impl] - async fn _put_object_stream( + async fn _put_object_stream_with_content_type( &self, reader: &mut R, s3_path: &str, + content_type: &str, ) -> Result { // If the file is smaller CHUNK_SIZE, just do a regular upload. // Otherwise perform a multi-part upload. @@ -863,7 +943,7 @@ impl Bucket { return Ok(code); } - let command = Command::InitiateMultipartUpload; + let command = Command::InitiateMultipartUpload { content_type }; let request = RequestImpl::new(self, s3_path, command); let (data, code) = request.response_data(false).await?; if code >= 300 { @@ -885,8 +965,8 @@ impl Bucket { let command = Command::PutObject { // part_number, content: &chunk, - content_type: "application/octet-stream", multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id, + content_type, }; let request = RequestImpl::new(self, &path, command); let (data, _code) = request.response_data(true).await?; @@ -923,8 +1003,13 @@ impl Bucket { } #[maybe_async::sync_impl] - fn _put_object_stream(&self, reader: &mut R, s3_path: &str) -> Result { - let command = Command::InitiateMultipartUpload; + fn _put_object_stream_with_content_type( + &self, + reader: &mut R, + s3_path: &str, + content_type: &str, + ) -> Result { + let command = Command::InitiateMultipartUpload { content_type }; let request = RequestImpl::new(self, s3_path, command); let (data, code) = request.response_data(false)?; if code >= 300 { @@ -952,8 +1037,8 @@ impl Bucket { let command = Command::PutObject { // part_number, content: &chunk, - content_type: "application/octet-stream", multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id, + content_type, }; let request = RequestImpl::new(self, &path, command); let (data, _code) = request.response_data(true)?; @@ -981,8 +1066,8 @@ impl Bucket { part_number += 1; let command = Command::PutObject { content: &chunk, - content_type: "application/octet-stream", multipart: Some(Multipart::new(part_number, upload_id)), + content_type, }; let request = RequestImpl::new(self, &path, command); let (data, _code) = request.response_data(true)?; diff --git a/s3/src/command.rs b/s3/src/command.rs index 756b96b777..34d53f02fb 100644 --- a/s3/src/command.rs +++ b/s3/src/command.rs @@ -105,7 +105,9 @@ pub enum Command<'a> { PresignDelete { expiry_secs: u32, }, - InitiateMultipartUpload, + InitiateMultipartUpload { + content_type: &'a str, + }, UploadPart { part_number: u32, content: &'a [u8], @@ -147,7 +149,7 @@ impl<'a> Command<'a> { | Command::AbortMultipartUpload { .. } | Command::PresignDelete { .. } | Command::DeleteBucket => HttpMethod::Delete, - Command::InitiateMultipartUpload | Command::CompleteMultipartUpload { .. } => { + Command::InitiateMultipartUpload { .. } | Command::CompleteMultipartUpload { .. } => { HttpMethod::Post } Command::HeadObject => HttpMethod::Head, @@ -174,6 +176,7 @@ impl<'a> Command<'a> { pub fn content_type(&self) -> String { match self { + Command::InitiateMultipartUpload { content_type } => content_type.to_string(), Command::PutObject { content_type, .. } => content_type.to_string(), Command::CompleteMultipartUpload { .. } => "application/xml".into(), _ => "text/plain".into(), diff --git a/s3/src/request_trait.rs b/s3/src/request_trait.rs index 80f1e9a366..43dcabf5e5 100644 --- a/s3/src/request_trait.rs +++ b/s3/src/request_trait.rs @@ -200,7 +200,7 @@ pub trait Request { // Append to url_path #[allow(clippy::collapsible_match)] match self.command() { - Command::InitiateMultipartUpload | Command::ListMultipartUploads { .. } => { + Command::InitiateMultipartUpload { .. } | Command::ListMultipartUploads { .. } => { url_str.push_str("?uploads") } Command::AbortMultipartUpload { upload_id } => {