From a2a68a04ab89d4df30236fa175ddc1abde79503d Mon Sep 17 00:00:00 2001 From: Ilya Biryukov Date: Fri, 30 Apr 2021 12:27:05 +0300 Subject: [PATCH] fix(cubestore): create `metastore-current` atomically, do not send content-length to GCS This fixes at least one potential race: the cleanup loop in queue fs could delete the file while we were writing it. We also guard against potential errors better now. Note that any concurrent access to `metastore-current` can still result in inconsistent state. We do not know of any particular instances where this happens, though. The GCS change does not affect correctness of uploads, but guards against potentially incorrect file sizes and makes them more discoverable, e.g. there is now a slightly higher chance to get an error when reading the file. --- rust/cubestore/src/metastore/mod.rs | 17 +++++++++++------ rust/cubestore/src/remotefs/gcs.rs | 3 +-- rust/cubestore/src/remotefs/mod.rs | 14 ++++++++++++++ 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/rust/cubestore/src/metastore/mod.rs b/rust/cubestore/src/metastore/mod.rs index a1f8d24f0bfe..3f1867f33c14 100644 --- a/rust/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/src/metastore/mod.rs @@ -1856,15 +1856,20 @@ impl RocksMetaStore { v?; } - let current_metastore_file = remote_fs.local_file("metastore-current").await?; + let uploads_dir = remote_fs.uploads_dir().await?; + let (file, file_path) = tokio::task::spawn_blocking(move || { + tempfile::Builder::new() + .prefix("metastore-current") + .tempfile_in(uploads_dir) + }) + .await?? 
+ .into_parts(); - { - let mut file = File::create(&current_metastore_file).await?; - tokio::io::AsyncWriteExt::write_all(&mut file, remote_path.as_bytes()).await?; - } + tokio::io::AsyncWriteExt::write_all(&mut fs::File::from_std(file), remote_path.as_bytes()) + .await?; remote_fs - .upload_file(&current_metastore_file, "metastore-current") + .upload_file(file_path.keep()?.to_str().unwrap(), "metastore-current") .await?; Ok(()) diff --git a/rust/cubestore/src/remotefs/gcs.rs b/rust/cubestore/src/remotefs/gcs.rs index 1c5ea7f65b38..6ad834b47514 100644 --- a/rust/cubestore/src/remotefs/gcs.rs +++ b/rust/cubestore/src/remotefs/gcs.rs @@ -52,13 +52,12 @@ impl RemoteFs for GCSRemoteFs { let time = SystemTime::now(); debug!("Uploading {}", remote_path); let file = File::open(temp_upload_path).await?; - let size = file.metadata().await?.len(); let stream = FramedRead::new(file, BytesCodec::new()); let stream = stream.map(|r| r.map(|b| b.to_vec())); Object::create_streamed( self.bucket.as_str(), stream, - Some(size), + None, self.gcs_path(remote_path).as_str(), "application/octet-stream", ) diff --git a/rust/cubestore/src/remotefs/mod.rs b/rust/cubestore/src/remotefs/mod.rs index 96d346d63c18..ec78fce2a165 100644 --- a/rust/cubestore/src/remotefs/mod.rs +++ b/rust/cubestore/src/remotefs/mod.rs @@ -43,6 +43,20 @@ pub trait RemoteFs: DIService + Send + Sync + Debug { self.local_file(&format!("uploads/{}", remote_path)).await } + /// Convention is to use this directory for creating files to be uploaded later. + async fn uploads_dir(&self) -> Result { + // Call to `temp_upload_path` ensures we created the uploads dir. + let file_in_dir = self + .temp_upload_path("never_created_remote_fs_file") + .await?; + Ok(Path::new(&file_in_dir) + .parent() + .unwrap() + .to_str() + .unwrap() + .to_owned()) + } + /// In addition to uploading this file to the remote filesystem, this function moves the file /// from `temp_upload_path` to `self.local_path(remote_path)` on the local file system. 
async fn upload_file(&self, temp_upload_path: &str, remote_path: &str)