Skip to content

Commit

Permalink
fix(cubestore): put temporary downloads into a separate directory
Browse files Browse the repository at this point in the history
To fix a race when the cleanup process removes the partially downloaded file.
  • Loading branch information
ilya-biryukov committed Feb 17, 2021
1 parent c82583a commit 33986e9
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
5 changes: 3 additions & 2 deletions rust/cubestore/src/remotefs/gcs.rs
Expand Up @@ -73,12 +73,13 @@ impl RemoteFs for GCSRemoteFs {
async fn download_file(&self, remote_path: &str) -> Result<String, CubeError> {
let local_file = self.dir.as_path().join(remote_path);
let local_dir = local_file.parent().unwrap();
let downloads_dirs = local_dir.join("downloads");

fs::create_dir_all(local_dir).await?;
fs::create_dir_all(&downloads_dirs).await?;
if !local_file.exists() {
let time = SystemTime::now();
debug!("Downloading {}", remote_path);
let (temp_file, temp_path) = NamedTempFile::new_in(local_dir)?.into_parts();
let (temp_file, temp_path) = NamedTempFile::new_in(&downloads_dirs)?.into_parts();
let mut writer = BufWriter::new(tokio::fs::File::from_std(temp_file));
let mut stream = Object::download_streamed(
self.bucket.as_str(),
Expand Down
5 changes: 3 additions & 2 deletions rust/cubestore/src/remotefs/mod.rs
Expand Up @@ -111,11 +111,12 @@ impl RemoteFs for LocalDirRemoteFs {
async fn download_file(&self, remote_path: &str) -> Result<String, CubeError> {
let local_file = self.dir.as_path().join(remote_path);
let local_dir = local_file.parent().unwrap();
fs::create_dir_all(local_dir).await?;
let downloads_dir = local_dir.join("downloads");
fs::create_dir_all(&downloads_dir).await?;
if !local_file.exists() {
debug!("Downloading {}", remote_path);
let remote_dir = self.remote_dir.read().await;
let temp_path = NamedTempFile::new_in(local_dir)?.into_temp_path();
let temp_path = NamedTempFile::new_in(&downloads_dir)?.into_temp_path();
fs::copy(remote_dir.as_path().join(remote_path), &temp_path)
.await
.map_err(|e| {
Expand Down
7 changes: 4 additions & 3 deletions rust/cubestore/src/remotefs/s3.rs
Expand Up @@ -83,18 +83,19 @@ impl RemoteFs for S3RemoteFs {
async fn download_file(&self, remote_path: &str) -> Result<String, CubeError> {
let local_file = self.dir.as_path().join(remote_path);
let local_dir = local_file.parent().unwrap();
let downloads_dir = local_dir.join("downloads");

let local_file_str = local_file.to_str().unwrap().to_string(); // return value.

fs::create_dir_all(local_dir).await?;
fs::create_dir_all(&downloads_dir).await?;
if !local_file.exists() {
let time = SystemTime::now();
debug!("Downloading {}", remote_path);
let path = self.s3_path(remote_path);
let bucket = self.bucket.clone();
let status_code = tokio::task::spawn_blocking(move || -> Result<u16, CubeError> {
let local_dir = local_file.parent().unwrap();
let (mut temp_file, temp_path) = NamedTempFile::new_in(local_dir)?.into_parts();
let (mut temp_file, temp_path) =
NamedTempFile::new_in(&downloads_dir)?.into_parts();

let res = bucket.get_object_stream_blocking(path.as_str(), &mut temp_file)?;
temp_file.flush()?;
Expand Down

0 comments on commit 33986e9

Please sign in to comment.