Skip to content

Commit

Permalink
fix(cubestore): create metastore-current atomically, do not send co…
Browse files Browse the repository at this point in the history
…ntent-length to GCS

This fixes at least one potential race: the cleanup loop in queue fs
could delete the file while we were writing it.
We also guard against potential errors better now.

Note that any concurrent access to `metastore-current` can still result
in inconsistent state. We do not know any particular instances where
this happens, though.

The GCS change does not affect correctness of uploads, but guards
against potentially incorrect file sizes and make them more
discoverable, e.g. there is now a slightly higher chance to get an error
when reading the file.
  • Loading branch information
ilya-biryukov committed Apr 30, 2021
1 parent 458b837 commit a2a68a0
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 8 deletions.
17 changes: 11 additions & 6 deletions rust/cubestore/src/metastore/mod.rs
Expand Up @@ -1856,15 +1856,20 @@ impl RocksMetaStore {
v?;
}

let current_metastore_file = remote_fs.local_file("metastore-current").await?;
let uploads_dir = remote_fs.uploads_dir().await?;
let (file, file_path) = tokio::task::spawn_blocking(move || {
tempfile::Builder::new()
.prefix("metastore-current")
.tempfile_in(uploads_dir)
})
.await??
.into_parts();

{
let mut file = File::create(&current_metastore_file).await?;
tokio::io::AsyncWriteExt::write_all(&mut file, remote_path.as_bytes()).await?;
}
tokio::io::AsyncWriteExt::write_all(&mut fs::File::from_std(file), remote_path.as_bytes())
.await?;

remote_fs
.upload_file(&current_metastore_file, "metastore-current")
.upload_file(file_path.keep()?.to_str().unwrap(), "metastore-current")
.await?;

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions rust/cubestore/src/remotefs/gcs.rs
Expand Up @@ -52,13 +52,12 @@ impl RemoteFs for GCSRemoteFs {
let time = SystemTime::now();
debug!("Uploading {}", remote_path);
let file = File::open(temp_upload_path).await?;
let size = file.metadata().await?.len();
let stream = FramedRead::new(file, BytesCodec::new());
let stream = stream.map(|r| r.map(|b| b.to_vec()));
Object::create_streamed(
self.bucket.as_str(),
stream,
Some(size),
None,
self.gcs_path(remote_path).as_str(),
"application/octet-stream",
)
Expand Down
14 changes: 14 additions & 0 deletions rust/cubestore/src/remotefs/mod.rs
Expand Up @@ -43,6 +43,20 @@ pub trait RemoteFs: DIService + Send + Sync + Debug {
self.local_file(&format!("uploads/{}", remote_path)).await
}

/// Convention is to use this directory for creating files to be uploaded later.
async fn uploads_dir(&self) -> Result<String, CubeError> {
// Call to `temp_upload_path` ensures we created the uploads dir.
let file_in_dir = self
.temp_upload_path("never_created_remote_fs_file")
.await?;
Ok(Path::new(&file_in_dir)
.parent()
.unwrap()
.to_str()
.unwrap()
.to_owned())
}

/// In addition to uploading this file to the remote filesystem, this function moves the file
/// from `temp_upload_path` to `self.local_path(remote_path)` on the local file system.
async fn upload_file(&self, temp_upload_path: &str, remote_path: &str)
Expand Down

0 comments on commit a2a68a0

Please sign in to comment.