Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cubestore): cleanup warmup downloads on partition removal #2025

Merged
merged 1 commit into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/cubestore/src/cluster/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub enum NetworkMessage {

WarmupDownload(/*remote_path*/ String),
WarmupDownloadResult(Result<(), CubeError>),
WarmupCleanup(/*remote_path*/ String),
WarmupCleanupResult(Result<(), CubeError>),

MetaStoreCall(MetaStoreRpcMethodCall),
MetaStoreCallResult(MetaStoreRpcMethodResult),
Expand Down
20 changes: 19 additions & 1 deletion rust/cubestore/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub trait Cluster: Send + Sync {
fn server_name(&self) -> &str;

async fn warmup_download(&self, node_name: &str, remote_path: String) -> Result<(), CubeError>;
/// Asks the node `node_name` to clean up after a warmup download of `remote_path`.
///
/// In `ClusterImpl` the receiving node handles the request by deleting its local
/// copy of `remote_path`; the returned `Result` is the outcome reported for that request.
async fn warmup_cleanup(&self, node_name: &str, remote_path: String) -> Result<(), CubeError>;

fn job_result_listener(&self) -> JobResultListener;

Expand Down Expand Up @@ -182,6 +183,17 @@ impl Cluster for ClusterImpl {
}
}

/// Sends a `WarmupCleanup` request for `remote_path` to `node_name` and returns the
/// result carried by the matching `WarmupCleanupResult` reply.
///
/// # Panics
///
/// Panics if the reply is any message other than `WarmupCleanupResult`.
async fn warmup_cleanup(&self, node_name: &str, remote_path: String) -> Result<(), CubeError> {
    // The reply is awaited only to make sure the request has actually been delivered.
    let request = NetworkMessage::WarmupCleanup(remote_path);
    match self.send_or_process_locally(node_name, request).await? {
        NetworkMessage::WarmupCleanupResult(outcome) => outcome,
        _ => panic!("unexpected result for warmup cleanup"),
    }
}

fn job_result_listener(&self) -> JobResultListener {
JobResultListener {
receiver: self.event_sender.subscribe(),
Expand Down Expand Up @@ -578,7 +590,13 @@ impl ClusterImpl {
let res = self.remote_fs.download_file(&remote_path).await;
NetworkMessage::WarmupDownloadResult(res.map(|_| ()))
}
NetworkMessage::SelectResult(_) | NetworkMessage::WarmupDownloadResult(_) => {
NetworkMessage::WarmupCleanup(remote_path) => {
let res = self.remote_fs.delete_local_copy(&remote_path).await;
NetworkMessage::WarmupCleanupResult(res.map(|_| ()))
}
NetworkMessage::SelectResult(_)
| NetworkMessage::WarmupDownloadResult(_)
| NetworkMessage::WarmupCleanupResult(_) => {
panic!("result sent to worker");
}
NetworkMessage::MetaStoreCall(_) | NetworkMessage::MetaStoreCallResult(_) => {
Expand Down
18 changes: 12 additions & 6 deletions rust/cubestore/src/remotefs/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,21 @@ impl RemoteFs for GCSRemoteFs {
Object::delete(self.bucket.as_str(), self.gcs_path(remote_path).as_str()).await?;
info!("Deleting {} ({:?})", remote_path, time.elapsed()?);

self.delete_local_copy(remote_path).await
}

async fn delete_local_copy(&self, remote_path: &str) -> Result<(), CubeError> {
let _guard = self.delete_mut.lock().await;
let local = self.dir.as_path().join(remote_path);
if fs::metadata(local.clone()).await.is_ok() {
fs::remove_file(local.clone()).await?;
LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone())
.await?;
if let Err(e) = fs::remove_file(local.clone()).await {
if e.kind() == std::io::ErrorKind::NotFound {
return Ok(());
} else {
return Err(e)?;
}
}

Ok(())
// We have removed a file, cleanup.
LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()).await
}

async fn list(&self, remote_prefix: &str) -> Result<Vec<String>, CubeError> {
Expand Down
24 changes: 17 additions & 7 deletions rust/cubestore/src/remotefs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ pub trait RemoteFs: Send + Sync + Debug {

async fn delete_file(&self, remote_path: &str) -> Result<(), CubeError>;

/// Deletes the locally stored copy of `remote_path`, if such a copy exists.
/// If there is no local copy (e.g. the file was never downloaded), nothing
/// happens and the call succeeds.
async fn delete_local_copy(&self, remote_path: &str) -> Result<(), CubeError>;

async fn list(&self, remote_prefix: &str) -> Result<Vec<String>, CubeError>;

async fn list_with_metadata(&self, remote_prefix: &str) -> Result<Vec<RemoteFile>, CubeError>;
Expand Down Expand Up @@ -114,15 +118,21 @@ impl RemoteFs for LocalDirRemoteFs {
}
}

let _local_guard = self.dir_delete_mut.lock().await;
self.delete_local_copy(remote_path).await
}

async fn delete_local_copy(&self, remote_path: &str) -> Result<(), CubeError> {
let _guard = self.dir_delete_mut.lock().await;
let local = self.dir.as_path().join(remote_path);
if fs::metadata(local.clone()).await.is_ok() {
fs::remove_file(local.clone()).await?;
LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone())
.await?;
if let Err(e) = fs::remove_file(local.clone()).await {
if e.kind() == std::io::ErrorKind::NotFound {
return Ok(());
} else {
return Err(e)?;
}
}

Ok(())
// We have removed a file, cleanup.
LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()).await
}

async fn list(&self, remote_prefix: &str) -> Result<Vec<String>, CubeError> {
Expand Down
4 changes: 4 additions & 0 deletions rust/cubestore/src/remotefs/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ impl RemoteFs for QueueRemoteFs {
}
}

/// Removes the local copy of `remote_path` by forwarding the call unchanged to the
/// wrapped `remote_fs`; the wrapped implementation's result is returned as is.
async fn delete_local_copy(&self, remote_path: &str) -> Result<(), CubeError> {
    let inner = &self.remote_fs;
    inner.delete_local_copy(remote_path).await
}

async fn list(&self, remote_prefix: &str) -> Result<Vec<String>, CubeError> {
self.remote_fs.list(remote_prefix).await
}
Expand Down
18 changes: 12 additions & 6 deletions rust/cubestore/src/remotefs/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,21 @@ impl RemoteFs for S3RemoteFs {
)));
}

