Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ services:
APP_ENV: development
LOG: debug
TZ: "Europe/Paris"
EDGE_KEY: "eyJzZXJ2ZXJVcmwiOiJodHRwOi8vbG9jYWxob3N0Ojg4ODciLCJhZ2VudElkIjoiNTMwMjNkYjQtN2EzYS00ZTM0LTk0MWEtNjU2ZTNlNzE2NzlkIiwibWFzdGVyS2V5QjY0IjoiMUh0djdtWCtYVkJxL0IzUEV2WDlZZjlQeUdVZW5oRHlXemo5THRqNW90WT0ifQ=="
EDGE_KEY: "eyJzZXJ2ZXJVcmwiOiJodHRwOi8vbG9jYWxob3N0Ojg4ODciLCJhZ2VudElkIjoiNzM0NjU3Y2YtMGQzYy00Y2UwLTkyODQtZDJmOGYyMjI2MzgzIiwibWFzdGVyS2V5QjY0IjoiMUh0djdtWCtYVkJxL0IzUEV2WDlZZjlQeUdVZW5oRHlXemo5THRqNW90WT0ifQ=="
#CHUNK_SIZE_MB: "1"
#POOLING: 1
#DATABASES_CONFIG_FILE: "config.toml"
extra_hosts:
Expand Down
14 changes: 14 additions & 0 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct Settings {
pub pooling: usize,
pub timezone: String,
pub log: String,
pub chunk_size: usize, // bytes
}

impl Settings {
Expand All @@ -35,6 +36,18 @@ impl Settings {
pooling_seconds
);
}

let chunk_size_mb = env::var("CHUNK_SIZE_MB")
.unwrap_or_else(|_| "1".to_string())
.parse::<usize>()
.expect("CHUNK_SIZE_MB must be a valid positive integer");

if chunk_size_mb == 0 || chunk_size_mb > 10 {
panic!("CHUNK_SIZE_MB must be between 1 and 10 MB");
}

let chunk_size = chunk_size_mb * 1024 * 1024;

let tz = env::var("TZ").unwrap_or_else(|_| "UTC".to_string());

Self {
Expand All @@ -49,6 +62,7 @@ impl Settings {
pooling: pooling_seconds,
timezone: tz,
log: env::var("LOG").unwrap_or_else(|_| "info".into()),
chunk_size
}
}
}
Expand Down
31 changes: 24 additions & 7 deletions src/utils/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ use anyhow::Result;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use std::pin::Pin;
use tokio_util::io::ReaderStream;
use tokio::io::{AsyncReadExt};
use crate::settings::CONFIG;

pub struct UploadStream {
pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
}



pub async fn build_stream(
file_path: &std::path::Path,
encrypt: bool,
Expand All @@ -25,13 +28,27 @@ pub async fn build_stream(

Ok(UploadStream { stream })
} else {
let file = tokio::fs::File::open(file_path).await?;
let reader = ReaderStream::new(file);
let mut file = tokio::fs::File::open(file_path).await?;

let stream = Box::pin(
reader.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))),
);
let stream = async_stream::stream! {
let mut buffer = vec![0u8; CONFIG.chunk_size];

Ok(UploadStream { stream })
loop {
let n = match file.read(&mut buffer).await {
Ok(0) => break,
Ok(n) => n,
Err(e) => {
yield Err(e);
break;
}
};

yield Ok(Bytes::copy_from_slice(&buffer[..n]));
}
};

Ok(UploadStream {
stream: Box::pin(stream),
})
}
}
7 changes: 4 additions & 3 deletions src/utils/tus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use bytes::Bytes;
use futures::{Stream, StreamExt};
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
use tracing::{error, info};

const PATCH_CHUNK_SIZE: usize = 1 * 1024 * 1024;
use crate::settings::CONFIG;

pub async fn upload_to_tus_stream_with_headers<S>(
encrypted_stream: S,
Expand Down Expand Up @@ -66,7 +65,9 @@ where
while let Some(chunk) = stream.next().await {
let chunk = chunk.context("Stream produced IO error")?;

for sub_chunk in chunk.chunks(PATCH_CHUNK_SIZE) {
info!("Chunk: {:?}", CONFIG.chunk_size);

for sub_chunk in chunk.chunks(CONFIG.chunk_size) {
let mut patch_headers = extra_headers.clone();
patch_headers.insert("Tus-Resumable", HeaderValue::from_static("1.0.0"));
patch_headers.insert(
Expand Down
Loading