self.delete_local_copy(remote_path).await
}

async fn delete_local_copy(&self, remote_path: &str) -> Result<(), CubeError> {
let _guard = self.delete_mut.lock().await;
let local = self.dir.as_path().join(remote_path);
if fs::metadata(local.clone()).await.is_ok() {
fs::remove_file(local.clone()).await?;
LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone())
.await?;
if let Err(e) = fs::remove_file(local.clone()).await {
if e.kind() == std::io::ErrorKind::NotFound {
return Ok(());
} else {
return Err(e)?;
}
}

Ok(())
// We have removed a file, cleanup.
LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()).await
}

async fn list(&self, remote_prefix: &str) -> Result<Vec<String>, CubeError> {
Expand Down
16 changes: 16 additions & 0 deletions rust/cubestore/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ impl SchedulerImpl {
if partition.get_row().is_active() {
if let Some(file_name) = partition.get_row().get_full_name(partition.get_id()) {
self.remote_fs.delete_file(file_name.as_str()).await?;
self.schedule_partition_cleanup(partition.get_id(), file_name)
.await?;
}
}
}
Expand All @@ -162,6 +164,8 @@ impl SchedulerImpl {
{
if let Some(file_name) = partition.get_row().get_full_name(partition.get_id()) {
self.remote_fs.delete_file(file_name.as_str()).await?;
self.schedule_partition_cleanup(partition.get_id(), file_name)
.await?;
}
}
}
Expand Down Expand Up @@ -275,4 +279,16 @@ impl SchedulerImpl {
.await?;
self.cluster.warmup_download(&node_name, path).await
}

/// Asks the node responsible for `partition_id` to clean up its warmup download of `path`.
///
/// The node name is obtained from `cluster.node_name_by_partitions` for this single
/// partition, and the request is then issued through `cluster.warmup_cleanup`.
/// An error from either step is returned to the caller.
async fn schedule_partition_cleanup(
    &self,
    partition_id: u64,
    path: String,
) -> Result<(), CubeError> {
    let partitions = [partition_id];
    let target_node = self.cluster.node_name_by_partitions(&partitions).await?;
    self.cluster.warmup_cleanup(&target_node, path).await
}
}